usm: kafka: Use intern.StringInterner instead of manual implementation
Prefer code reuse.
The package handles the lifecycle of the shared strings better.
guyarb committed Oct 7, 2024
1 parent 7015f9b commit a946467
Showing 7 changed files with 43 additions and 33 deletions.
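
Before the per-file diffs, here is a minimal sketch of the interner usage pattern this commit adopts. The calls shown (NewStringInterner, Get on a byte slice, GetString on a string, and StringValue.Get to read the shared string back) are the ones that appear in the changed files below; the surrounding program is illustrative only.

package main

import (
	"fmt"

	"github.com/DataDog/datadog-agent/pkg/util/intern"
)

func main() {
	interner := intern.NewStringInterner()

	// Get interns the contents of a byte slice (e.g. a topic name parsed off
	// the wire) and returns a shared *intern.StringValue. Equal contents are
	// expected to map to the same shared value, so repeated topics do not
	// produce a fresh string each time.
	fromWire := interner.Get([]byte("orders"))

	// GetString does the same for an existing string; the new test-only
	// NewKey helper further down uses it.
	fromString := interner.GetString("orders")

	// StringValue.Get returns the underlying string when it is needed for
	// encoding, logging, or assertions.
	fmt.Println(fromWire.Get(), fromString.Get())
}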
2 changes: 1 addition & 1 deletion pkg/network/encoding/marshal/usm_kafka.go
@@ -65,7 +65,7 @@ func (e *kafkaEncoder) encodeData(connectionData *USMConnectionData[kafka.Key, *
header.SetRequest_type(uint32(key.RequestAPIKey))
header.SetRequest_version(uint32(key.RequestVersion))
})
- builder.SetTopic(key.TopicName)
+ builder.SetTopic(key.TopicName.Get())
for statusCode, requestStat := range stats.ErrorCodeToStat {
if requestStat.Count == 0 {
continue
2 changes: 1 addition & 1 deletion pkg/network/protocols/kafka/debugging/debugging.go
@@ -61,7 +61,7 @@ func Kafka(stats map[kafka.Key]*kafka.RequestStats) []RequestSummary {
},

Operation: operationName,
- TopicName: key.TopicName,
+ TopicName: key.TopicName.Get(),
ByStatus: make(map[int8]Stats, len(requestStat.ErrorCodeToStat)),
}

19 changes: 5 additions & 14 deletions pkg/network/protocols/kafka/statkeeper.go
@@ -11,6 +11,7 @@ import (
"sync"

"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/util/intern"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

@@ -23,7 +24,7 @@ type StatKeeper struct {

// topicNames stores interned versions of all the topics currently stored in
// the `StatKeeper`
- topicNames map[string]string
+ topicNames *intern.StringInterner
}

// NewStatkeeper creates a new StatKeeper
@@ -32,7 +33,7 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry) *StatKeeper {
stats: make(map[Key]*RequestStats),
maxEntries: c.MaxKafkaStatsBuffered,
telemetry: telemetry,
- topicNames: make(map[string]string),
+ topicNames: intern.NewStringInterner(),
}
}

@@ -73,26 +74,16 @@ func (statKeeper *StatKeeper) GetAndResetAllStats() map[Key]*RequestStats {
defer statKeeper.statsMutex.RUnlock()
ret := statKeeper.stats // No deep copy needed since `statKeeper.stats` gets reset
statKeeper.stats = make(map[Key]*RequestStats)
- statKeeper.topicNames = make(map[string]string)
return ret
}

- func (statKeeper *StatKeeper) extractTopicName(tx *KafkaTransaction) string {
+ func (statKeeper *StatKeeper) extractTopicName(tx *KafkaTransaction) *intern.StringValue {
// Limit tx.Topic_name_size to not exceed the actual length of tx.Topic_name
if uint16(tx.Topic_name_size) > uint16(len(tx.Topic_name)) {
log.Debugf("Topic name size was changed from %d, to size: %d", tx.Topic_name_size, len(tx.Topic_name))
tx.Topic_name_size = uint8(len(tx.Topic_name))
}
b := tx.Topic_name[:tx.Topic_name_size]

- // the trick here is that the Go runtime doesn't allocate the string used in
- // the map lookup, so if we have seen this topic name before, we don't
- // perform any allocations
- if v, ok := statKeeper.topicNames[string(b)]; ok {
- return v
- }
-
- v := string(b)
- statKeeper.topicNames[v] = v
- return v
+ return statKeeper.topicNames.Get(b)
}
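
For background on the block removed above: a Go map lookup keyed by a string(b) conversion of a byte slice does not allocate, which is the trick the deleted comment relied on to avoid per-transaction allocations for already-seen topics. A small self-contained sketch of that manual pattern, with illustrative names only:

package main

import "fmt"

// lookupOrStore returns the previously stored copy of b's contents if present.
// The compiler recognizes that string(b) is used only as a map key here, so
// the lookup itself does not allocate; a new string is allocated only the
// first time a distinct value is seen.
func lookupOrStore(cache map[string]string, b []byte) string {
	if v, ok := cache[string(b)]; ok {
		return v
	}
	v := string(b)
	cache[v] = v
	return v
}

func main() {
	cache := make(map[string]string)
	fmt.Println(lookupOrStore(cache, []byte("orders"))) // allocates once
	fmt.Println(lookupOrStore(cache, []byte("orders"))) // served from the map
}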
7 changes: 4 additions & 3 deletions pkg/network/protocols/kafka/statkeeper_test.go
@@ -19,6 +19,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/process/util"
"github.com/DataDog/datadog-agent/pkg/util/intern"
)

func BenchmarkStatKeeperSameTX(b *testing.B) {
@@ -66,10 +67,10 @@ func TestStatKeeper_extractTopicName(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
statKeeper := &StatKeeper{
- topicNames: map[string]string{},
+ topicNames: intern.NewStringInterner(),
}
copy(tt.tx.Topic_name[:], strings.Repeat("*", len(tt.tx.Topic_name)))
- if got := statKeeper.extractTopicName(tt.tx); len(got) != len(tt.want) {
+ if got := statKeeper.extractTopicName(tt.tx); len(got.Get()) != len(tt.want) {
t.Errorf("extractTopicName() = %v, want %v", got, tt.want)
}
})
@@ -105,7 +106,7 @@ func TestProcessKafkaTransactions(t *testing.T) {
assert.Equal(t, 0, len(sk.stats))
assert.Equal(t, numOfTopics, len(stats))
for key, stats := range stats {
- assert.Equal(t, topicNamePrefix, key.TopicName[:len(topicNamePrefix)])
+ assert.Equal(t, topicNamePrefix, key.TopicName.Get()[:len(topicNamePrefix)])
for i := 0; i < 5; i++ {
s := stats.ErrorCodeToStat[int32(i)]
require.NotNil(t, s)
14 changes: 2 additions & 12 deletions pkg/network/protocols/kafka/stats.go
@@ -7,7 +7,7 @@ package kafka

import (
"github.com/DataDog/datadog-agent/pkg/network/types"
"github.com/DataDog/datadog-agent/pkg/process/util"
"github.com/DataDog/datadog-agent/pkg/util/intern"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/sketches-go/ddsketch"
)
@@ -29,20 +29,10 @@
type Key struct {
RequestAPIKey uint16
RequestVersion uint16
- TopicName string
+ TopicName *intern.StringValue
types.ConnectionKey
}

- // NewKey generates a new Key
- func NewKey(saddr, daddr util.Address, sport, dport uint16, topicName string, requestAPIKey, requestAPIVersion uint16) Key {
- return Key{
- ConnectionKey: types.NewConnectionKey(saddr, daddr, sport, dport),
- TopicName: topicName,
- RequestAPIKey: requestAPIKey,
- RequestVersion: requestAPIVersion,
- }
- }

// RequestStats stores Kafka request statistics per Kafka error code
// We include the error code here and not in the Key to avoid creating a new Key for each error code
type RequestStats struct {
28 changes: 28 additions & 0 deletions pkg/network/protocols/kafka/stats_testutil.go
@@ -0,0 +1,28 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build test

package kafka

import (
"github.com/DataDog/datadog-agent/pkg/network/types"
"github.com/DataDog/datadog-agent/pkg/process/util"
"github.com/DataDog/datadog-agent/pkg/util/intern"
)

var (
testInterner = intern.NewStringInterner()
)

// NewKey generates a new Key
func NewKey(saddr, daddr util.Address, sport, dport uint16, topicName string, requestAPIKey, requestAPIVersion uint16) Key {
return Key{
ConnectionKey: types.NewConnectionKey(saddr, daddr, sport, dport),
TopicName: testInterner.GetString(topicName),
RequestAPIKey: requestAPIKey,
RequestVersion: requestAPIVersion,
}
}
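
NewKey now lives behind the test build tag because production code stores already-interned *intern.StringValue topic names in the Key directly, while tests keep the string-based convenience constructor. A hedged usage sketch follows; the address helper and the literal values are assumptions for illustration, not taken from this diff:

//go:build test

package kafka

import (
	"testing"

	"github.com/DataDog/datadog-agent/pkg/process/util"
)

func TestNewKeyInternsTopicName(t *testing.T) {
	// util.AddressFromString is assumed here purely for illustration.
	saddr := util.AddressFromString("127.0.0.1")
	daddr := util.AddressFromString("127.0.0.2")

	// NewKey interns the topic name via the package-level test interner.
	key := NewKey(saddr, daddr, 6000, 9092, "orders", ProduceAPIKey, 9)
	if key.TopicName.Get() != "orders" {
		t.Errorf("unexpected topic name: %q", key.TopicName.Get())
	}
}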
4 changes: 2 additions & 2 deletions pkg/network/usm/kafka_monitor_test.go
@@ -1616,7 +1616,7 @@ func validateProduceFetchCount(t *assert.CollectT, kafkaStats map[kafka.Key]*kaf
if hasTLSTag != validation.tlsEnabled {
continue
}
- assert.Equal(t, topicName[:min(len(topicName), 80)], kafkaKey.TopicName)
+ assert.Equal(t, topicName[:min(len(topicName), 80)], kafkaKey.TopicName.Get())
assert.Greater(t, requestStats.FirstLatencySample, float64(1))
switch kafkaKey.RequestAPIKey {
case kafka.ProduceAPIKey:
@@ -1639,7 +1639,7 @@ func validateProduceFetchCountWithErrorCodes(t *assert.CollectT, kafkaStats map[
produceRequests := make(map[int32]int, len(validation.expectedNumberOfProduceRequests))
fetchRequests := make(map[int32]int, len(validation.expectedNumberOfFetchRequests))
for kafkaKey, kafkaStat := range kafkaStats {
- assert.Equal(t, topicName[:min(len(topicName), 80)], kafkaKey.TopicName)
+ assert.Equal(t, topicName[:min(len(topicName), 80)], kafkaKey.TopicName.Get())
switch kafkaKey.RequestAPIKey {
case kafka.ProduceAPIKey:
assert.Equal(t, uint16(validation.expectedAPIVersionProduce), kafkaKey.RequestVersion)
