diff --git a/pkg/network/encoding/marshal/usm_kafka.go b/pkg/network/encoding/marshal/usm_kafka.go
index 96c20864162c0..40f8bda592393 100644
--- a/pkg/network/encoding/marshal/usm_kafka.go
+++ b/pkg/network/encoding/marshal/usm_kafka.go
@@ -65,7 +65,7 @@ func (e *kafkaEncoder) encodeData(connectionData *USMConnectionData[kafka.Key, *
             header.SetRequest_type(uint32(key.RequestAPIKey))
             header.SetRequest_version(uint32(key.RequestVersion))
         })
-        builder.SetTopic(key.TopicName)
+        builder.SetTopic(key.TopicName.Get())
         for statusCode, requestStat := range stats.ErrorCodeToStat {
             if requestStat.Count == 0 {
                 continue
diff --git a/pkg/network/protocols/kafka/debugging/debugging.go b/pkg/network/protocols/kafka/debugging/debugging.go
index a1f7b9cf9544d..5baf42db029db 100644
--- a/pkg/network/protocols/kafka/debugging/debugging.go
+++ b/pkg/network/protocols/kafka/debugging/debugging.go
@@ -61,7 +61,7 @@ func Kafka(stats map[kafka.Key]*kafka.RequestStats) []RequestSummary {
             },
             Operation: operationName,
-            TopicName: key.TopicName,
+            TopicName: key.TopicName.Get(),
             ByStatus:  make(map[int8]Stats, len(requestStat.ErrorCodeToStat)),
         }
diff --git a/pkg/network/protocols/kafka/statkeeper.go b/pkg/network/protocols/kafka/statkeeper.go
index 3ef8895d4c201..855b002e5519e 100644
--- a/pkg/network/protocols/kafka/statkeeper.go
+++ b/pkg/network/protocols/kafka/statkeeper.go
@@ -11,6 +11,7 @@ import (
     "sync"

     "github.com/DataDog/datadog-agent/pkg/network/config"
+    "github.com/DataDog/datadog-agent/pkg/util/intern"
     "github.com/DataDog/datadog-agent/pkg/util/log"
 )

@@ -23,7 +24,7 @@ type StatKeeper struct {

     // topicNames stores interned versions of the all topics currently stored in
     // the `StatKeeper`
-    topicNames map[string]string
+    topicNames *intern.StringInterner
 }

 // NewStatkeeper creates a new StatKeeper
@@ -32,7 +33,7 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry) *StatKeeper {
         stats:      make(map[Key]*RequestStats),
         maxEntries: c.MaxKafkaStatsBuffered,
         telemetry:  telemetry,
-        topicNames: make(map[string]string),
+        topicNames: intern.NewStringInterner(),
     }
 }
@@ -73,11 +74,10 @@ func (statKeeper *StatKeeper) GetAndResetAllStats() map[Key]*RequestStats {
     defer statKeeper.statsMutex.RUnlock()
     ret := statKeeper.stats // No deep copy needed since `statKeeper.stats` gets reset
     statKeeper.stats = make(map[Key]*RequestStats)
-    statKeeper.topicNames = make(map[string]string)
     return ret
 }

-func (statKeeper *StatKeeper) extractTopicName(tx *KafkaTransaction) string {
+func (statKeeper *StatKeeper) extractTopicName(tx *KafkaTransaction) *intern.StringValue {
     // Limit tx.Topic_name_size to not exceed the actual length of tx.Topic_name
     if uint16(tx.Topic_name_size) > uint16(len(tx.Topic_name)) {
         log.Debugf("Topic name size was changed from %d, to size: %d", tx.Topic_name_size, len(tx.Topic_name))
@@ -85,14 +85,5 @@ func (statKeeper *StatKeeper) extractTopicName(tx *KafkaTransaction) string {
     }

     b := tx.Topic_name[:tx.Topic_name_size]
-    // the trick here is that the Go runtime doesn't allocate the string used in
-    // the map lookup, so if we have seen this topic name before, we don't
-    // perform any allocations
-    if v, ok := statKeeper.topicNames[string(b)]; ok {
-        return v
-    }
-
-    v := string(b)
-    statKeeper.topicNames[v] = v
-    return v
+    return statKeeper.topicNames.Get(b)
 }
diff --git a/pkg/network/protocols/kafka/statkeeper_test.go b/pkg/network/protocols/kafka/statkeeper_test.go
index 21170f2ec8d92..23600dfc396b0 100644
--- a/pkg/network/protocols/kafka/statkeeper_test.go
+++ b/pkg/network/protocols/kafka/statkeeper_test.go
@@ -19,6 +19,7 @@ import (

     "github.com/DataDog/datadog-agent/pkg/network/config"
     "github.com/DataDog/datadog-agent/pkg/process/util"
+    "github.com/DataDog/datadog-agent/pkg/util/intern"
 )

 func BenchmarkStatKeeperSameTX(b *testing.B) {
@@ -66,10 +67,10 @@ func TestStatKeeper_extractTopicName(t *testing.T) {
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
             statKeeper := &StatKeeper{
-                topicNames: map[string]string{},
+                topicNames: intern.NewStringInterner(),
             }
             copy(tt.tx.Topic_name[:], strings.Repeat("*", len(tt.tx.Topic_name)))
-            if got := statKeeper.extractTopicName(tt.tx); len(got) != len(tt.want) {
+            if got := statKeeper.extractTopicName(tt.tx); len(got.Get()) != len(tt.want) {
                 t.Errorf("extractTopicName() = %v, want %v", got, tt.want)
             }
         })
@@ -105,7 +106,7 @@ func TestProcessKafkaTransactions(t *testing.T) {
     assert.Equal(t, 0, len(sk.stats))
     assert.Equal(t, numOfTopics, len(stats))
     for key, stats := range stats {
-        assert.Equal(t, topicNamePrefix, key.TopicName[:len(topicNamePrefix)])
+        assert.Equal(t, topicNamePrefix, key.TopicName.Get()[:len(topicNamePrefix)])
         for i := 0; i < 5; i++ {
             s := stats.ErrorCodeToStat[int32(i)]
             require.NotNil(t, s)
diff --git a/pkg/network/protocols/kafka/stats.go b/pkg/network/protocols/kafka/stats.go
index 79ccfcadc5837..f984e24627d91 100644
--- a/pkg/network/protocols/kafka/stats.go
+++ b/pkg/network/protocols/kafka/stats.go
@@ -7,7 +7,7 @@ package kafka
 import (
     "github.com/DataDog/datadog-agent/pkg/network/types"
-    "github.com/DataDog/datadog-agent/pkg/process/util"
+    "github.com/DataDog/datadog-agent/pkg/util/intern"
     "github.com/DataDog/datadog-agent/pkg/util/log"
     "github.com/DataDog/sketches-go/ddsketch"
 )
@@ -29,20 +29,10 @@ const (
 type Key struct {
     RequestAPIKey  uint16
     RequestVersion uint16
-    TopicName      string
+    TopicName      *intern.StringValue
     types.ConnectionKey
 }

-// NewKey generates a new Key
-func NewKey(saddr, daddr util.Address, sport, dport uint16, topicName string, requestAPIKey, requestAPIVersion uint16) Key {
-    return Key{
-        ConnectionKey:  types.NewConnectionKey(saddr, daddr, sport, dport),
-        TopicName:      topicName,
-        RequestAPIKey:  requestAPIKey,
-        RequestVersion: requestAPIVersion,
-    }
-}
-
 // RequestStats stores Kafka request statistics per Kafka error code
 // We include the error code here and not in the Key to avoid creating a new Key for each error code
 type RequestStats struct {
diff --git a/pkg/network/protocols/kafka/stats_testutil.go b/pkg/network/protocols/kafka/stats_testutil.go
new file mode 100644
index 0000000000000..c781e77282f1d
--- /dev/null
+++ b/pkg/network/protocols/kafka/stats_testutil.go
@@ -0,0 +1,28 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2024-present Datadog, Inc.
+
+//go:build test
+
+package kafka
+
+import (
+    "github.com/DataDog/datadog-agent/pkg/network/types"
+    "github.com/DataDog/datadog-agent/pkg/process/util"
+    "github.com/DataDog/datadog-agent/pkg/util/intern"
+)
+
+var (
+    testInterner = intern.NewStringInterner()
+)
+
+// NewKey generates a new Key
+func NewKey(saddr, daddr util.Address, sport, dport uint16, topicName string, requestAPIKey, requestAPIVersion uint16) Key {
+    return Key{
+        ConnectionKey:  types.NewConnectionKey(saddr, daddr, sport, dport),
+        TopicName:      testInterner.GetString(topicName),
+        RequestAPIKey:  requestAPIKey,
+        RequestVersion: requestAPIVersion,
+    }
+}
diff --git a/pkg/network/usm/kafka_monitor_test.go b/pkg/network/usm/kafka_monitor_test.go
index 1c3c5e12702ca..0330b943ebbbd 100644
--- a/pkg/network/usm/kafka_monitor_test.go
+++ b/pkg/network/usm/kafka_monitor_test.go
@@ -1616,7 +1616,7 @@ func validateProduceFetchCount(t *assert.CollectT, kafkaStats map[kafka.Key]*kaf
         if hasTLSTag != validation.tlsEnabled {
             continue
         }
-        assert.Equal(t, topicName[:min(len(topicName), 80)], kafkaKey.TopicName)
+        assert.Equal(t, topicName[:min(len(topicName), 80)], kafkaKey.TopicName.Get())
         assert.Greater(t, requestStats.FirstLatencySample, float64(1))
         switch kafkaKey.RequestAPIKey {
         case kafka.ProduceAPIKey:
@@ -1639,7 +1639,7 @@ func validateProduceFetchCountWithErrorCodes(t *assert.CollectT, kafkaStats map[
     produceRequests := make(map[int32]int, len(validation.expectedNumberOfProduceRequests))
     fetchRequests := make(map[int32]int, len(validation.expectedNumberOfFetchRequests))
     for kafkaKey, kafkaStat := range kafkaStats {
-        assert.Equal(t, topicName[:min(len(topicName), 80)], kafkaKey.TopicName)
+        assert.Equal(t, topicName[:min(len(topicName), 80)], kafkaKey.TopicName.Get())
         switch kafkaKey.RequestAPIKey {
         case kafka.ProduceAPIKey:
             assert.Equal(t, uint16(validation.expectedAPIVersionProduce), kafkaKey.RequestVersion)
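Note on the change: the per-StatKeeper map[string]string that deduplicated topic names is replaced by the shared pkg/util/intern string interner, and Key.TopicName becomes a *intern.StringValue, so identical topics share one interned value and consumers materialize the string with .Get(). NewKey moves behind the test build tag (stats_testutil.go) because production code now builds keys from values already interned inside the StatKeeper. The sketch below illustrates the interning pattern the diff adopts; the package main wrapper and the "example-topic" name are illustrative only, not part of the change.

package main

import (
    "fmt"

    "github.com/DataDog/datadog-agent/pkg/util/intern"
)

func main() {
    interner := intern.NewStringInterner()

    // Topic names arrive from eBPF as byte buffers; Get interns the bytes,
    // so repeated transactions on the same topic do not each allocate a
    // fresh Go string (the role the old map[string]string trick played).
    first := interner.Get([]byte("example-topic"))  // illustrative topic name
    second := interner.Get([]byte("example-topic")) // same contents, different buffer

    // Identical contents are expected to yield the same *intern.StringValue,
    // which is why Key.TopicName can be a pointer and Key still works as a
    // map key for stats aggregation.
    fmt.Println(first == second) // expected: true

    // Consumers materialize the string only when they need it, mirroring
    // key.TopicName.Get() in the encoder, debugger, and tests above.
    fmt.Println(first.Get()) // "example-topic"

    // Test helpers that start from a string (like NewKey in stats_testutil.go)
    // use GetString instead of Get.
    fmt.Println(interner.GetString("example-topic").Get())
}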