USMON-830: Kafka fetch latency #27620

Merged 25 commits on Aug 7, 2024
Changes from 14 commits
Commits (25):
6dcb9c9
WIP: adding latency
DanielLavie Jul 3, 2024
88db646
Merge branch 'main' into daniel.lavie/USMON-830-kafka-fetch-latency
DanielLavie Jul 16, 2024
3c8afc4
Added user mode code for handling Kafka latencies
DanielLavie Jul 16, 2024
30b5e96
Fixed TestKafkaSerializationWithLocalhostTraffic
DanielLavie Jul 17, 2024
96dcb66
Added a check for the fetch API key, as produce requests don't have a latency ATM
DanielLavie Jul 18, 2024
71c872f
Fixed Kafka AddRequest function
DanielLavie Jul 18, 2024
69ef394
Increased KAFKA_BATCH_SIZE from 24 to 25
DanielLavie Jul 18, 2024
3c0706a
Added Kafka latency to debug endpoint
DanielLavie Jul 18, 2024
0982689
Merge branch 'main' into daniel.lavie/USMON-830-kafka-fetch-latency
DanielLavie Jul 18, 2024
0f92b67
Added comment for GetSketchQuantile
DanielLavie Jul 18, 2024
6e220eb
Added `TestProcessKafkaTransactions`
DanielLavie Jul 18, 2024
500c4f8
Using AddWithCount when adding to Kafka sketch, to count the exact nu…
DanielLavie Jul 18, 2024
4f1e970
Added kafka/stats_test.go and fixed a bug in kafka/stats.go
DanielLavie Jul 18, 2024
e4bc658
Fixed import errors
DanielLavie Jul 18, 2024
340a00c
Fixed TestProcessKafkaTransactions
DanielLavie Jul 21, 2024
05ecee1
Merge branch 'main' into daniel.lavie/USMON-830-kafka-fetch-latency
DanielLavie Jul 21, 2024
2ccc4b3
Merge branch 'main' into daniel.lavie/USMON-830-kafka-fetch-latency
DanielLavie Jul 30, 2024
3b42b3a
Fixed typo
DanielLavie Jul 30, 2024
26be4ea
Merge branch 'main' into daniel.lavie/USMON-830-kafka-fetch-latency
DanielLavie Jul 30, 2024
999bb91
Removed FirstLatencySample for Kafka aggregation as it seems like an …
DanielLavie Aug 4, 2024
68c45bc
Fixed encoding_test.go
DanielLavie Aug 4, 2024
3b5c22c
Merge branch 'main' into daniel.lavie/USMON-830-kafka-fetch-latency
DanielLavie Aug 4, 2024
bff2969
Reverted the removal of the FirstLatencySample
DanielLavie Aug 5, 2024
cba049c
Merge branch 'main' into daniel.lavie/USMON-830-kafka-fetch-latency
DanielLavie Aug 5, 2024
62a4515
Merge branch 'main' into daniel.lavie/USMON-830-kafka-fetch-latency
DanielLavie Aug 6, 2024
2 changes: 1 addition & 1 deletion pkg/network/ebpf/c/protocols/kafka/defs.h
@@ -43,7 +43,7 @@
#define KAFKA_MAX_ABORTED_TRANSACTIONS 10000

// This controls the number of Kafka transactions read from userspace at a time
#define KAFKA_BATCH_SIZE 26
#define KAFKA_BATCH_SIZE 25

// The amount of buckets we have for the kafka topic name length telemetry.
#define KAFKA_TELEMETRY_TOPIC_NAME_NUM_OF_BUCKETS 10
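
A plausible reason for the decrease (inferred from the diff, not stated in the PR): each batch of kafka_transaction_t entries has to fit in a fixed-size batch buffer shared with userspace, and the new 8-byte response_last_seen field added in types.h below makes every entry larger, so one fewer fits per batch.
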
3 changes: 3 additions & 0 deletions pkg/network/ebpf/c/protocols/kafka/kafka-parsing.h
@@ -1390,6 +1390,7 @@ static __always_inline bool kafka_process_new_response(void *ctx, conn_tuple_t *
kafka->response.state = KAFKA_FETCH_RESPONSE_START;
kafka->response.carry_over_offset = offset - orig_offset;
kafka->response.expected_tcp_seq = kafka_get_next_tcp_seq(skb_info);
kafka->response.transaction.response_last_seen = bpf_ktime_get_ns();

// Copy it to the stack since the verifier on 4.14 complains otherwise.
kafka_response_context_t response_ctx;
@@ -1404,6 +1405,7 @@ static __always_inline bool kafka_process_new_response(void *ctx, conn_tuple_t *
static __always_inline bool kafka_process_response(void *ctx, conn_tuple_t *tup, kafka_info_t *kafka, pktbuf_t pkt, skb_info_t *skb_info) {
kafka_response_context_t *response = bpf_map_lookup_elem(&kafka_response, tup);
if (response) {
response->transaction.response_last_seen = bpf_ktime_get_ns();
if (!skb_info || skb_info->tcp_seq == response->expected_tcp_seq) {
response->expected_tcp_seq = kafka_get_next_tcp_seq(skb_info);
kafka_call_response_parser(ctx, tup, pkt, response->state, response->transaction.request_api_version);
@@ -1471,6 +1473,7 @@ static __always_inline bool kafka_process(conn_tuple_t *tup, kafka_info_t *kafka
}

kafka_transaction->request_started = bpf_ktime_get_ns();
kafka_transaction->response_last_seen = 0;
kafka_transaction->request_api_key = kafka_header.api_key;
kafka_transaction->request_api_version = kafka_header.api_version;

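
Taken together, the three hunks above define the latency window: request_started is stamped once when the request header is parsed (with response_last_seen reset to 0), and every response segment — the first one in kafka_process_new_response as well as continuations in kafka_process_response — refreshes response_last_seen via bpf_ktime_get_ns(). A minimal user-space sketch of those semantics in Go, with made-up timestamps standing in for the kernel clock:

package main

import "fmt"

func main() {
	// Made-up monotonic timestamps standing in for bpf_ktime_get_ns().
	requestStarted := uint64(1_000_000) // stamped once, at request parse time
	responseLastSeen := uint64(0)       // reset together with request_started

	// Each response TCP segment refreshes the timestamp, so the window
	// ends at the last segment of the fetch response, not the first.
	for _, segArrival := range []uint64{1_500_000, 2_100_000, 2_750_000} {
		responseLastSeen = segArrival
	}

	fmt.Println(responseLastSeen - requestStarted) // 1750000 ns
}
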
1 change: 1 addition & 0 deletions pkg/network/ebpf/c/protocols/kafka/types.h
@@ -20,6 +20,7 @@ typedef struct {

typedef struct kafka_transaction_t {
__u64 request_started;
__u64 response_last_seen;
__u32 records_count;
// Request API key and version are 16-bit in the protocol but we store
// them as u8 to reduce memory usage of the map since the APIs and
4 changes: 2 additions & 2 deletions pkg/network/encoding/encoding_test.go
@@ -984,7 +984,7 @@ func TestKafkaSerializationWithLocalhostTraffic(t *testing.T) {
},
Kafka: map[kafka.Key]*kafka.RequestStats{
kafkaKey: {
ErrorCodeToStat: map[int32]*kafka.RequestStat{0: {Count: 10}},
ErrorCodeToStat: map[int32]*kafka.RequestStat{0: {Count: 10, FirstLatencySample: 5}},
},
},
}
@@ -998,7 +998,7 @@ func TestKafkaSerializationWithLocalhostTraffic(t *testing.T) {
},
Topic: topicName,
StatsByErrorCode: map[int32]*model.KafkaStats{
0: {Count: 10},
0: {Count: 10, FirstLatencySample: 5},
},
},
},
12 changes: 11 additions & 1 deletion pkg/network/encoding/marshal/usm_kafka.go
@@ -9,6 +9,8 @@ import (
"bytes"
"io"

"github.com/gogo/protobuf/proto"

model "github.com/DataDog/agent-payload/v5/process"

"github.com/DataDog/datadog-agent/pkg/network"
@@ -69,9 +71,17 @@ func (e *kafkaEncoder) encodeData(connectionData *USMConnectionData[kafka.Key, *
continue
}
builder.AddStatsByErrorCode(func(statsByErrorCodeBuilder *model.KafkaAggregation_StatsByErrorCodeEntryBuilder) {
statsByErrorCodeBuilder.SetKey(int32(statusCode))
statsByErrorCodeBuilder.SetKey(statusCode)
statsByErrorCodeBuilder.SetValue(func(kafkaStatsBuilder *model.KafkaStatsBuilder) {
kafkaStatsBuilder.SetCount(uint32(requestStat.Count))
if latencies := requestStat.Latencies; latencies != nil {
blob, _ := proto.Marshal(latencies.ToProto())
kafkaStatsBuilder.SetLatencies(func(b *bytes.Buffer) {
b.Write(blob)
})
} else {
kafkaStatsBuilder.SetFirstLatencySample(requestStat.FirstLatencySample)
}
})
})
staticTags |= requestStat.StaticTags
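
On the wire, the Latencies field carries a protobuf-encoded DDSketch (the proto.Marshal(latencies.ToProto()) call above), with FirstLatencySample as the fallback when no sketch was built. A hedged sketch of the consumer side — ddsketch.FromProto and the sketchpb import are assumptions taken from the sketches-go library, not from this diff:

package main

import (
	"fmt"

	"github.com/DataDog/sketches-go/ddsketch"
	"github.com/DataDog/sketches-go/ddsketch/pb/sketchpb"
	"github.com/gogo/protobuf/proto"
)

// decodeLatencies is a hypothetical helper mirroring what a payload consumer
// would do with the blob written by kafkaStatsBuilder.SetLatencies above.
func decodeLatencies(blob []byte) (*ddsketch.DDSketch, error) {
	var pb sketchpb.DDSketch
	if err := proto.Unmarshal(blob, &pb); err != nil {
		return nil, err
	}
	return ddsketch.FromProto(&pb)
}

func main() {
	sketch, _ := ddsketch.NewDefaultDDSketch(0.01) // accuracy chosen for illustration
	_ = sketch.Add(42)
	blob, _ := proto.Marshal(sketch.ToProto()) // same call as the encoder above
	decoded, err := decodeLatencies(blob)
	fmt.Println(decoded.GetCount(), err) // 1 <nil>
}
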
12 changes: 12 additions & 0 deletions pkg/network/protocols/common.go
@@ -8,6 +8,8 @@ package protocols

import (
"math"

"github.com/DataDog/sketches-go/ddsketch"
)

// below is copied from pkg/trace/stats/statsraw.go
@@ -23,3 +25,13 @@ func NSTimestampToFloat(ns uint64) float64 {
b &= 0xfffff80000000000
return math.Float64frombits(b)
}

// GetSketchQuantile returns the value at the given percentile in the sketch
func GetSketchQuantile(sketch *ddsketch.DDSketch, percentile float64) float64 {
if sketch == nil {
return 0.0
}

val, _ := sketch.GetValueAtQuantile(percentile)
return val
}
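
GetSketchQuantile is the nil-tolerant wrapper both debug endpoints below rely on. A minimal usage sketch — the 0.01 relative accuracy is an assumption for illustration, not a value taken from this PR:

package main

import (
	"fmt"

	"github.com/DataDog/sketches-go/ddsketch"
)

func getSketchQuantile(sketch *ddsketch.DDSketch, percentile float64) float64 {
	if sketch == nil {
		return 0.0
	}
	val, _ := sketch.GetValueAtQuantile(percentile)
	return val
}

func main() {
	sketch, err := ddsketch.NewDefaultDDSketch(0.01) // assumed accuracy
	if err != nil {
		panic(err)
	}
	for _, latencyNS := range []float64{1e6, 2e6, 3e6} {
		_ = sketch.Add(latencyNS)
	}
	fmt.Println(getSketchQuantile(sketch, 0.5)) // ~2e6, within relative accuracy
	fmt.Println(getSketchQuantile(nil, 0.5))    // 0 — stats with no sketch yet
}
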
14 changes: 2 additions & 12 deletions pkg/network/protocols/http/debugging/debugging.go
@@ -7,9 +7,8 @@
package debugging

import (
"github.com/DataDog/sketches-go/ddsketch"

"github.com/DataDog/datadog-agent/pkg/network/dns"
"github.com/DataDog/datadog-agent/pkg/network/protocols"
"github.com/DataDog/datadog-agent/pkg/network/protocols/http"
"github.com/DataDog/datadog-agent/pkg/process/util"
)
@@ -69,7 +68,7 @@ func HTTP(stats map[http.Key]*http.RequestStats, dns map[util.Address][]dns.Host
debug.ByStatus[status] = Stats{
Count: stat.Count,
FirstLatencySample: stat.FirstLatencySample,
LatencyP50: getSketchQuantile(stat.Latencies, 0.5),
LatencyP50: protocols.GetSketchQuantile(stat.Latencies, 0.5),
}
}

@@ -96,12 +95,3 @@ func getDNS(dnsData map[util.Address][]dns.Hostname, addr util.Address) string {

return ""
}

func getSketchQuantile(sketch *ddsketch.DDSketch, percentile float64) float64 {
if sketch == nil {
return 0.0
}

val, _ := sketch.GetValueAtQuantile(percentile)
return val
}
9 changes: 7 additions & 2 deletions pkg/network/protocols/kafka/debugging/debugging.go
@@ -7,6 +7,7 @@
package debugging

import (
"github.com/DataDog/datadog-agent/pkg/network/protocols"
"github.com/DataDog/datadog-agent/pkg/network/protocols/kafka"
"github.com/DataDog/datadog-agent/pkg/process/util"
)
@@ -29,7 +30,9 @@

// Stats consolidates request count and latency information for a certain status code
type Stats struct {
Count int
Count int
FirstLatencySample float64
LatencyP50 float64
}

// Kafka returns a debug-friendly representation of map[kafka.Key]kafka.RequestStats
@@ -65,7 +68,9 @@ func Kafka(stats map[kafka.Key]*kafka.RequestStats) []RequestSummary {
for status, stat := range requestStat.ErrorCodeToStat {

debug.ByStatus[int8(status)] = Stats{
Count: stat.Count,
Count: stat.Count,
FirstLatencySample: stat.FirstLatencySample,
LatencyP50: protocols.GetSketchQuantile(stat.Latencies, 0.5),
}
}

13 changes: 12 additions & 1 deletion pkg/network/protocols/kafka/model_linux.go
@@ -7,7 +7,10 @@

package kafka

import "github.com/DataDog/datadog-agent/pkg/network/types"
import (
"github.com/DataDog/datadog-agent/pkg/network/protocols"
"github.com/DataDog/datadog-agent/pkg/network/types"
)

// ConnTuple returns the connection tuple for the transaction
func (tx *EbpfTx) ConnTuple() types.ConnectionKey {
@@ -40,3 +43,11 @@ func (tx *EbpfTx) RecordsCount() uint32 {
func (tx *EbpfTx) ErrorCode() int8 {
return tx.Transaction.Error_code
}

// RequestLatency returns the latency of the request in nanoseconds
func (tx *EbpfTx) RequestLatency() float64 {
if uint64(tx.Transaction.Request_started) == 0 || uint64(tx.Transaction.Response_last_seen) == 0 {
return 0
}
return protocols.NSTimestampToFloat(tx.Transaction.Response_last_seen - tx.Transaction.Request_started)
}
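
RequestLatency guards against half-open transactions — if either timestamp is zero the latency is reported as 0, which the StatKeeper below then discards for fetch requests — and funnels the difference through NSTimestampToFloat, which deliberately drops low mantissa bits for cheaper aggregation (at most ~0.2% relative error with 9 mantissa bits kept). A small sketch of that truncation; the mask is copied from common.go above, the sample value is arbitrary:

package main

import (
	"fmt"
	"math"
)

// Same mask as protocols.NSTimestampToFloat: keep the sign, the exponent,
// and the top 9 mantissa bits of the float64, zeroing the rest.
func nsTimestampToFloat(ns uint64) float64 {
	b := math.Float64bits(float64(ns))
	b &= 0xfffff80000000000
	return math.Float64frombits(b)
}

func main() {
	fmt.Println(nsTimestampToFloat(1_234_567_890)) // 1.233125376e+09, not 1.23456789e+09
}
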
10 changes: 9 additions & 1 deletion pkg/network/protocols/kafka/statkeeper.go
@@ -56,7 +56,15 @@ func (statKeeper *StatKeeper) Process(tx *EbpfTx) {
requestStats = NewRequestStats()
statKeeper.stats[key] = requestStats
}
requestStats.AddRequest(int32(tx.ErrorCode()), int(tx.RecordsCount()), uint64(tx.Transaction.Tags))

latency := tx.RequestLatency()
// Currently, we only support measuring latency for fetch operations
if key.RequestAPIKey == FetchAPIKey && latency <= 0 {
statKeeper.telemetry.invalidLatency.Add(1)
return
}

requestStats.AddRequest(int32(tx.ErrorCode()), int(tx.RecordsCount()), uint64(tx.Transaction.Tags), latency)
}

// GetAndResetAllStats returns all the stats and resets the stats
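
Produce transactions never get a response_last_seen timestamp in this PR, so their latency is always 0; the guard above therefore only rejects a missing latency when the transaction is a fetch, where one is expected, and counts the rejection in the new invalidLatency telemetry. A standalone sketch of that decision (fetchAPIKey = 1 per the Kafka protocol; the agent's constant is named FetchAPIKey):

package main

import "fmt"

const fetchAPIKey = 1 // Kafka protocol API keys: Produce = 0, Fetch = 1

// shouldDrop mirrors the guard in StatKeeper.Process above: only fetch
// transactions are expected to carry a latency, so only they are dropped
// (and counted as invalid-latency telemetry) when the latency is missing.
func shouldDrop(apiKey uint16, latency float64) bool {
	return apiKey == fetchAPIKey && latency <= 0
}

func main() {
	fmt.Println(shouldDrop(1, 0))     // true  — fetch with no response seen
	fmt.Println(shouldDrop(1, 2.5e6)) // false — fetch with a valid latency
	fmt.Println(shouldDrop(0, 0))     // false — produce; latency not supported yet
}
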
84 changes: 84 additions & 0 deletions pkg/network/protocols/kafka/statkeeper_test.go
@@ -8,10 +8,17 @@
package kafka

import (
"encoding/binary"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/process/util"
)

func BenchmarkStatKeeperSameTX(b *testing.B) {
@@ -68,3 +75,80 @@ func TestStatKeeper_extractTopicName(t *testing.T) {
})
}
}

func TestProcessKafkaTransactions(t *testing.T) {
cfg := &config.Config{MaxKafkaStatsBuffered: 1000}
tel := NewTelemetry()
sk := NewStatkeeper(cfg, tel)

srcString := "1.1.1.1"
dstString := "2.2.2.2"
sourceIP := util.AddressFromString(srcString)
sourcePort := 1234
destIP := util.AddressFromString(dstString)
destPort := 9092

const numOfTopics = 10
topicNamePrefix := "test-topic"
for i := 0; i < numOfTopics; i++ {
topicName := topicNamePrefix + strconv.Itoa(i)

for j := 0; j < 10; j++ {
errorCode := j % 5
latency := time.Duration(j%5+1) * time.Millisecond
tx := generateKafkaTransaction(sourceIP, destIP, sourcePort, destPort, topicName, int8(errorCode), uint32(10), latency)
sk.Process(tx)
}
}

stats := sk.GetAndResetAllStats()
assert.Equal(t, 0, len(sk.stats))
assert.Equal(t, numOfTopics, len(stats))
for key, stats := range stats {
assert.Equal(t, topicNamePrefix, key.TopicName[:len(topicNamePrefix)])
for i := 0; i < 5; i++ {
s := stats.ErrorCodeToStat[int32(i)]
require.NotNil(t, s)
assert.Equal(t, 20, s.Count)
assert.Equal(t, 2.0, s.Latencies.GetCount())

p50, err := s.Latencies.GetValueAtQuantile(0.5)
assert.Nil(t, err)

expectedLatency := float64(time.Duration(i+1) * time.Millisecond)
acceptableError := expectedLatency * s.Latencies.IndexMapping.RelativeAccuracy()
assert.GreaterOrEqual(t, p50, expectedLatency-acceptableError)
assert.LessOrEqual(t, p50, expectedLatency+acceptableError)
}
}
}

func generateKafkaTransaction(source util.Address, dest util.Address, sourcePort int, destPort int, topicName string, errorCode int8, recordsCount uint32, latency time.Duration) *EbpfTx {
var event EbpfTx

latencyNS := uint64(latency)
event.Transaction.Request_started = 1
event.Transaction.Request_api_key = FetchAPIKey
event.Transaction.Request_api_version = 7
event.Transaction.Response_last_seen = event.Transaction.Request_started + latencyNS
event.Transaction.Error_code = errorCode
event.Transaction.Records_count = recordsCount
event.Transaction.Topic_name_size = uint8(len(topicName))
event.Transaction.Topic_name = topicNameFromString([]byte(topicName))
event.Tup.Saddr_l = uint64(binary.LittleEndian.Uint32(source.Bytes()))
event.Tup.Sport = uint16(sourcePort)
event.Tup.Daddr_l = uint64(binary.LittleEndian.Uint32(dest.Bytes()))
event.Tup.Dport = uint16(destPort)
event.Tup.Metadata = 1

return &event
}

func topicNameFromString(fragment []byte) [TopicNameMaxSize]byte {
if len(fragment) >= TopicNameMaxSize {
return *(*[TopicNameMaxSize]byte)(fragment)
}
var b [TopicNameMaxSize]byte
copy(b[:], fragment)
return b
}