From 6dcb9c9c854eae0ba1351caad1cdf013be0384c5 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Wed, 3 Jul 2024 17:06:26 +0300 Subject: [PATCH 01/17] WIP: adding latency --- pkg/network/ebpf/c/protocols/kafka/defs.h | 2 +- pkg/network/ebpf/c/protocols/kafka/kafka-parsing.h | 3 +++ pkg/network/ebpf/c/protocols/kafka/types.h | 1 + pkg/network/protocols/kafka/types_linux.go | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/network/ebpf/c/protocols/kafka/defs.h b/pkg/network/ebpf/c/protocols/kafka/defs.h index de884dec6b909..45a3f890cd68e 100644 --- a/pkg/network/ebpf/c/protocols/kafka/defs.h +++ b/pkg/network/ebpf/c/protocols/kafka/defs.h @@ -43,7 +43,7 @@ #define KAFKA_MAX_ABORTED_TRANSACTIONS 10000 // This controls the number of Kafka transactions read from userspace at a time -#define KAFKA_BATCH_SIZE 28 +#define KAFKA_BATCH_SIZE 26 // The amount of buckets we have for the kafka topic name length telemetry. #define KAFKA_TELEMETRY_TOPIC_NAME_NUM_OF_BUCKETS 10 diff --git a/pkg/network/ebpf/c/protocols/kafka/kafka-parsing.h b/pkg/network/ebpf/c/protocols/kafka/kafka-parsing.h index 7f38f4e33acd4..63260660f82a9 100644 --- a/pkg/network/ebpf/c/protocols/kafka/kafka-parsing.h +++ b/pkg/network/ebpf/c/protocols/kafka/kafka-parsing.h @@ -1334,6 +1334,7 @@ static __always_inline bool kafka_process_new_response(void *ctx, conn_tuple_t * kafka->response.state = KAFKA_FETCH_RESPONSE_START; kafka->response.carry_over_offset = offset - orig_offset; kafka->response.expected_tcp_seq = kafka_get_next_tcp_seq(skb_info); + kafka->response.transaction.response_last_seen = bpf_ktime_get_ns(); // Copy it to the stack since the verifier on 4.14 complains otherwise. kafka_response_context_t response_ctx; @@ -1348,6 +1349,7 @@ static __always_inline bool kafka_process_new_response(void *ctx, conn_tuple_t * static __always_inline bool kafka_process_response(void *ctx, conn_tuple_t *tup, kafka_info_t *kafka, pktbuf_t pkt, skb_info_t *skb_info) { kafka_response_context_t *response = bpf_map_lookup_elem(&kafka_response, tup); if (response) { + response->transaction.response_last_seen = bpf_ktime_get_ns(); if (!skb_info || skb_info->tcp_seq == response->expected_tcp_seq) { response->expected_tcp_seq = kafka_get_next_tcp_seq(skb_info); kafka_call_response_parser(ctx, tup, pkt, response->state, response->transaction.request_api_version); @@ -1415,6 +1417,7 @@ static __always_inline bool kafka_process(conn_tuple_t *tup, kafka_info_t *kafka } kafka_transaction->request_started = bpf_ktime_get_ns(); + kafka_transaction->response_last_seen = 0; kafka_transaction->request_api_key = kafka_header.api_key; kafka_transaction->request_api_version = kafka_header.api_version; diff --git a/pkg/network/ebpf/c/protocols/kafka/types.h b/pkg/network/ebpf/c/protocols/kafka/types.h index 30bd4111b4958..eb67439eace0c 100644 --- a/pkg/network/ebpf/c/protocols/kafka/types.h +++ b/pkg/network/ebpf/c/protocols/kafka/types.h @@ -20,6 +20,7 @@ typedef struct { typedef struct kafka_transaction_t { __u64 request_started; + __u64 response_last_seen; __u32 records_count; // Request API key and version are 16-bit in the protocol but we store // them as u8 to reduce memory usage of the map since the APIs and diff --git a/pkg/network/protocols/kafka/types_linux.go b/pkg/network/protocols/kafka/types_linux.go index c7e8278360e6b..d043ab7d1e554 100644 --- a/pkg/network/protocols/kafka/types_linux.go +++ b/pkg/network/protocols/kafka/types_linux.go @@ -31,6 +31,7 @@ type KafkaTransactionKey struct { } type KafkaTransaction struct { Request_started uint64 + Response_last_seen uint64 Records_count uint32 Request_api_key uint8 Request_api_version uint8 From 3c8afc4f21bcc1407d74943dc205da8f85371443 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Tue, 16 Jul 2024 17:07:06 +0300 Subject: [PATCH 02/17] Added user mode code for handling Kafka latencies --- pkg/network/ebpf/c/protocols/kafka/defs.h | 2 +- pkg/network/encoding/marshal/usm_kafka.go | 12 +++- pkg/network/protocols/kafka/model_linux.go | 13 +++- pkg/network/protocols/kafka/statkeeper.go | 9 ++- pkg/network/protocols/kafka/stats.go | 80 ++++++++++++++++++++-- pkg/network/protocols/kafka/telemetry.go | 11 +-- 6 files changed, 115 insertions(+), 12 deletions(-) diff --git a/pkg/network/ebpf/c/protocols/kafka/defs.h b/pkg/network/ebpf/c/protocols/kafka/defs.h index f4b950cf2af2b..b72cb61f60b90 100644 --- a/pkg/network/ebpf/c/protocols/kafka/defs.h +++ b/pkg/network/ebpf/c/protocols/kafka/defs.h @@ -43,7 +43,7 @@ #define KAFKA_MAX_ABORTED_TRANSACTIONS 10000 // This controls the number of Kafka transactions read from userspace at a time -#define KAFKA_BATCH_SIZE 26 +#define KAFKA_BATCH_SIZE 24 // The amount of buckets we have for the kafka topic name length telemetry. #define KAFKA_TELEMETRY_TOPIC_NAME_NUM_OF_BUCKETS 10 diff --git a/pkg/network/encoding/marshal/usm_kafka.go b/pkg/network/encoding/marshal/usm_kafka.go index 57ccf1e422f40..96c20864162c0 100644 --- a/pkg/network/encoding/marshal/usm_kafka.go +++ b/pkg/network/encoding/marshal/usm_kafka.go @@ -9,6 +9,8 @@ import ( "bytes" "io" + "github.com/gogo/protobuf/proto" + model "github.com/DataDog/agent-payload/v5/process" "github.com/DataDog/datadog-agent/pkg/network" @@ -69,9 +71,17 @@ func (e *kafkaEncoder) encodeData(connectionData *USMConnectionData[kafka.Key, * continue } builder.AddStatsByErrorCode(func(statsByErrorCodeBuilder *model.KafkaAggregation_StatsByErrorCodeEntryBuilder) { - statsByErrorCodeBuilder.SetKey(int32(statusCode)) + statsByErrorCodeBuilder.SetKey(statusCode) statsByErrorCodeBuilder.SetValue(func(kafkaStatsBuilder *model.KafkaStatsBuilder) { kafkaStatsBuilder.SetCount(uint32(requestStat.Count)) + if latencies := requestStat.Latencies; latencies != nil { + blob, _ := proto.Marshal(latencies.ToProto()) + kafkaStatsBuilder.SetLatencies(func(b *bytes.Buffer) { + b.Write(blob) + }) + } else { + kafkaStatsBuilder.SetFirstLatencySample(requestStat.FirstLatencySample) + } }) }) staticTags |= requestStat.StaticTags diff --git a/pkg/network/protocols/kafka/model_linux.go b/pkg/network/protocols/kafka/model_linux.go index 2eb286de297a5..07b2493651351 100644 --- a/pkg/network/protocols/kafka/model_linux.go +++ b/pkg/network/protocols/kafka/model_linux.go @@ -7,7 +7,10 @@ package kafka -import "github.com/DataDog/datadog-agent/pkg/network/types" +import ( + "github.com/DataDog/datadog-agent/pkg/network/protocols" + "github.com/DataDog/datadog-agent/pkg/network/types" +) // ConnTuple returns the connection tuple for the transaction func (tx *EbpfTx) ConnTuple() types.ConnectionKey { @@ -40,3 +43,11 @@ func (tx *EbpfTx) RecordsCount() uint32 { func (tx *EbpfTx) ErrorCode() int8 { return tx.Transaction.Error_code } + +// RequestLatency returns the latency of the request in nanoseconds +func (tx *EbpfTx) RequestLatency() float64 { + if uint64(tx.Transaction.Request_started) == 0 || uint64(tx.Transaction.Response_last_seen) == 0 { + return 0 + } + return protocols.NSTimestampToFloat(tx.Transaction.Response_last_seen - tx.Transaction.Request_started) +} diff --git a/pkg/network/protocols/kafka/statkeeper.go b/pkg/network/protocols/kafka/statkeeper.go index 808115475a2bf..5df842bf42c16 100644 --- a/pkg/network/protocols/kafka/statkeeper.go +++ b/pkg/network/protocols/kafka/statkeeper.go @@ -56,7 +56,14 @@ func (statKeeper *StatKeeper) Process(tx *EbpfTx) { requestStats = NewRequestStats() statKeeper.stats[key] = requestStats } - requestStats.AddRequest(int32(tx.ErrorCode()), int(tx.RecordsCount()), uint64(tx.Transaction.Tags)) + + latency := tx.RequestLatency() + if latency <= 0 { + statKeeper.telemetry.invalidLatency.Add(1) + return + } + + requestStats.AddRequest(int32(tx.ErrorCode()), int(tx.RecordsCount()), uint64(tx.Transaction.Tags), latency) } // GetAndResetAllStats returns all the stats and resets the stats diff --git a/pkg/network/protocols/kafka/stats.go b/pkg/network/protocols/kafka/stats.go index cdd461ad20732..e8f73b2340349 100644 --- a/pkg/network/protocols/kafka/stats.go +++ b/pkg/network/protocols/kafka/stats.go @@ -8,6 +8,8 @@ package kafka import ( "github.com/DataDog/datadog-agent/pkg/network/types" "github.com/DataDog/datadog-agent/pkg/process/util" + "github.com/DataDog/datadog-agent/pkg/util/log" + "github.com/DataDog/sketches-go/ddsketch" ) const ( @@ -16,6 +18,11 @@ const ( // FetchAPIKey is the API key for fetch requests FetchAPIKey = 1 + + // RelativeAccuracy defines the acceptable error in quantile values calculated by DDSketch. + // For example, if the actual value at p50 is 100, with a relative accuracy of 0.01 the value calculated + // will be between 99 and 101 + RelativeAccuracy = 0.01 ) // Key is an identifier for a group of Kafka transactions @@ -54,8 +61,26 @@ func NewRequestStats() *RequestStats { // RequestStat stores stats for Kafka requests to a particular key type RequestStat struct { - Count int - StaticTags uint64 + // this field order is intentional to help the GC pointer tracking + Latencies *ddsketch.DDSketch + // Note: every time we add a latency value to the DDSketch, it's possible for the sketch to discard that value + // (ie if it is outside the range that is tracked by the sketch). For that reason, in order to keep an accurate count + // the number of http transactions processed, we have our own count field (rather than relying on DDSketch.GetCount()) + Count int + // This field holds the value (in nanoseconds) of the first HTTP request + // in this bucket. We do this as optimization to avoid creating sketches with + // a single value. This is quite common in the context of HTTP requests without + // keep-alives where a short-lived TCP connection is used for a single request. + FirstLatencySample float64 + StaticTags uint64 +} + +func (r *RequestStat) initSketch() (err error) { + r.Latencies, err = ddsketch.NewDefaultDDSketch(RelativeAccuracy) + if err != nil { + log.Debugf("error recording kafka transaction latency: could not create new ddsketch: %v", err) + } + return } // CombineWith merges the data in 2 RequestStats objects @@ -66,18 +91,65 @@ func (r *RequestStats) CombineWith(newStats *RequestStats) { // Nothing to do in this case continue } - r.AddRequest(statusCode, newRequests.Count, newRequests.StaticTags) + + if newRequests.Latencies == nil { + // In this case, newRequests must have only FirstLatencySample, so use it when adding the request + r.AddRequest(statusCode, newRequests.Count, newRequests.StaticTags, newRequests.FirstLatencySample) + continue + } + + stats, exists := r.ErrorCodeToStat[statusCode] + if !exists { + stats = &RequestStat{} + r.ErrorCodeToStat[statusCode] = stats + } + // The other bucket (newStats) has a DDSketch object + // We first ensure that the bucket we're merging to have a DDSketch object + if stats.Latencies == nil { + stats.Latencies = newRequests.Latencies.Copy() + + // If we have a latency sample in this bucket we now add it to the DDSketch + if stats.FirstLatencySample != 0 { + err := stats.Latencies.Add(stats.FirstLatencySample) + if err != nil { + log.Debugf("could not add kafka request latency to ddsketch: %v", err) + } + } + } else { + err := stats.Latencies.MergeWith(newRequests.Latencies) + if err != nil { + log.Debugf("error merging kafka transactions: %v", err) + } + } + stats.Count += newRequests.Count } } // AddRequest takes information about a Kafka transaction and adds it to the request stats -func (r *RequestStats) AddRequest(errorCode int32, count int, staticTags uint64) { +func (r *RequestStats) AddRequest(errorCode int32, count int, staticTags uint64, latency float64) { if !isValidKafkaErrorCode(errorCode) { return } if stats, exists := r.ErrorCodeToStat[errorCode]; exists { stats.Count += count stats.StaticTags |= staticTags + if stats.FirstLatencySample == 0 { + stats.FirstLatencySample = latency + } else { + if stats.Latencies == nil { + if err := stats.initSketch(); err != nil { + return + } + + // Add the deferred latency sample + if err := stats.Latencies.Add(stats.FirstLatencySample); err != nil { + log.Debugf("could not add request latency to ddsketch: %v", err) + } + } + if err := stats.Latencies.Add(latency); err != nil { + log.Debugf("could not add request latency to ddsketch: %v", err) + } + } } else { r.ErrorCodeToStat[errorCode] = &RequestStat{ Count: count, diff --git a/pkg/network/protocols/kafka/telemetry.go b/pkg/network/protocols/kafka/telemetry.go index 6b7edee238493..aee8780b9a47f 100644 --- a/pkg/network/protocols/kafka/telemetry.go +++ b/pkg/network/protocols/kafka/telemetry.go @@ -18,6 +18,8 @@ type Telemetry struct { produceHits, fetchHits *apiVersionCounter dropped *libtelemetry.Counter // this happens when KafkaStatKeeper reaches capacity + + invalidLatency *libtelemetry.Counter } // NewTelemetry creates a new Telemetry @@ -25,10 +27,11 @@ func NewTelemetry() *Telemetry { metricGroup := libtelemetry.NewMetricGroup("usm.kafka") return &Telemetry{ - metricGroup: metricGroup, - produceHits: newAPIVersionCounter(metricGroup, "total_hits", "operation:produce", libtelemetry.OptStatsd), - fetchHits: newAPIVersionCounter(metricGroup, "total_hits", "operation:fetch", libtelemetry.OptStatsd), - dropped: metricGroup.NewCounter("dropped", libtelemetry.OptStatsd), + metricGroup: metricGroup, + produceHits: newAPIVersionCounter(metricGroup, "total_hits", "operation:produce", libtelemetry.OptStatsd), + fetchHits: newAPIVersionCounter(metricGroup, "total_hits", "operation:fetch", libtelemetry.OptStatsd), + dropped: metricGroup.NewCounter("dropped", libtelemetry.OptStatsd), + invalidLatency: metricGroup.NewCounter("malformed", "type:invalid-latency", libtelemetry.OptStatsd), } } From 30b5e960fdc92a56592e86444a9f3e39eee714be Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Wed, 17 Jul 2024 17:17:26 +0300 Subject: [PATCH 03/17] Fixed TestKafkaSerializationWithLocalhostTraffic --- pkg/network/encoding/encoding_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/network/encoding/encoding_test.go b/pkg/network/encoding/encoding_test.go index 1ca0970cbd11a..1dc5e9a83760d 100644 --- a/pkg/network/encoding/encoding_test.go +++ b/pkg/network/encoding/encoding_test.go @@ -984,7 +984,7 @@ func TestKafkaSerializationWithLocalhostTraffic(t *testing.T) { }, Kafka: map[kafka.Key]*kafka.RequestStats{ kafkaKey: { - ErrorCodeToStat: map[int32]*kafka.RequestStat{0: {Count: 10}}, + ErrorCodeToStat: map[int32]*kafka.RequestStat{0: {Count: 10, FirstLatencySample: 5}}, }, }, } @@ -998,7 +998,7 @@ func TestKafkaSerializationWithLocalhostTraffic(t *testing.T) { }, Topic: topicName, StatsByErrorCode: map[int32]*model.KafkaStats{ - 0: {Count: 10}, + 0: {Count: 10, FirstLatencySample: 5}, }, }, }, From 96dcb66b440a75a2ed29c78bccf4db22f222ee8c Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Thu, 18 Jul 2024 11:44:42 +0300 Subject: [PATCH 04/17] Added a check for fetch api key as produce don't have a latency ATM --- pkg/network/protocols/kafka/statkeeper.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/network/protocols/kafka/statkeeper.go b/pkg/network/protocols/kafka/statkeeper.go index 5df842bf42c16..ccd7d0e6416c4 100644 --- a/pkg/network/protocols/kafka/statkeeper.go +++ b/pkg/network/protocols/kafka/statkeeper.go @@ -58,7 +58,8 @@ func (statKeeper *StatKeeper) Process(tx *EbpfTx) { } latency := tx.RequestLatency() - if latency <= 0 { + // Currently, we only support measuring latency for fetch operations + if key.RequestAPIKey == FetchAPIKey && latency <= 0 { statKeeper.telemetry.invalidLatency.Add(1) return } From 71c872fd0b40eff1595893b9fa2666a1bc1809b1 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Thu, 18 Jul 2024 13:39:28 +0300 Subject: [PATCH 05/17] Fixed Kafka AddRequest function --- pkg/network/protocols/kafka/stats.go | 44 +++++++++++++-------------- pkg/network/usm/kafka_monitor_test.go | 1 + 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/pkg/network/protocols/kafka/stats.go b/pkg/network/protocols/kafka/stats.go index e8f73b2340349..d8068045bb710 100644 --- a/pkg/network/protocols/kafka/stats.go +++ b/pkg/network/protocols/kafka/stats.go @@ -130,32 +130,32 @@ func (r *RequestStats) AddRequest(errorCode int32, count int, staticTags uint64, if !isValidKafkaErrorCode(errorCode) { return } - if stats, exists := r.ErrorCodeToStat[errorCode]; exists { - stats.Count += count - stats.StaticTags |= staticTags - if stats.FirstLatencySample == 0 { - stats.FirstLatencySample = latency - } else { - if stats.Latencies == nil { - if err := stats.initSketch(); err != nil { - return - } + stats, exists := r.ErrorCodeToStat[errorCode] + if !exists { + stats = &RequestStat{} + r.ErrorCodeToStat[errorCode] = stats + } + stats.Count += count + stats.StaticTags |= staticTags - // Add the deferred latency sample - if err := stats.Latencies.Add(stats.FirstLatencySample); err != nil { - log.Debugf("could not add request latency to ddsketch: %v", err) - } - } - if err := stats.Latencies.Add(latency); err != nil { - log.Debugf("could not add request latency to ddsketch: %v", err) - } + if stats.FirstLatencySample == 0 { + stats.FirstLatencySample = latency + return + } + + if stats.Latencies == nil { + if err := stats.initSketch(); err != nil { + return } - } else { - r.ErrorCodeToStat[errorCode] = &RequestStat{ - Count: count, - StaticTags: staticTags, + + // Add the deferred latency sample + if err := stats.Latencies.Add(stats.FirstLatencySample); err != nil { + log.Debugf("could not add request latency to ddsketch: %v", err) } } + if err := stats.Latencies.Add(latency); err != nil { + log.Debugf("could not add request latency to ddsketch: %v", err) + } } func isValidKafkaErrorCode(errorCode int32) bool { diff --git a/pkg/network/usm/kafka_monitor_test.go b/pkg/network/usm/kafka_monitor_test.go index d4d76e7fc20b2..a3dd17da8b240 100644 --- a/pkg/network/usm/kafka_monitor_test.go +++ b/pkg/network/usm/kafka_monitor_test.go @@ -1493,6 +1493,7 @@ func validateProduceFetchCount(t *assert.CollectT, kafkaStats map[kafka.Key]*kaf numberOfProduceRequests += kafkaStat.ErrorCodeToStat[errorCode].Count case kafka.FetchAPIKey: assert.Equal(t, uint16(validation.expectedAPIVersionFetch), kafkaKey.RequestVersion) + assert.Greater(t, kafkaStat.ErrorCodeToStat[errorCode].FirstLatencySample, float64(1)) numberOfFetchRequests += kafkaStat.ErrorCodeToStat[errorCode].Count default: assert.FailNow(t, "Expecting only produce or fetch kafka requests") From 69ef394e70377331213a691473d934233f54243d Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Thu, 18 Jul 2024 13:53:55 +0300 Subject: [PATCH 06/17] Increased KAFKA_BATCH_SIZE from 24 to 25 --- pkg/network/ebpf/c/protocols/kafka/defs.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/ebpf/c/protocols/kafka/defs.h b/pkg/network/ebpf/c/protocols/kafka/defs.h index b72cb61f60b90..2dba86ead4d8a 100644 --- a/pkg/network/ebpf/c/protocols/kafka/defs.h +++ b/pkg/network/ebpf/c/protocols/kafka/defs.h @@ -43,7 +43,7 @@ #define KAFKA_MAX_ABORTED_TRANSACTIONS 10000 // This controls the number of Kafka transactions read from userspace at a time -#define KAFKA_BATCH_SIZE 24 +#define KAFKA_BATCH_SIZE 25 // The amount of buckets we have for the kafka topic name length telemetry. #define KAFKA_TELEMETRY_TOPIC_NAME_NUM_OF_BUCKETS 10 From 3c0706aed6455468bc792cf2b754f547a20cf54c Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Thu, 18 Jul 2024 15:17:24 +0300 Subject: [PATCH 07/17] Added Kafka latency to debug endpoint --- pkg/network/protocols/common.go | 11 +++++++++++ pkg/network/protocols/http/debugging/debugging.go | 14 ++------------ pkg/network/protocols/kafka/debugging/debugging.go | 9 +++++++-- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/pkg/network/protocols/common.go b/pkg/network/protocols/common.go index 0df95e665b472..ff0fb165ba67b 100644 --- a/pkg/network/protocols/common.go +++ b/pkg/network/protocols/common.go @@ -8,6 +8,8 @@ package protocols import ( "math" + + "github.com/DataDog/sketches-go/ddsketch" ) // below is copied from pkg/trace/stats/statsraw.go @@ -23,3 +25,12 @@ func NSTimestampToFloat(ns uint64) float64 { b &= 0xfffff80000000000 return math.Float64frombits(b) } + +func GetSketchQuantile(sketch *ddsketch.DDSketch, percentile float64) float64 { + if sketch == nil { + return 0.0 + } + + val, _ := sketch.GetValueAtQuantile(percentile) + return val +} diff --git a/pkg/network/protocols/http/debugging/debugging.go b/pkg/network/protocols/http/debugging/debugging.go index 6c45a5898aba6..361bfecd4c099 100644 --- a/pkg/network/protocols/http/debugging/debugging.go +++ b/pkg/network/protocols/http/debugging/debugging.go @@ -7,9 +7,8 @@ package debugging import ( - "github.com/DataDog/sketches-go/ddsketch" - "github.com/DataDog/datadog-agent/pkg/network/dns" + "github.com/DataDog/datadog-agent/pkg/network/protocols" "github.com/DataDog/datadog-agent/pkg/network/protocols/http" "github.com/DataDog/datadog-agent/pkg/process/util" ) @@ -69,7 +68,7 @@ func HTTP(stats map[http.Key]*http.RequestStats, dns map[util.Address][]dns.Host debug.ByStatus[status] = Stats{ Count: stat.Count, FirstLatencySample: stat.FirstLatencySample, - LatencyP50: getSketchQuantile(stat.Latencies, 0.5), + LatencyP50: protocols.GetSketchQuantile(stat.Latencies, 0.5), } } @@ -96,12 +95,3 @@ func getDNS(dnsData map[util.Address][]dns.Hostname, addr util.Address) string { return "" } - -func getSketchQuantile(sketch *ddsketch.DDSketch, percentile float64) float64 { - if sketch == nil { - return 0.0 - } - - val, _ := sketch.GetValueAtQuantile(percentile) - return val -} diff --git a/pkg/network/protocols/kafka/debugging/debugging.go b/pkg/network/protocols/kafka/debugging/debugging.go index 2eb61fe458016..a1f7b9cf9544d 100644 --- a/pkg/network/protocols/kafka/debugging/debugging.go +++ b/pkg/network/protocols/kafka/debugging/debugging.go @@ -7,6 +7,7 @@ package debugging import ( + "github.com/DataDog/datadog-agent/pkg/network/protocols" "github.com/DataDog/datadog-agent/pkg/network/protocols/kafka" "github.com/DataDog/datadog-agent/pkg/process/util" ) @@ -29,7 +30,9 @@ type Address struct { // Stats consolidates request count and latency information for a certain status code type Stats struct { - Count int + Count int + FirstLatencySample float64 + LatencyP50 float64 } // Kafka returns a debug-friendly representation of map[kafka.Key]kafka.RequestStats @@ -65,7 +68,9 @@ func Kafka(stats map[kafka.Key]*kafka.RequestStats) []RequestSummary { for status, stat := range requestStat.ErrorCodeToStat { debug.ByStatus[int8(status)] = Stats{ - Count: stat.Count, + Count: stat.Count, + FirstLatencySample: stat.FirstLatencySample, + LatencyP50: protocols.GetSketchQuantile(stat.Latencies, 0.5), } } From 0f92b67738f30578fc14f68f26c1ab1eaaacd8c9 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Thu, 18 Jul 2024 15:56:52 +0300 Subject: [PATCH 08/17] Added comment for GetSketchQuantile --- pkg/network/protocols/common.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/network/protocols/common.go b/pkg/network/protocols/common.go index ff0fb165ba67b..17ff91bb6b6c3 100644 --- a/pkg/network/protocols/common.go +++ b/pkg/network/protocols/common.go @@ -26,6 +26,7 @@ func NSTimestampToFloat(ns uint64) float64 { return math.Float64frombits(b) } +// GetSketchQuantile returns the value at the given percentile in the sketch func GetSketchQuantile(sketch *ddsketch.DDSketch, percentile float64) float64 { if sketch == nil { return 0.0 From 6e220ebf9c3f48c675ae7ceda420a8c99033d9d2 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Thu, 18 Jul 2024 16:17:48 +0300 Subject: [PATCH 09/17] Added `TestProcessKafkaTransactions` --- .../protocols/kafka/statkeeper_test.go | 83 +++++++++++++++++++ pkg/network/protocols/kafka/types.go | 1 + pkg/network/protocols/kafka/types_linux.go | 1 + 3 files changed, 85 insertions(+) diff --git a/pkg/network/protocols/kafka/statkeeper_test.go b/pkg/network/protocols/kafka/statkeeper_test.go index af9a0bc91bb55..ce245c289428f 100644 --- a/pkg/network/protocols/kafka/statkeeper_test.go +++ b/pkg/network/protocols/kafka/statkeeper_test.go @@ -8,10 +8,16 @@ package kafka import ( + "encoding/binary" + "strconv" "strings" "testing" + "time" "github.com/DataDog/datadog-agent/pkg/network/config" + "github.com/DataDog/datadog-agent/pkg/process/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func BenchmarkStatKeeperSameTX(b *testing.B) { @@ -68,3 +74,80 @@ func TestStatKeeper_extractTopicName(t *testing.T) { }) } } + +func TestProcessKafkaTransactions(t *testing.T) { + cfg := &config.Config{MaxKafkaStatsBuffered: 1000} + tel := NewTelemetry() + sk := NewStatkeeper(cfg, tel) + + srcString := "1.1.1.1" + dstString := "2.2.2.2" + sourceIP := util.AddressFromString(srcString) + sourcePort := 1234 + destIP := util.AddressFromString(dstString) + destPort := 9092 + + const numOfTopics = 10 + topicNamePrefix := "test-topic" + for i := 0; i < numOfTopics; i++ { + topicName := topicNamePrefix + strconv.Itoa(i) + + for j := 0; j < 10; j++ { + errorCode := j % 5 + latency := time.Duration(j%5+1) * time.Millisecond + tx := generateKafkaTransaction(sourceIP, destIP, sourcePort, destPort, topicName, int8(errorCode), uint32(10), latency) + sk.Process(tx) + } + } + + stats := sk.GetAndResetAllStats() + assert.Equal(t, 0, len(sk.stats)) + assert.Equal(t, numOfTopics, len(stats)) + for key, stats := range stats { + assert.Equal(t, topicNamePrefix, key.TopicName[:len(topicNamePrefix)]) + for i := 0; i < 5; i++ { + s := stats.ErrorCodeToStat[int32(i)] + require.NotNil(t, s) + assert.Equal(t, 20, s.Count) + assert.Equal(t, 2.0, s.Latencies.GetCount()) + + p50, err := s.Latencies.GetValueAtQuantile(0.5) + assert.Nil(t, err) + + expectedLatency := float64(time.Duration(i+1) * time.Millisecond) + acceptableError := expectedLatency * s.Latencies.IndexMapping.RelativeAccuracy() + assert.GreaterOrEqual(t, p50, expectedLatency-acceptableError) + assert.LessOrEqual(t, p50, expectedLatency+acceptableError) + } + } +} + +func generateKafkaTransaction(source util.Address, dest util.Address, sourcePort int, destPort int, topicName string, errorCode int8, recordsCount uint32, latency time.Duration) *EbpfTx { + var event EbpfTx + + latencyNS := uint64(latency) + event.Transaction.Request_started = 1 + event.Transaction.Request_api_key = FetchAPIKey + event.Transaction.Request_api_version = 7 + event.Transaction.Response_last_seen = event.Transaction.Request_started + latencyNS + event.Transaction.Error_code = errorCode + event.Transaction.Records_count = recordsCount + event.Transaction.Topic_name_size = uint8(len(topicName)) + event.Transaction.Topic_name = topicNameFromString([]byte(topicName)) + event.Tup.Saddr_l = uint64(binary.LittleEndian.Uint32(source.Bytes())) + event.Tup.Sport = uint16(sourcePort) + event.Tup.Daddr_l = uint64(binary.LittleEndian.Uint32(dest.Bytes())) + event.Tup.Dport = uint16(destPort) + event.Tup.Metadata = 1 + + return &event +} + +func topicNameFromString(fragment []byte) [TopicNameMaxSize]byte { + if len(fragment) >= TopicNameMaxSize { + return *(*[TopicNameMaxSize]byte)(fragment) + } + var b [TopicNameMaxSize]byte + copy(b[:], fragment) + return b +} diff --git a/pkg/network/protocols/kafka/types.go b/pkg/network/protocols/kafka/types.go index 9cd2407ce3ce1..e071dd1be5dc3 100644 --- a/pkg/network/protocols/kafka/types.go +++ b/pkg/network/protocols/kafka/types.go @@ -15,6 +15,7 @@ import "C" const ( TopicNameBuckets = C.KAFKA_TELEMETRY_TOPIC_NAME_NUM_OF_BUCKETS + TopicNameMaxSize = C.TOPIC_NAME_MAX_STRING_SIZE ) type ConnTuple C.conn_tuple_t diff --git a/pkg/network/protocols/kafka/types_linux.go b/pkg/network/protocols/kafka/types_linux.go index c6c00ffc21d4a..cf33022e3dd2f 100644 --- a/pkg/network/protocols/kafka/types_linux.go +++ b/pkg/network/protocols/kafka/types_linux.go @@ -5,6 +5,7 @@ package kafka const ( TopicNameBuckets = 0xa + TopicNameMaxSize = 0x50 ) type ConnTuple struct { From 500c4f896f37161c114dd71d6942ead074e93a36 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Thu, 18 Jul 2024 19:18:00 +0300 Subject: [PATCH 10/17] Using AddWithCount when adding to Kafka sketch, to count the exact number of hits in the sketch --- pkg/network/protocols/kafka/stats.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/network/protocols/kafka/stats.go b/pkg/network/protocols/kafka/stats.go index d8068045bb710..762306ee92259 100644 --- a/pkg/network/protocols/kafka/stats.go +++ b/pkg/network/protocols/kafka/stats.go @@ -110,7 +110,7 @@ func (r *RequestStats) CombineWith(newStats *RequestStats) { // If we have a latency sample in this bucket we now add it to the DDSketch if stats.FirstLatencySample != 0 { - err := stats.Latencies.Add(stats.FirstLatencySample) + err := stats.Latencies.AddWithCount(stats.FirstLatencySample, float64(stats.Count)) if err != nil { log.Debugf("could not add kafka request latency to ddsketch: %v", err) } @@ -149,11 +149,11 @@ func (r *RequestStats) AddRequest(errorCode int32, count int, staticTags uint64, } // Add the deferred latency sample - if err := stats.Latencies.Add(stats.FirstLatencySample); err != nil { + if err := stats.Latencies.AddWithCount(stats.FirstLatencySample, float64(stats.Count)); err != nil { log.Debugf("could not add request latency to ddsketch: %v", err) } } - if err := stats.Latencies.Add(latency); err != nil { + if err := stats.Latencies.AddWithCount(latency, float64(count)); err != nil { log.Debugf("could not add request latency to ddsketch: %v", err) } } From 4f1e970cc21370b9795cef935a76945f6d7b4ef7 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Thu, 18 Jul 2024 19:47:06 +0300 Subject: [PATCH 11/17] Added kafka/stats_test.go and fixed a bug in kafka/stats.go --- pkg/network/protocols/kafka/stats.go | 3 +- pkg/network/protocols/kafka/stats_test.go | 85 +++++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 pkg/network/protocols/kafka/stats_test.go diff --git a/pkg/network/protocols/kafka/stats.go b/pkg/network/protocols/kafka/stats.go index 762306ee92259..5b8507b805884 100644 --- a/pkg/network/protocols/kafka/stats.go +++ b/pkg/network/protocols/kafka/stats.go @@ -135,6 +135,7 @@ func (r *RequestStats) AddRequest(errorCode int32, count int, staticTags uint64, stats = &RequestStat{} r.ErrorCodeToStat[errorCode] = stats } + originalCount := stats.Count stats.Count += count stats.StaticTags |= staticTags @@ -149,7 +150,7 @@ func (r *RequestStats) AddRequest(errorCode int32, count int, staticTags uint64, } // Add the deferred latency sample - if err := stats.Latencies.AddWithCount(stats.FirstLatencySample, float64(stats.Count)); err != nil { + if err := stats.Latencies.AddWithCount(stats.FirstLatencySample, float64(originalCount)); err != nil { log.Debugf("could not add request latency to ddsketch: %v", err) } } diff --git a/pkg/network/protocols/kafka/stats_test.go b/pkg/network/protocols/kafka/stats_test.go new file mode 100644 index 0000000000000..7abe7e07755a0 --- /dev/null +++ b/pkg/network/protocols/kafka/stats_test.go @@ -0,0 +1,85 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build linux_bpf + +package kafka + +import ( + "github.com/DataDog/sketches-go/ddsketch" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAddRequest(t *testing.T) { + testErrorCode := int32(5) + stats := NewRequestStats() + stats.AddRequest(testErrorCode, 10, 1, 10.0) + stats.AddRequest(testErrorCode, 15, 2, 15.0) + stats.AddRequest(testErrorCode, 20, 3, 20.0) + + // Check we don't have stats for other error codes + for i := int32(-1); i < 119; i++ { + if i == testErrorCode { + continue + } + assert.Nil(t, stats.ErrorCodeToStat[i]) + } + s := stats.ErrorCodeToStat[testErrorCode] + + if assert.NotNil(t, s) { + assert.Equal(t, 45, s.Count) + assert.Equal(t, float64(45), s.Latencies.GetCount()) + + verifyQuantile(t, s.Latencies, 0.0, 10.0) // min item + verifyQuantile(t, s.Latencies, 0.5, 15.0) // median + verifyQuantile(t, s.Latencies, 1.0, 20.0) // max item + } +} + +func TestCombineWith(t *testing.T) { + testErrorCode := int32(5) + + stats := NewRequestStats() + stats2 := NewRequestStats() + stats3 := NewRequestStats() + stats4 := NewRequestStats() + + stats2.AddRequest(testErrorCode, 10, 1, 10.0) + stats3.AddRequest(testErrorCode, 15, 2, 15.0) + stats4.AddRequest(testErrorCode, 20, 3, 20.0) + + stats.CombineWith(stats2) + stats.CombineWith(stats3) + stats.CombineWith(stats4) + + // Check we don't have stats for other error codes + for i := int32(-1); i < 119; i++ { + if i == testErrorCode { + continue + } + assert.Nil(t, stats.ErrorCodeToStat[i]) + } + s := stats.ErrorCodeToStat[testErrorCode] + + if assert.NotNil(t, s) { + assert.Equal(t, 45, s.Count) + assert.Equal(t, float64(45), s.Latencies.GetCount()) + + verifyQuantile(t, s.Latencies, 0.0, 10.0) // min item + verifyQuantile(t, s.Latencies, 0.5, 15.0) // median + verifyQuantile(t, s.Latencies, 1.0, 20.0) // max item + } +} + +func verifyQuantile(t *testing.T, sketch *ddsketch.DDSketch, q float64, expectedValue float64) { + val, err := sketch.GetValueAtQuantile(q) + assert.Nil(t, err) + + acceptableError := expectedValue * sketch.IndexMapping.RelativeAccuracy() + assert.GreaterOrEqual(t, val, expectedValue-acceptableError) + assert.LessOrEqual(t, val, expectedValue+acceptableError) +} From e4bc65836ac02c715883fb2f3a2018c483e281a5 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Thu, 18 Jul 2024 19:49:56 +0300 Subject: [PATCH 12/17] Fixed import errors --- pkg/network/protocols/kafka/statkeeper_test.go | 5 +++-- pkg/network/protocols/kafka/stats_test.go | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/network/protocols/kafka/statkeeper_test.go b/pkg/network/protocols/kafka/statkeeper_test.go index ce245c289428f..03895047b20ec 100644 --- a/pkg/network/protocols/kafka/statkeeper_test.go +++ b/pkg/network/protocols/kafka/statkeeper_test.go @@ -14,10 +14,11 @@ import ( "testing" "time" - "github.com/DataDog/datadog-agent/pkg/network/config" - "github.com/DataDog/datadog-agent/pkg/process/util" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/pkg/network/config" + "github.com/DataDog/datadog-agent/pkg/process/util" ) func BenchmarkStatKeeperSameTX(b *testing.B) { diff --git a/pkg/network/protocols/kafka/stats_test.go b/pkg/network/protocols/kafka/stats_test.go index 7abe7e07755a0..df7c96dce6cfc 100644 --- a/pkg/network/protocols/kafka/stats_test.go +++ b/pkg/network/protocols/kafka/stats_test.go @@ -8,10 +8,11 @@ package kafka import ( - "github.com/DataDog/sketches-go/ddsketch" "testing" "github.com/stretchr/testify/assert" + + "github.com/DataDog/sketches-go/ddsketch" ) func TestAddRequest(t *testing.T) { From 340a00c06f0ab9b9c24503323d1db02c5958042c Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Sun, 21 Jul 2024 14:23:44 +0300 Subject: [PATCH 13/17] Fixed TestProcessKafkaTransactions --- pkg/network/protocols/kafka/statkeeper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/protocols/kafka/statkeeper_test.go b/pkg/network/protocols/kafka/statkeeper_test.go index 03895047b20ec..dc99ff01902d3 100644 --- a/pkg/network/protocols/kafka/statkeeper_test.go +++ b/pkg/network/protocols/kafka/statkeeper_test.go @@ -110,7 +110,7 @@ func TestProcessKafkaTransactions(t *testing.T) { s := stats.ErrorCodeToStat[int32(i)] require.NotNil(t, s) assert.Equal(t, 20, s.Count) - assert.Equal(t, 2.0, s.Latencies.GetCount()) + assert.Equal(t, 20.0, s.Latencies.GetCount()) p50, err := s.Latencies.GetValueAtQuantile(0.5) assert.Nil(t, err) From 3b42b3a8e07df35a54053c9eb38ff6bed291ac36 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Tue, 30 Jul 2024 17:25:24 +0300 Subject: [PATCH 14/17] Fixed typo --- pkg/network/protocols/kafka/stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/network/protocols/kafka/stats.go b/pkg/network/protocols/kafka/stats.go index 5b8507b805884..79ccfcadc5837 100644 --- a/pkg/network/protocols/kafka/stats.go +++ b/pkg/network/protocols/kafka/stats.go @@ -65,7 +65,7 @@ type RequestStat struct { Latencies *ddsketch.DDSketch // Note: every time we add a latency value to the DDSketch, it's possible for the sketch to discard that value // (ie if it is outside the range that is tracked by the sketch). For that reason, in order to keep an accurate count - // the number of http transactions processed, we have our own count field (rather than relying on DDSketch.GetCount()) + // the number of kafka transactions processed, we have our own count field (rather than relying on DDSketch.GetCount()) Count int // This field holds the value (in nanoseconds) of the first HTTP request // in this bucket. We do this as optimization to avoid creating sketches with From 999bb9161a5caf900bb1b5d44d10594be872f0a6 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Sun, 4 Aug 2024 15:47:41 +0300 Subject: [PATCH 15/17] Removed FirstLatencySample for Kafka aggregation as it seems like an necessary optimization --- pkg/network/encoding/marshal/usm_kafka.go | 15 ++++--- .../protocols/kafka/debugging/debugging.go | 5 +-- pkg/network/protocols/kafka/stats.go | 40 +++---------------- pkg/network/usm/kafka_monitor_test.go | 2 +- 4 files changed, 16 insertions(+), 46 deletions(-) diff --git a/pkg/network/encoding/marshal/usm_kafka.go b/pkg/network/encoding/marshal/usm_kafka.go index 96c20864162c0..a2d7338eef20c 100644 --- a/pkg/network/encoding/marshal/usm_kafka.go +++ b/pkg/network/encoding/marshal/usm_kafka.go @@ -73,15 +73,14 @@ func (e *kafkaEncoder) encodeData(connectionData *USMConnectionData[kafka.Key, * builder.AddStatsByErrorCode(func(statsByErrorCodeBuilder *model.KafkaAggregation_StatsByErrorCodeEntryBuilder) { statsByErrorCodeBuilder.SetKey(statusCode) statsByErrorCodeBuilder.SetValue(func(kafkaStatsBuilder *model.KafkaStatsBuilder) { - kafkaStatsBuilder.SetCount(uint32(requestStat.Count)) - if latencies := requestStat.Latencies; latencies != nil { - blob, _ := proto.Marshal(latencies.ToProto()) - kafkaStatsBuilder.SetLatencies(func(b *bytes.Buffer) { - b.Write(blob) - }) - } else { - kafkaStatsBuilder.SetFirstLatencySample(requestStat.FirstLatencySample) + if requestStat.Count <= 0 || requestStat.Latencies == nil { + return } + kafkaStatsBuilder.SetCount(uint32(requestStat.Count)) + blob, _ := proto.Marshal(requestStat.Latencies.ToProto()) + kafkaStatsBuilder.SetLatencies(func(b *bytes.Buffer) { + b.Write(blob) + }) }) }) staticTags |= requestStat.StaticTags diff --git a/pkg/network/protocols/kafka/debugging/debugging.go b/pkg/network/protocols/kafka/debugging/debugging.go index a1f7b9cf9544d..5224ec9b75b16 100644 --- a/pkg/network/protocols/kafka/debugging/debugging.go +++ b/pkg/network/protocols/kafka/debugging/debugging.go @@ -68,9 +68,8 @@ func Kafka(stats map[kafka.Key]*kafka.RequestStats) []RequestSummary { for status, stat := range requestStat.ErrorCodeToStat { debug.ByStatus[int8(status)] = Stats{ - Count: stat.Count, - FirstLatencySample: stat.FirstLatencySample, - LatencyP50: protocols.GetSketchQuantile(stat.Latencies, 0.5), + Count: stat.Count, + LatencyP50: protocols.GetSketchQuantile(stat.Latencies, 0.5), } } diff --git a/pkg/network/protocols/kafka/stats.go b/pkg/network/protocols/kafka/stats.go index 79ccfcadc5837..ed6e67a96d9ec 100644 --- a/pkg/network/protocols/kafka/stats.go +++ b/pkg/network/protocols/kafka/stats.go @@ -61,18 +61,15 @@ func NewRequestStats() *RequestStats { // RequestStat stores stats for Kafka requests to a particular key type RequestStat struct { + // Unlike HTTP, Kafka clients aren't using short-lived TCP connections for each request. + // Hence, we don't use the FirstLatencySample optimization here. // this field order is intentional to help the GC pointer tracking Latencies *ddsketch.DDSketch // Note: every time we add a latency value to the DDSketch, it's possible for the sketch to discard that value // (ie if it is outside the range that is tracked by the sketch). For that reason, in order to keep an accurate count // the number of kafka transactions processed, we have our own count field (rather than relying on DDSketch.GetCount()) - Count int - // This field holds the value (in nanoseconds) of the first HTTP request - // in this bucket. We do this as optimization to avoid creating sketches with - // a single value. This is quite common in the context of HTTP requests without - // keep-alives where a short-lived TCP connection is used for a single request. - FirstLatencySample float64 - StaticTags uint64 + Count int + StaticTags uint64 } func (r *RequestStat) initSketch() (err error) { @@ -87,17 +84,11 @@ func (r *RequestStat) initSketch() (err error) { // newStats is kept as it is, while the method receiver gets mutated func (r *RequestStats) CombineWith(newStats *RequestStats) { for statusCode, newRequests := range newStats.ErrorCodeToStat { - if newRequests.Count == 0 { + if newRequests.Count == 0 || newRequests.Latencies == nil { // Nothing to do in this case continue } - if newRequests.Latencies == nil { - // In this case, newRequests must have only FirstLatencySample, so use it when adding the request - r.AddRequest(statusCode, newRequests.Count, newRequests.StaticTags, newRequests.FirstLatencySample) - continue - } - stats, exists := r.ErrorCodeToStat[statusCode] if !exists { stats = &RequestStat{} @@ -107,14 +98,6 @@ func (r *RequestStats) CombineWith(newStats *RequestStats) { // We first ensure that the bucket we're merging to have a DDSketch object if stats.Latencies == nil { stats.Latencies = newRequests.Latencies.Copy() - - // If we have a latency sample in this bucket we now add it to the DDSketch - if stats.FirstLatencySample != 0 { - err := stats.Latencies.AddWithCount(stats.FirstLatencySample, float64(stats.Count)) - if err != nil { - log.Debugf("could not add kafka request latency to ddsketch: %v", err) - } - } } else { err := stats.Latencies.MergeWith(newRequests.Latencies) if err != nil { @@ -135,27 +118,16 @@ func (r *RequestStats) AddRequest(errorCode int32, count int, staticTags uint64, stats = &RequestStat{} r.ErrorCodeToStat[errorCode] = stats } - originalCount := stats.Count stats.Count += count stats.StaticTags |= staticTags - if stats.FirstLatencySample == 0 { - stats.FirstLatencySample = latency - return - } - if stats.Latencies == nil { if err := stats.initSketch(); err != nil { return } - - // Add the deferred latency sample - if err := stats.Latencies.AddWithCount(stats.FirstLatencySample, float64(originalCount)); err != nil { - log.Debugf("could not add request latency to ddsketch: %v", err) - } } if err := stats.Latencies.AddWithCount(latency, float64(count)); err != nil { - log.Debugf("could not add request latency to ddsketch: %v", err) + log.Errorf("could not add request latency to ddsketch: %v", err) } } diff --git a/pkg/network/usm/kafka_monitor_test.go b/pkg/network/usm/kafka_monitor_test.go index f659eb673a24c..ef90c10453f82 100644 --- a/pkg/network/usm/kafka_monitor_test.go +++ b/pkg/network/usm/kafka_monitor_test.go @@ -1524,7 +1524,7 @@ func validateProduceFetchCount(t *assert.CollectT, kafkaStats map[kafka.Key]*kaf numberOfProduceRequests += kafkaStat.ErrorCodeToStat[errorCode].Count case kafka.FetchAPIKey: assert.Equal(t, uint16(validation.expectedAPIVersionFetch), kafkaKey.RequestVersion) - assert.Greater(t, kafkaStat.ErrorCodeToStat[errorCode].FirstLatencySample, float64(1)) + assert.Greater(t, kafkaStat.ErrorCodeToStat[errorCode].Latencies.GetCount(), float64(1)) numberOfFetchRequests += kafkaStat.ErrorCodeToStat[errorCode].Count default: assert.FailNow(t, "Expecting only produce or fetch kafka requests") From 68c45bcec75a82c3fd29da767e90c45e4a67ab45 Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Sun, 4 Aug 2024 16:02:15 +0300 Subject: [PATCH 16/17] Fixed encoding_test.go --- pkg/network/encoding/encoding_test.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/network/encoding/encoding_test.go b/pkg/network/encoding/encoding_test.go index 1dc5e9a83760d..1375679abc353 100644 --- a/pkg/network/encoding/encoding_test.go +++ b/pkg/network/encoding/encoding_test.go @@ -30,6 +30,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/network/protocols/kafka" "github.com/DataDog/datadog-agent/pkg/network/protocols/telemetry" "github.com/DataDog/datadog-agent/pkg/process/util" + "github.com/DataDog/sketches-go/ddsketch" ) type connTag = uint64 @@ -41,6 +42,8 @@ const ( tagTLS connTag = 0x10 // network.ConnTagTLS ) +const RelativeAccuracy = 0.01 + func newConfig(t *testing.T) { originalConfig := config.SystemProbe t.Cleanup(func() { @@ -978,17 +981,24 @@ func TestKafkaSerializationWithLocalhostTraffic(t *testing.T) { apiVersion2, ) + latencies, err := ddsketch.NewDefaultDDSketch(RelativeAccuracy) + require.NoError(t, err) + require.NoError(t, latencies.AddWithCount(0.5, 10)) + in := &network.Connections{ BufferedData: network.BufferedData{ Conns: connections, }, Kafka: map[kafka.Key]*kafka.RequestStats{ kafkaKey: { - ErrorCodeToStat: map[int32]*kafka.RequestStat{0: {Count: 10, FirstLatencySample: 5}}, + ErrorCodeToStat: map[int32]*kafka.RequestStat{0: {Count: 10, Latencies: latencies}}, }, }, } + marshaledLatencies, err := proto.Marshal(latencies.ToProto()) + require.NoError(t, err) + kafkaOut := &model.DataStreamsAggregations{ KafkaAggregations: []*model.KafkaAggregation{ { @@ -998,7 +1008,7 @@ func TestKafkaSerializationWithLocalhostTraffic(t *testing.T) { }, Topic: topicName, StatsByErrorCode: map[int32]*model.KafkaStats{ - 0: {Count: 10, FirstLatencySample: 5}, + 0: {Count: 10, Latencies: marshaledLatencies}, }, }, }, From bff2969ef7d4aab179896016530cf9f7e25aca5c Mon Sep 17 00:00:00 2001 From: DanielLavie Date: Mon, 5 Aug 2024 14:49:52 +0300 Subject: [PATCH 17/17] Reverted the removal of the FirstLatencySample --- pkg/network/encoding/encoding_test.go | 14 +------ pkg/network/encoding/marshal/usm_kafka.go | 15 +++---- .../protocols/kafka/debugging/debugging.go | 5 ++- pkg/network/protocols/kafka/stats.go | 40 ++++++++++++++++--- pkg/network/usm/kafka_monitor_test.go | 2 +- 5 files changed, 48 insertions(+), 28 deletions(-) diff --git a/pkg/network/encoding/encoding_test.go b/pkg/network/encoding/encoding_test.go index e77c6ac94318f..4522e7acb03e6 100644 --- a/pkg/network/encoding/encoding_test.go +++ b/pkg/network/encoding/encoding_test.go @@ -30,7 +30,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/network/protocols/kafka" "github.com/DataDog/datadog-agent/pkg/network/protocols/telemetry" "github.com/DataDog/datadog-agent/pkg/process/util" - "github.com/DataDog/sketches-go/ddsketch" ) type connTag = uint64 @@ -42,8 +41,6 @@ const ( tagTLS connTag = 0x10 // network.ConnTagTLS ) -const RelativeAccuracy = 0.01 - func newConfig(t *testing.T) { originalConfig := config.SystemProbe t.Cleanup(func() { @@ -981,24 +978,17 @@ func TestKafkaSerializationWithLocalhostTraffic(t *testing.T) { apiVersion2, ) - latencies, err := ddsketch.NewDefaultDDSketch(RelativeAccuracy) - require.NoError(t, err) - require.NoError(t, latencies.AddWithCount(0.5, 10)) - in := &network.Connections{ BufferedData: network.BufferedData{ Conns: connections, }, Kafka: map[kafka.Key]*kafka.RequestStats{ kafkaKey: { - ErrorCodeToStat: map[int32]*kafka.RequestStat{0: {Count: 10, Latencies: latencies}}, + ErrorCodeToStat: map[int32]*kafka.RequestStat{0: {Count: 10, FirstLatencySample: 5}}, }, }, } - marshaledLatencies, err := proto.Marshal(latencies.ToProto()) - require.NoError(t, err) - kafkaOut := &model.DataStreamsAggregations{ KafkaAggregations: []*model.KafkaAggregation{ { @@ -1008,7 +998,7 @@ func TestKafkaSerializationWithLocalhostTraffic(t *testing.T) { }, Topic: topicName, StatsByErrorCode: map[int32]*model.KafkaStats{ - 0: {Count: 10, Latencies: marshaledLatencies}, + 0: {Count: 10, FirstLatencySample: 5}, }, }, }, diff --git a/pkg/network/encoding/marshal/usm_kafka.go b/pkg/network/encoding/marshal/usm_kafka.go index a2d7338eef20c..96c20864162c0 100644 --- a/pkg/network/encoding/marshal/usm_kafka.go +++ b/pkg/network/encoding/marshal/usm_kafka.go @@ -73,14 +73,15 @@ func (e *kafkaEncoder) encodeData(connectionData *USMConnectionData[kafka.Key, * builder.AddStatsByErrorCode(func(statsByErrorCodeBuilder *model.KafkaAggregation_StatsByErrorCodeEntryBuilder) { statsByErrorCodeBuilder.SetKey(statusCode) statsByErrorCodeBuilder.SetValue(func(kafkaStatsBuilder *model.KafkaStatsBuilder) { - if requestStat.Count <= 0 || requestStat.Latencies == nil { - return - } kafkaStatsBuilder.SetCount(uint32(requestStat.Count)) - blob, _ := proto.Marshal(requestStat.Latencies.ToProto()) - kafkaStatsBuilder.SetLatencies(func(b *bytes.Buffer) { - b.Write(blob) - }) + if latencies := requestStat.Latencies; latencies != nil { + blob, _ := proto.Marshal(latencies.ToProto()) + kafkaStatsBuilder.SetLatencies(func(b *bytes.Buffer) { + b.Write(blob) + }) + } else { + kafkaStatsBuilder.SetFirstLatencySample(requestStat.FirstLatencySample) + } }) }) staticTags |= requestStat.StaticTags diff --git a/pkg/network/protocols/kafka/debugging/debugging.go b/pkg/network/protocols/kafka/debugging/debugging.go index 5224ec9b75b16..a1f7b9cf9544d 100644 --- a/pkg/network/protocols/kafka/debugging/debugging.go +++ b/pkg/network/protocols/kafka/debugging/debugging.go @@ -68,8 +68,9 @@ func Kafka(stats map[kafka.Key]*kafka.RequestStats) []RequestSummary { for status, stat := range requestStat.ErrorCodeToStat { debug.ByStatus[int8(status)] = Stats{ - Count: stat.Count, - LatencyP50: protocols.GetSketchQuantile(stat.Latencies, 0.5), + Count: stat.Count, + FirstLatencySample: stat.FirstLatencySample, + LatencyP50: protocols.GetSketchQuantile(stat.Latencies, 0.5), } } diff --git a/pkg/network/protocols/kafka/stats.go b/pkg/network/protocols/kafka/stats.go index ed6e67a96d9ec..79ccfcadc5837 100644 --- a/pkg/network/protocols/kafka/stats.go +++ b/pkg/network/protocols/kafka/stats.go @@ -61,15 +61,18 @@ func NewRequestStats() *RequestStats { // RequestStat stores stats for Kafka requests to a particular key type RequestStat struct { - // Unlike HTTP, Kafka clients aren't using short-lived TCP connections for each request. - // Hence, we don't use the FirstLatencySample optimization here. // this field order is intentional to help the GC pointer tracking Latencies *ddsketch.DDSketch // Note: every time we add a latency value to the DDSketch, it's possible for the sketch to discard that value // (ie if it is outside the range that is tracked by the sketch). For that reason, in order to keep an accurate count // the number of kafka transactions processed, we have our own count field (rather than relying on DDSketch.GetCount()) - Count int - StaticTags uint64 + Count int + // This field holds the value (in nanoseconds) of the first HTTP request + // in this bucket. We do this as optimization to avoid creating sketches with + // a single value. This is quite common in the context of HTTP requests without + // keep-alives where a short-lived TCP connection is used for a single request. + FirstLatencySample float64 + StaticTags uint64 } func (r *RequestStat) initSketch() (err error) { @@ -84,11 +87,17 @@ func (r *RequestStat) initSketch() (err error) { // newStats is kept as it is, while the method receiver gets mutated func (r *RequestStats) CombineWith(newStats *RequestStats) { for statusCode, newRequests := range newStats.ErrorCodeToStat { - if newRequests.Count == 0 || newRequests.Latencies == nil { + if newRequests.Count == 0 { // Nothing to do in this case continue } + if newRequests.Latencies == nil { + // In this case, newRequests must have only FirstLatencySample, so use it when adding the request + r.AddRequest(statusCode, newRequests.Count, newRequests.StaticTags, newRequests.FirstLatencySample) + continue + } + stats, exists := r.ErrorCodeToStat[statusCode] if !exists { stats = &RequestStat{} @@ -98,6 +107,14 @@ func (r *RequestStats) CombineWith(newStats *RequestStats) { // We first ensure that the bucket we're merging to have a DDSketch object if stats.Latencies == nil { stats.Latencies = newRequests.Latencies.Copy() + + // If we have a latency sample in this bucket we now add it to the DDSketch + if stats.FirstLatencySample != 0 { + err := stats.Latencies.AddWithCount(stats.FirstLatencySample, float64(stats.Count)) + if err != nil { + log.Debugf("could not add kafka request latency to ddsketch: %v", err) + } + } } else { err := stats.Latencies.MergeWith(newRequests.Latencies) if err != nil { @@ -118,16 +135,27 @@ func (r *RequestStats) AddRequest(errorCode int32, count int, staticTags uint64, stats = &RequestStat{} r.ErrorCodeToStat[errorCode] = stats } + originalCount := stats.Count stats.Count += count stats.StaticTags |= staticTags + if stats.FirstLatencySample == 0 { + stats.FirstLatencySample = latency + return + } + if stats.Latencies == nil { if err := stats.initSketch(); err != nil { return } + + // Add the deferred latency sample + if err := stats.Latencies.AddWithCount(stats.FirstLatencySample, float64(originalCount)); err != nil { + log.Debugf("could not add request latency to ddsketch: %v", err) + } } if err := stats.Latencies.AddWithCount(latency, float64(count)); err != nil { - log.Errorf("could not add request latency to ddsketch: %v", err) + log.Debugf("could not add request latency to ddsketch: %v", err) } } diff --git a/pkg/network/usm/kafka_monitor_test.go b/pkg/network/usm/kafka_monitor_test.go index db679738b57ac..d42ba08aab0b3 100644 --- a/pkg/network/usm/kafka_monitor_test.go +++ b/pkg/network/usm/kafka_monitor_test.go @@ -1524,7 +1524,7 @@ func validateProduceFetchCount(t *assert.CollectT, kafkaStats map[kafka.Key]*kaf numberOfProduceRequests += kafkaStat.ErrorCodeToStat[errorCode].Count case kafka.FetchAPIKey: assert.Equal(t, uint16(validation.expectedAPIVersionFetch), kafkaKey.RequestVersion) - assert.Greater(t, kafkaStat.ErrorCodeToStat[errorCode].Latencies.GetCount(), float64(1)) + assert.Greater(t, kafkaStat.ErrorCodeToStat[errorCode].FirstLatencySample, float64(1)) numberOfFetchRequests += kafkaStat.ErrorCodeToStat[errorCode].Count default: assert.FailNow(t, "Expecting only produce or fetch kafka requests")