USMON-830: Kafka fetch latency (#27620)
DanielLavie authored Aug 7, 2024
1 parent 667f925 commit e0baff3
Showing 17 changed files with 322 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pkg/network/ebpf/c/protocols/kafka/defs.h
@@ -43,7 +43,7 @@
 #define KAFKA_MAX_ABORTED_TRANSACTIONS 10000

 // This controls the number of Kafka transactions read from userspace at a time
-#define KAFKA_BATCH_SIZE 26
+#define KAFKA_BATCH_SIZE 25

 // The amount of buckets we have for the kafka topic name length telemetry.
 #define KAFKA_TELEMETRY_TOPIC_NAME_NUM_OF_BUCKETS 10
3 changes: 3 additions & 0 deletions pkg/network/ebpf/c/protocols/kafka/kafka-parsing.h
@@ -1390,6 +1390,7 @@ static __always_inline bool kafka_process_new_response(void *ctx, conn_tuple_t *
     kafka->response.state = KAFKA_FETCH_RESPONSE_START;
     kafka->response.carry_over_offset = offset - orig_offset;
     kafka->response.expected_tcp_seq = kafka_get_next_tcp_seq(skb_info);
+    kafka->response.transaction.response_last_seen = bpf_ktime_get_ns();

     // Copy it to the stack since the verifier on 4.14 complains otherwise.
     kafka_response_context_t response_ctx;
@@ -1404,6 +1405,7 @@ static __always_inline bool kafka_process_new_response(void *ctx, conn_tuple_t *
 static __always_inline bool kafka_process_response(void *ctx, conn_tuple_t *tup, kafka_info_t *kafka, pktbuf_t pkt, skb_info_t *skb_info) {
     kafka_response_context_t *response = bpf_map_lookup_elem(&kafka_response, tup);
     if (response) {
+        response->transaction.response_last_seen = bpf_ktime_get_ns();
         if (!skb_info || skb_info->tcp_seq == response->expected_tcp_seq) {
             response->expected_tcp_seq = kafka_get_next_tcp_seq(skb_info);
             kafka_call_response_parser(ctx, tup, pkt, response->state, response->transaction.request_api_version);
@@ -1471,6 +1473,7 @@ static __always_inline bool kafka_process(conn_tuple_t *tup, kafka_info_t *kafka
     }

     kafka_transaction->request_started = bpf_ktime_get_ns();
+    kafka_transaction->response_last_seen = 0;
     kafka_transaction->request_api_key = kafka_header.api_key;
     kafka_transaction->request_api_version = kafka_header.api_version;

1 change: 1 addition & 0 deletions pkg/network/ebpf/c/protocols/kafka/types.h
@@ -20,6 +20,7 @@ typedef struct {

 typedef struct kafka_transaction_t {
     __u64 request_started;
+    __u64 response_last_seen;
     __u32 records_count;
     // Request API key and version are 16-bit in the protocol but we store
     // them as u8 to reduce memory usage of the map since the APIs and
4 changes: 2 additions & 2 deletions pkg/network/encoding/encoding_test.go
@@ -984,7 +984,7 @@ func TestKafkaSerializationWithLocalhostTraffic(t *testing.T) {
         },
         Kafka: map[kafka.Key]*kafka.RequestStats{
             kafkaKey: {
-                ErrorCodeToStat: map[int32]*kafka.RequestStat{0: {Count: 10}},
+                ErrorCodeToStat: map[int32]*kafka.RequestStat{0: {Count: 10, FirstLatencySample: 5}},
             },
         },
     }
@@ -998,7 +998,7 @@ func TestKafkaSerializationWithLocalhostTraffic(t *testing.T) {
             },
             Topic: topicName,
             StatsByErrorCode: map[int32]*model.KafkaStats{
-                0: {Count: 10},
+                0: {Count: 10, FirstLatencySample: 5},
             },
         },
     },
12 changes: 11 additions & 1 deletion pkg/network/encoding/marshal/usm_kafka.go
@@ -9,6 +9,8 @@ import (
     "bytes"
     "io"

+    "github.com/gogo/protobuf/proto"
+
     model "github.com/DataDog/agent-payload/v5/process"

     "github.com/DataDog/datadog-agent/pkg/network"
@@ -69,9 +71,17 @@ func (e *kafkaEncoder) encodeData(connectionData *USMConnectionData[kafka.Key, *
                 continue
             }
             builder.AddStatsByErrorCode(func(statsByErrorCodeBuilder *model.KafkaAggregation_StatsByErrorCodeEntryBuilder) {
-                statsByErrorCodeBuilder.SetKey(int32(statusCode))
+                statsByErrorCodeBuilder.SetKey(statusCode)
                 statsByErrorCodeBuilder.SetValue(func(kafkaStatsBuilder *model.KafkaStatsBuilder) {
                     kafkaStatsBuilder.SetCount(uint32(requestStat.Count))
+                    if latencies := requestStat.Latencies; latencies != nil {
+                        blob, _ := proto.Marshal(latencies.ToProto())
+                        kafkaStatsBuilder.SetLatencies(func(b *bytes.Buffer) {
+                            b.Write(blob)
+                        })
+                    } else {
+                        kafkaStatsBuilder.SetFirstLatencySample(requestStat.FirstLatencySample)
+                    }
                 })
             })
             staticTags |= requestStat.StaticTags
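The encoder serializes the DDSketch straight into the payload as a protobuf blob, falling back to FirstLatencySample when no sketch was ever allocated. Below is a minimal round-trip sketch of how a reader could rebuild and query that blob; the decodeLatencies helper is ours, not part of the agent, and it assumes the blob is exactly what proto.Marshal(latencies.ToProto()) produced above.

package main

import (
    "fmt"
    "log"

    "github.com/DataDog/sketches-go/ddsketch"
    "github.com/DataDog/sketches-go/ddsketch/pb/sketchpb"
    "github.com/gogo/protobuf/proto"
)

// decodeLatencies reverses the encoder above: unmarshal the blob into the
// DDSketch proto and rebuild a queryable sketch. Helper name is ours.
func decodeLatencies(blob []byte) (*ddsketch.DDSketch, error) {
    var pb sketchpb.DDSketch
    if err := proto.Unmarshal(blob, &pb); err != nil {
        return nil, err
    }
    return ddsketch.FromProto(&pb)
}

func main() {
    // Build and marshal a sketch the same way encodeData does.
    sk, _ := ddsketch.NewDefaultDDSketch(0.01)
    _ = sk.Add(1_000_000) // one made-up 1ms sample, in ns
    blob, err := proto.Marshal(sk.ToProto())
    if err != nil {
        log.Fatal(err)
    }

    decoded, err := decodeLatencies(blob)
    if err != nil {
        log.Fatal(err)
    }
    p50, _ := decoded.GetValueAtQuantile(0.5)
    fmt.Printf("p50 ≈ %.0f ns\n", p50)
}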
12 changes: 12 additions & 0 deletions pkg/network/protocols/common.go
@@ -8,6 +8,8 @@ package protocols

 import (
     "math"
+
+    "github.com/DataDog/sketches-go/ddsketch"
 )

 // below is copied from pkg/trace/stats/statsraw.go
@@ -23,3 +25,13 @@ func NSTimestampToFloat(ns uint64) float64 {
     b &= 0xfffff80000000000
     return math.Float64frombits(b)
 }
+
+// GetSketchQuantile returns the value at the given percentile in the sketch
+func GetSketchQuantile(sketch *ddsketch.DDSketch, percentile float64) float64 {
+    if sketch == nil {
+        return 0.0
+    }
+
+    val, _ := sketch.GetValueAtQuantile(percentile)
+    return val
+}
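GetSketchQuantile is hoisted here so the HTTP and Kafka debuggers below can share it. A minimal usage sketch, with made-up sample values:

package main

import (
    "fmt"

    "github.com/DataDog/sketches-go/ddsketch"

    "github.com/DataDog/datadog-agent/pkg/network/protocols"
)

func main() {
    // Sketch with 1% relative accuracy via ddsketch's default constructor.
    sketch, _ := ddsketch.NewDefaultDDSketch(0.01)
    for _, ns := range []float64{1e6, 2e6, 3e6} { // made-up latency samples, in ns
        _ = sketch.Add(ns)
    }

    fmt.Println(protocols.GetSketchQuantile(sketch, 0.5)) // ≈ 2e6, within 1%
    fmt.Println(protocols.GetSketchQuantile(nil, 0.5))    // nil-safe: 0
}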
14 changes: 2 additions & 12 deletions pkg/network/protocols/http/debugging/debugging.go
@@ -7,9 +7,8 @@
 package debugging

 import (
-    "github.com/DataDog/sketches-go/ddsketch"
-
     "github.com/DataDog/datadog-agent/pkg/network/dns"
+    "github.com/DataDog/datadog-agent/pkg/network/protocols"
     "github.com/DataDog/datadog-agent/pkg/network/protocols/http"
     "github.com/DataDog/datadog-agent/pkg/process/util"
 )
@@ -69,7 +68,7 @@ func HTTP(stats map[http.Key]*http.RequestStats, dns map[util.Address][]dns.Host
         debug.ByStatus[status] = Stats{
             Count:              stat.Count,
             FirstLatencySample: stat.FirstLatencySample,
-            LatencyP50:         getSketchQuantile(stat.Latencies, 0.5),
+            LatencyP50:         protocols.GetSketchQuantile(stat.Latencies, 0.5),
         }
     }

@@ -96,12 +95,3 @@ func getDNS(dnsData map[util.Address][]dns.Hostname, addr util.Address) string {

     return ""
 }
-
-func getSketchQuantile(sketch *ddsketch.DDSketch, percentile float64) float64 {
-    if sketch == nil {
-        return 0.0
-    }
-
-    val, _ := sketch.GetValueAtQuantile(percentile)
-    return val
-}
9 changes: 7 additions & 2 deletions pkg/network/protocols/kafka/debugging/debugging.go
@@ -7,6 +7,7 @@
 package debugging

 import (
+    "github.com/DataDog/datadog-agent/pkg/network/protocols"
     "github.com/DataDog/datadog-agent/pkg/network/protocols/kafka"
     "github.com/DataDog/datadog-agent/pkg/process/util"
 )
@@ -29,7 +30,9 @@

 // Stats consolidates request count and latency information for a certain status code
 type Stats struct {
-    Count int
+    Count              int
+    FirstLatencySample float64
+    LatencyP50         float64
 }

 // Kafka returns a debug-friendly representation of map[kafka.Key]kafka.RequestStats
@@ -65,7 +68,9 @@ func Kafka(stats map[kafka.Key]*kafka.RequestStats) []RequestSummary {
         for status, stat := range requestStat.ErrorCodeToStat {

             debug.ByStatus[int8(status)] = Stats{
-                Count: stat.Count,
+                Count:              stat.Count,
+                FirstLatencySample: stat.FirstLatencySample,
+                LatencyP50:         protocols.GetSketchQuantile(stat.Latencies, 0.5),
             }
         }

9 changes: 9 additions & 0 deletions pkg/network/protocols/kafka/model_linux.go
@@ -10,6 +10,7 @@ package kafka
 import (
     "fmt"

+    "github.com/DataDog/datadog-agent/pkg/network/protocols"
     "github.com/DataDog/datadog-agent/pkg/network/types"
 )

@@ -45,6 +46,14 @@ func (tx *EbpfTx) ErrorCode() int8 {
     return tx.Transaction.Error_code
 }

+// RequestLatency returns the latency of the request in nanoseconds
+func (tx *EbpfTx) RequestLatency() float64 {
+    if uint64(tx.Transaction.Request_started) == 0 || uint64(tx.Transaction.Response_last_seen) == 0 {
+        return 0
+    }
+    return protocols.NSTimestampToFloat(tx.Transaction.Response_last_seen - tx.Transaction.Request_started)
+}
+
 // String returns a string representation of the kafka eBPF telemetry.
 func (t *RawKernelTelemetry) String() string {
     return fmt.Sprintf(`
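RequestLatency returns 0 when either timestamp is missing (e.g. a request whose response was never seen), and runs the difference through protocols.NSTimestampToFloat, which truncates the float's mantissa so downstream aggregation is cheaper. A standalone sketch of that rounding, with made-up ktime values; the masking body is copied from common.go above:

package main

import (
    "fmt"
    "math"
)

// Same bit-masking as protocols.NSTimestampToFloat: keep the sign, the
// exponent, and the top 9 mantissa bits, for roughly 0.2% relative error.
func nsTimestampToFloat(ns uint64) float64 {
    b := math.Float64bits(float64(ns))
    b &= 0xfffff80000000000
    return math.Float64frombits(b)
}

func main() {
    requestStarted := uint64(1_000_000_000)   // made-up bpf_ktime_get_ns value
    responseLastSeen := uint64(1_003_456_789) // ~3.46ms later
    latency := nsTimestampToFloat(responseLastSeen - requestStarted)
    fmt.Printf("exact: %d ns, stored: %.0f ns\n", responseLastSeen-requestStarted, latency)
}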
10 changes: 9 additions & 1 deletion pkg/network/protocols/kafka/statkeeper.go
@@ -56,7 +56,15 @@ func (statKeeper *StatKeeper) Process(tx *EbpfTx) {
         requestStats = NewRequestStats()
         statKeeper.stats[key] = requestStats
     }
-    requestStats.AddRequest(int32(tx.ErrorCode()), int(tx.RecordsCount()), uint64(tx.Transaction.Tags))
+
+    latency := tx.RequestLatency()
+    // Currently, we only support measuring latency for fetch operations
+    if key.RequestAPIKey == FetchAPIKey && latency <= 0 {
+        statKeeper.telemetry.invalidLatency.Add(1)
+        return
+    }
+
+    requestStats.AddRequest(int32(tx.ErrorCode()), int(tx.RecordsCount()), uint64(tx.Transaction.Tags), latency)
 }

 // GetAndResetAllStats returns all the stats and resets the stats
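Note the gating: only fetch requests must carry a valid latency, and a fetch with latency <= 0 is dropped and counted in the invalidLatency telemetry rather than polluting the stats. The encoder's nil check on requestStat.Latencies suggests the familiar agent pattern of keeping the first latency raw and only allocating a sketch once a second sample arrives; the following is a hedged sketch of that pattern, not the actual kafka.RequestStats implementation (type and method names are ours):

package main

import (
    "fmt"

    "github.com/DataDog/sketches-go/ddsketch"
)

// latencyStat mimics the FirstLatencySample/Latencies split implied by the
// encoder above.
type latencyStat struct {
    Count              int
    FirstLatencySample float64
    Latencies          *ddsketch.DDSketch // stays nil until a second sample
}

func (s *latencyStat) addRequest(latency float64) {
    s.Count++
    if s.FirstLatencySample == 0 {
        // Cheap path: a single sample needs no sketch allocation.
        s.FirstLatencySample = latency
        return
    }
    if s.Latencies == nil {
        s.Latencies, _ = ddsketch.NewDefaultDDSketch(0.01)
        _ = s.Latencies.Add(s.FirstLatencySample) // backfill the first sample
    }
    _ = s.Latencies.Add(latency)
}

func main() {
    var s latencyStat
    s.addRequest(1e6) // sketch stays nil; encoder would emit FirstLatencySample
    s.addRequest(2e6) // sketch allocated lazily; encoder would emit the blob
    fmt.Println(s.Count, s.Latencies.GetCount()) // 2 2
}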
84 changes: 84 additions & 0 deletions pkg/network/protocols/kafka/statkeeper_test.go
@@ -8,10 +8,17 @@
 package kafka

 import (
+    "encoding/binary"
+    "strconv"
     "strings"
     "testing"
+    "time"

+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+
     "github.com/DataDog/datadog-agent/pkg/network/config"
+    "github.com/DataDog/datadog-agent/pkg/process/util"
 )

 func BenchmarkStatKeeperSameTX(b *testing.B) {
@@ -68,3 +75,80 @@ func TestStatKeeper_extractTopicName(t *testing.T) {
         })
     }
 }
+
+func TestProcessKafkaTransactions(t *testing.T) {
+    cfg := &config.Config{MaxKafkaStatsBuffered: 1000}
+    tel := NewTelemetry()
+    sk := NewStatkeeper(cfg, tel)
+
+    srcString := "1.1.1.1"
+    dstString := "2.2.2.2"
+    sourceIP := util.AddressFromString(srcString)
+    sourcePort := 1234
+    destIP := util.AddressFromString(dstString)
+    destPort := 9092
+
+    const numOfTopics = 10
+    topicNamePrefix := "test-topic"
+    for i := 0; i < numOfTopics; i++ {
+        topicName := topicNamePrefix + strconv.Itoa(i)
+
+        for j := 0; j < 10; j++ {
+            errorCode := j % 5
+            latency := time.Duration(j%5+1) * time.Millisecond
+            tx := generateKafkaTransaction(sourceIP, destIP, sourcePort, destPort, topicName, int8(errorCode), uint32(10), latency)
+            sk.Process(tx)
+        }
+    }
+
+    stats := sk.GetAndResetAllStats()
+    assert.Equal(t, 0, len(sk.stats))
+    assert.Equal(t, numOfTopics, len(stats))
+    for key, stats := range stats {
+        assert.Equal(t, topicNamePrefix, key.TopicName[:len(topicNamePrefix)])
+        for i := 0; i < 5; i++ {
+            s := stats.ErrorCodeToStat[int32(i)]
+            require.NotNil(t, s)
+            assert.Equal(t, 20, s.Count)
+            assert.Equal(t, 20.0, s.Latencies.GetCount())
+
+            p50, err := s.Latencies.GetValueAtQuantile(0.5)
+            assert.Nil(t, err)
+
+            expectedLatency := float64(time.Duration(i+1) * time.Millisecond)
+            acceptableError := expectedLatency * s.Latencies.IndexMapping.RelativeAccuracy()
+            assert.GreaterOrEqual(t, p50, expectedLatency-acceptableError)
+            assert.LessOrEqual(t, p50, expectedLatency+acceptableError)
+        }
+    }
+}
+
+func generateKafkaTransaction(source util.Address, dest util.Address, sourcePort int, destPort int, topicName string, errorCode int8, recordsCount uint32, latency time.Duration) *EbpfTx {
+    var event EbpfTx
+
+    latencyNS := uint64(latency)
+    event.Transaction.Request_started = 1
+    event.Transaction.Request_api_key = FetchAPIKey
+    event.Transaction.Request_api_version = 7
+    event.Transaction.Response_last_seen = event.Transaction.Request_started + latencyNS
+    event.Transaction.Error_code = errorCode
+    event.Transaction.Records_count = recordsCount
+    event.Transaction.Topic_name_size = uint8(len(topicName))
+    event.Transaction.Topic_name = topicNameFromString([]byte(topicName))
+    event.Tup.Saddr_l = uint64(binary.LittleEndian.Uint32(source.Bytes()))
+    event.Tup.Sport = uint16(sourcePort)
+    event.Tup.Daddr_l = uint64(binary.LittleEndian.Uint32(dest.Bytes()))
+    event.Tup.Dport = uint16(destPort)
+    event.Tup.Metadata = 1
+
+    return &event
+}
+
+func topicNameFromString(fragment []byte) [TopicNameMaxSize]byte {
+    if len(fragment) >= TopicNameMaxSize {
+        return *(*[TopicNameMaxSize]byte)(fragment)
+    }
+    var b [TopicNameMaxSize]byte
+    copy(b[:], fragment)
+    return b
+}
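topicNameFromString relies on Go 1.17+ slice-to-array conversion via *(*[N]byte)(s), which panics when the slice is shorter than the array; the length guard avoids that. A tiny standalone illustration (the array size here is made up):

package main

import "fmt"

const size = 4 // stand-in for TopicNameMaxSize

func toArray(fragment []byte) [size]byte {
    if len(fragment) >= size {
        // Go 1.17+: convert a slice to an array through its backing pointer.
        // Panics if len(fragment) < size, hence the guard above.
        return *(*[size]byte)(fragment)
    }
    var b [size]byte
    copy(b[:], fragment) // shorter input is zero-padded
    return b
}

func main() {
    fmt.Println(toArray([]byte("kafka"))) // truncated to the first 4 bytes
    fmt.Println(toArray([]byte("io")))    // zero-padded tail
}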