USMON-830: Kafka fetch latency #27620
Conversation
Test changes on VM
Use this command from test-infra-definitions to manually test this PR's changes on a VM: inv create-vm --pipeline-id=40993567 --os-family=ubuntu
Note: This applies to commit 62a4515
Regression Detector Results
Run ID: 2acd5305-61ba-4ad7-9d9f-2b73d9f1a52e
Metrics dashboard · Target profiles
Baseline: ff7f7c6
Performance changes are noted in the perf column of each table:
No significant changes in experiment optimization goals
Confidence level: 90.00%
There were no significant changes in experiment optimization goals at this confidence level and effect size tolerance.
| perf | experiment | goal | Δ mean % | Δ mean % CI | links |
|---|---|---|---|---|---|
| ➖ | tcp_syslog_to_blackhole | ingress throughput | +2.58 | [-10.41, +15.57] | Logs |
| ➖ | basic_py_check | % cpu utilization | +1.65 | [-1.01, +4.31] | Logs |
| ➖ | uds_dogstatsd_to_api_cpu | % cpu utilization | +0.47 | [-0.41, +1.35] | Logs |
| ➖ | otel_to_otel_logs | ingress throughput | +0.08 | [-0.73, +0.89] | Logs |
| ➖ | uds_dogstatsd_to_api | ingress throughput | -0.00 | [-0.00, +0.00] | Logs |
| ➖ | tcp_dd_logs_filter_exclude | ingress throughput | -0.00 | [-0.01, +0.01] | Logs |
| ➖ | idle | memory utilization | -0.10 | [-0.13, -0.07] | Logs |
| ➖ | file_tree | memory utilization | -0.46 | [-0.53, -0.39] | Logs |
| ➖ | pycheck_1000_100byte_tags | % cpu utilization | -0.58 | [-5.28, +4.13] | Logs |
Explanation
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" (a change worth investigating further) if all of the following criteria are true; a minimal sketch of this decision rule follows the list:
- Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
- Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
- Its configuration does not mark it "erratic".
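For illustration only, here is a minimal Go sketch of that decision rule. The type and field names are hypothetical; only the 5.00% threshold and the 90% CI come from the text above.

```go
package main

import (
	"fmt"
	"math"
)

// experimentResult mirrors one row of the table above; the type and field names are
// hypothetical, only the thresholds come from the Regression Detector description.
type experimentResult struct {
	deltaMeanPct float64 // Δ mean %
	ciLow        float64 // lower bound of the 90% CI on Δ mean %
	ciHigh       float64 // upper bound of the 90% CI on Δ mean %
	erratic      bool    // whether the configuration marks the experiment "erratic"
}

// isRegression applies the three criteria listed above.
func isRegression(r experimentResult, thresholdPct float64) bool {
	bigEnough := math.Abs(r.deltaMeanPct) >= thresholdPct
	ciExcludesZero := r.ciLow > 0 || r.ciHigh < 0
	return bigEnough && ciExcludesZero && !r.erratic
}

func main() {
	// The tcp_syslog_to_blackhole row: +2.58% with CI [-10.41, +15.57] is not flagged.
	r := experimentResult{deltaMeanPct: 2.58, ciLow: -10.41, ciHigh: 15.57}
	fmt.Println(isRegression(r, 5.0)) // false: below threshold and CI contains zero
}
```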
LGTM for NPM owned files (just a test)
if newRequests.Latencies == nil {
    // In this case, newRequests must have only FirstLatencySample, so use it when adding the request
    r.AddRequest(statusCode, newRequests.Count, newRequests.StaticTags, newRequests.FirstLatencySample)
    continue
}
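To make the two representations explicit, here is a hypothetical helper (not from this PR) that illustrates the idea behind the branch above: a RequestStat carries either a full DDSketch or just its first latency sample, and the merge path must handle both. It assumes the kafka and ddsketch packages quoted elsewhere in this thread.

```go
// foldLatency is a hypothetical sketch, not the PR's actual implementation.
// It folds one RequestStat's latency information into an existing DDSketch.
func foldLatency(dst *ddsketch.DDSketch, src *kafka.RequestStat) error {
	if src.Latencies == nil {
		// Only a single sample was observed for this stat; add it with its count.
		return dst.AddWithCount(src.FirstLatencySample, float64(src.Count))
	}
	// A full sketch exists; merge it wholesale instead of adding a single sample.
	return dst.MergeWith(src.Latencies)
}
```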
where and how do you ensure it runs only for fetch requests?
What’s the issue with it running for produce requests? The latency will always be 0 in that case. Additionally, on the backend we'll only be processing latency for fetch requests at the moment.
// This field holds the value (in nanoseconds) of the first HTTP request
// in this bucket. We do this as optimization to avoid creating sketches with
// a single value. This is quite common in the context of HTTP requests without
// keep-alives where a short-lived TCP connection is used for a single request.
This is talking about HTTP; does this rationale hold for Kafka too?
That's a good observation. From what I've seen, the answer is no; most modern Kafka clients reuse sockets and set keepalives. An example of this can be found in the Apache Kafka Java Client.
I'll work on removing the FirstLatencySample to simplify the code, as it appears to be an unnecessary optimization.
After discussing with @guyarb, to fully understand the performance impact, I plan to:
- Determine the difference between creating a ddsketch and a simple FirstLatency setup.
- Estimate how often the FirstLatency scenario occurs for a connection in the Datadog environment. To do this, we can use the datadog.network_tracer.usm.kafka.events_captured metric for the number of Kafka requests and compare it to the number of Kafka connections.
Implemented the following benchmark tests:
import (
    "testing"

    "github.com/DataDog/datadog-agent/pkg/network/protocols/kafka"
    "github.com/DataDog/sketches-go/ddsketch"
    "github.com/stretchr/testify/require"
)

func benchmarkCreateRequestStat(b *testing.B, useLatencies bool) {
    b.ReportAllocs()
    b.ResetTimer()
    latencySample := 3.2
    count := 10
    for i := 0; i < b.N; i++ {
        if useLatencies {
            latencies, err := ddsketch.NewDefaultDDSketch(0.01)
            require.NoError(b, err)
            require.NoError(b, latencies.AddWithCount(latencySample, float64(count)))
            _ = kafka.RequestStat{
                Latencies: latencies,
                Count:     count,
            }
        } else {
            _ = kafka.RequestStat{
                FirstLatencySample: latencySample,
                Count:              count,
            }
        }
    }
}

func BenchmarkCreateRequestStatWithLatencies(b *testing.B) {
    benchmarkCreateRequestStat(b, true)
}

func BenchmarkCreateRequestStatWithFirstLatencySample(b *testing.B) {
    benchmarkCreateRequestStat(b, false)
}
The benchmark results are as follows:
BenchmarkCreateRequestStatWithLatencies 1811176 664.7 ns/op 784 B/op 8 allocs/op
BenchmarkCreateRequestStatWithFirstLatencySample 1000000000 0.3180 ns/op 0 B/op 0 allocs/op
Results Breakdown
BenchmarkCreateRequestStatWithLatencies:
- Time per operation (ns/op): 664.7 ns/op. On average, each operation takes approximately 664.7 nanoseconds.
- Memory allocated per operation (B/op): 784 B/op. Each operation allocates around 784 bytes of memory.
- Allocations per operation (allocs/op): 8 allocs/op. Each operation results in 8 memory allocations.

BenchmarkCreateRequestStatWithFirstLatencySample:
- Time per operation (ns/op): 0.3180 ns/op. On average, each operation takes approximately 0.318 nanoseconds. This is extraordinarily fast, likely indicating that the benchmark is measuring a minimal or zero-cost operation.
- Memory allocated per operation (B/op): 0 B/op. Each operation does not allocate any memory.
- Allocations per operation (allocs/op): 0 allocs/op. Each operation results in no memory allocations.
Interpretation
Performance: BenchmarkCreateRequestStatWithFirstLatencySample is significantly faster (0.318 ns/op vs. 664.7 ns/op) compared to BenchmarkCreateRequestStatWithLatencies. This is expected because storing a single float value (the FirstLatencySample) is much simpler and less costly than managing a DDSketch for latencies.
Memory Usage: BenchmarkCreateRequestStatWithFirstLatencySample uses no additional memory (0 B/op and 0 allocs/op), indicating that it's very efficient in terms of memory usage. BenchmarkCreateRequestStatWithLatencies allocates 784 bytes per operation and performs 8 allocations, indicating higher memory overhead.
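A sub-nanosecond per-op figure usually means the compiler optimized the measured work away, which supports the reading above that the no-sketch path is essentially free. A common way to double-check this (a sketch for reference, not part of the PR) is to write the result to a package-level sink so the struct literal cannot be eliminated; this reuses the kafka package from the benchmark above.

```go
// sink keeps the result alive so the compiler cannot eliminate the struct
// creation being measured in the FirstLatencySample case.
var sink kafka.RequestStat

func BenchmarkCreateRequestStatWithFirstLatencySampleSink(b *testing.B) {
	b.ReportAllocs()
	latencySample := 3.2
	count := 10
	for i := 0; i < b.N; i++ {
		sink = kafka.RequestStat{
			FirstLatencySample: latencySample,
			Count:              count,
		}
	}
}
```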
We don't currently have an effective way to estimate how often Kafka connections carry only a single request per connection, which is the case the FirstLatencySample optimization targets. We'll need to add a metric to track how frequently FirstLatencySample is used and test this in staging and production. Since we lack conclusive data to support either approach at the moment, I'll proceed with the FirstLatencySample optimization for now, and we can revisit it in a future task.
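As a rough sketch of that metric (the counter name and the hook are assumptions, and a plain atomic counter stands in for the agent's internal telemetry):

```go
// firstLatencyOnlyStats counts RequestStats that are flushed while still holding only
// a FirstLatencySample, i.e. cases where no DDSketch was ever created. The name and
// placement are hypothetical; sync/atomic stands in for the agent's telemetry package.
var firstLatencyOnlyStats atomic.Uint64

// noteAggregationKind would be called once per RequestStat at flush time (hypothetical hook).
func noteAggregationKind(stats *kafka.RequestStat) {
	if stats.Latencies == nil && stats.FirstLatencySample != 0 {
		firstLatencyOnlyStats.Add(1)
	}
}
```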
pkg/network/protocols/kafka/stats.go
Latencies *ddsketch.DDSketch
// Note: every time we add a latency value to the DDSketch, it's possible for the sketch to discard that value
// (ie if it is outside the range that is tracked by the sketch). For that reason, in order to keep an accurate count
// the number of http transactions processed, we have our own count field (rather than relying on DDSketch.GetCount())
kafka (this comment mentions "http transactions"; it should refer to Kafka here)
if stats.FirstLatencySample != 0 {
    err := stats.Latencies.AddWithCount(stats.FirstLatencySample, float64(stats.Count))
    if err != nil {
        log.Debugf("could not add kafka request latency to ddsketch: %v", err)
Should these errors be propagated to the caller and/or logged at a higher level?
This behaves the same as in HTTP. I’ll open a ticket to address the potential issues related to ignoring errors from these functions
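For reference, a minimal sketch of the alternative raised above, returning the error to the caller instead of only logging it at debug level. This is hypothetical and not what the PR does; it assumes the kafka RequestStat type and the ddsketch AddWithCount call quoted earlier.

```go
// addFirstLatencySample folds the stored first sample into the sketch and propagates
// the error instead of swallowing it; the caller can then decide how loudly to report it.
func addFirstLatencySample(stats *kafka.RequestStat) error {
	if stats.FirstLatencySample == 0 {
		return nil
	}
	if err := stats.Latencies.AddWithCount(stats.FirstLatencySample, float64(stats.Count)); err != nil {
		return fmt.Errorf("could not add kafka request latency to ddsketch: %w", err)
	}
	return nil
}
```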
/merge
🚂 MergeQueue: pull request added to the queue.
What does this PR do?
Adds support for measuring Kafka fetch request latency and incorporating it into the Kafka aggregation
Motivation
Additional Notes
For the initial latency check of a given stat, we cannot rely on the count field as we do with other protocols. This is because the kernel might report a count greater than 1 in a single transaction (it counts the number of Kafka produce/consume messages, which can exceed 1 per Kafka transaction). Therefore, we check that FirstLatencySample is greater than 0 to replicate this behavior.
Load test results:
Staging Deployment
Possible Drawbacks / Trade-offs
Describe how to test/QA your changes