Skip to content

Commit

Permalink
usm: kafka: Reorder operations to reduce mutex lock time (#30117)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Lavie <[email protected]>
  • Loading branch information
guyarb and DanielLavie authored Oct 15, 2024
1 parent b0e4bec commit 03a353c
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions pkg/network/protocols/kafka/statkeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,24 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry) *StatKeeper {

// Process processes the kafka transaction
func (statKeeper *StatKeeper) Process(tx *EbpfTx) {
statKeeper.statsMutex.Lock()
defer statKeeper.statsMutex.Unlock()
latency := tx.RequestLatency()
// Produce requests with acks = 0 do not receive a response, and as a result, have no latency
if tx.APIKey() == FetchAPIKey && latency <= 0 {
statKeeper.telemetry.invalidLatency.Add(1)
return
}

// extractTopicName is an expensive operation but, it is also concurrent safe, so we can do it here
// without holding the lock.
key := Key{
RequestAPIKey: tx.APIKey(),
RequestVersion: tx.APIVersion(),
TopicName: statKeeper.extractTopicName(&tx.Transaction),
ConnectionKey: tx.ConnTuple(),
}

statKeeper.statsMutex.Lock()
defer statKeeper.statsMutex.Unlock()
requestStats, ok := statKeeper.stats[key]
if !ok {
if len(statKeeper.stats) >= statKeeper.maxEntries {
Expand All @@ -58,13 +67,6 @@ func (statKeeper *StatKeeper) Process(tx *EbpfTx) {
statKeeper.stats[key] = requestStats
}

latency := tx.RequestLatency()
// Produce requests with acks = 0 do not receive a response, and as a result, have no latency
if key.RequestAPIKey == FetchAPIKey && latency <= 0 {
statKeeper.telemetry.invalidLatency.Add(1)
return
}

requestStats.AddRequest(int32(tx.ErrorCode()), int(tx.RecordsCount()), uint64(tx.Transaction.Tags), latency)
}

Expand Down

0 comments on commit 03a353c

Please sign in to comment.