Support kafka fetch version 15 #903
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #903 +/- ##
==========================================
- Coverage 79.10% 79.03% -0.08%
==========================================
Files 131 131
Lines 9839 9949 +110
==========================================
+ Hits 7783 7863 +80
- Misses 1555 1577 +22
- Partials 501 509 +8
Looks good, I just think we need to add a lot more testing for all those edge cases. There are so many paths with these Kafka version checks :(.
topicName = []byte(topicNames[0])
}
} else {
topicName = []byte(topicNames[0])
I think we need to make sure we trim \x02\x00 here.
topicNames := strings.Split(string(topicNames[1]), "\x02\x00")
if len(topicNames) > 0 {
topicName = []byte(topicNames[0])
}
Perhaps we can trim the \x02\x00 here as well; there's no else branch to handle the topic name.
For some reason, strings.TrimRight(string(topicNames[0]), "\x02\x00") is not doing anything, which is why I had to use splits.
Argh, maybe @mariomac knows?
Weird... I just did a simple test and it worked: https://go.dev/play/p/LKz9ex2F5DH?v=
Don't forget to assign the result of TrimRight to a new variable.
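For reference, a minimal sketch of the behaviour being discussed, not the code from this PR: strings.TrimRight returns a new string and leaves its argument untouched, and its second argument is a cutset rather than a suffix. The helper name trimTopicTerminator and the sample topic are assumptions made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// trimTopicTerminator is a hypothetical helper (not part of the PR).
// The second argument of TrimRight is a cutset: every trailing byte that
// is either \x02 or \x00 is removed, in any order and any number.
func trimTopicTerminator(raw string) string {
	return strings.TrimRight(raw, "\x02\x00")
}

func main() {
	raw := "important_kafka_topic\x02\x00"

	// Go strings are immutable, so discarding the result changes nothing.
	_ = strings.TrimRight(raw, "\x02\x00")
	fmt.Printf("%q\n", raw) // still ends with \x02\x00

	// Assigning the result is what makes the trim visible.
	trimmed := trimTopicTerminator(raw)
	fmt.Printf("%q\n", trimmed)
}
```

Because the argument is a cutset, a topic whose real name ends in one of those bytes would be trimmed too, which is one reason an explicit split or cut on the two-byte sequence can be the safer choice.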
I ended up using regexes. I think that's easier to understand and the implementation is shorter.
} else {
topicNameSize = int(binary.BigEndian.Uint16(pkt[offset:]))
}
if topicNameSize <= 0 || topicNameSize > 255 {
Is the topicNameSize > 255 check still valid for versions > 11? The variable-size int encoding makes me think it can now be larger than 255...
Removed the upper limit.
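To illustrate the concern, here is a rough sketch of reading the name length under both encodings. It assumes that flexible request versions encode the topic name as a compact string, whose length prefix is an unsigned varint holding length+1, while older versions use a big-endian int16. The function name readTopicNameSize, its flexible flag, and the choice to bound the size by the remaining packet bytes are all assumptions for illustration, not the PR's code.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readTopicNameSize is a hypothetical helper. It returns the topic name
// size and the offset of the first name byte.
func readTopicNameSize(pkt []byte, offset int, flexible bool) (size, next int, err error) {
	if offset < 0 || offset >= len(pkt) {
		return 0, 0, errors.New("offset out of range")
	}
	if flexible {
		// Compact string: unsigned varint holding length+1 (0 means null).
		v, n := binary.Uvarint(pkt[offset:])
		if n <= 0 || v == 0 || v > uint64(len(pkt)) {
			return 0, 0, errors.New("invalid compact string length")
		}
		size, next = int(v-1), offset+n
	} else {
		if offset+2 > len(pkt) {
			return 0, 0, errors.New("truncated int16 length")
		}
		size, next = int(int16(binary.BigEndian.Uint16(pkt[offset:]))), offset+2
	}
	// Instead of a fixed 255 limit, bound the size by what the packet holds.
	if size <= 0 || size > len(pkt)-next {
		return 0, 0, errors.New("topic name size out of bounds")
	}
	return size, next, nil
}

func main() {
	// A 300-byte topic name: the compact prefix is 301 encoded as a varint.
	name := make([]byte, 300)
	prefix := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(prefix, uint64(len(name)+1))
	pkt := append(prefix[:n], name...)

	size, next, err := readTopicNameSize(pkt, 0, true)
	fmt.Println(size, next, err) // 300 2 <nil>
}
```

Bounding by the remaining bytes keeps the check defensive against garbage input without assuming any particular maximum topic length.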
@@ -256,6 +296,24 @@ func getTopicOffsetFromFetchOperation(header *Header) int {
return offset
}

func readUnsignedVarint(data []byte) (int, error) { |
Do you mind adding unit tests for this function? Encode some larger numbers, make sure they are decoded correctly.
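A sketch of what such a round-trip check could look like. The body of readUnsignedVarint below is an assumption (the PR's actual implementation is not shown in this thread); only the signature comes from the diff. Values are encoded with the standard library's binary.PutUvarint, which uses the same base-128 scheme, and then decoded back.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readUnsignedVarint here is an assumed implementation used only to
// demonstrate the test idea: 7 payload bits per byte, least significant
// group first, high bit set on every byte except the last.
func readUnsignedVarint(data []byte) (int, error) {
	value := 0
	for i, b := range data {
		if i >= 5 { // a 32-bit varint occupies at most 5 bytes
			return 0, errors.New("varint too long")
		}
		value |= int(b&0x7f) << (7 * i)
		if b&0x80 == 0 {
			return value, nil
		}
	}
	return 0, errors.New("truncated varint")
}

func main() {
	buf := make([]byte, binary.MaxVarintLen64)
	for _, want := range []int{0, 1, 127, 128, 300, 16384, 1<<31 - 1} {
		n := binary.PutUvarint(buf, uint64(want))
		got, err := readUnsignedVarint(buf[:n])
		fmt.Printf("want=%d got=%d bytes=%d err=%v\n", want, got, n, err)
	}
}
```

The interesting boundaries are 127/128 (one byte versus two) and 16383/16384 (two bytes versus three), plus a truncated input where the last byte still has its continuation bit set.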
@@ -23,7 +23,27 @@ func TestProcessKafkaRequest(t *testing.T) {
},
},
{
name: "Produce request",
name: "Fetch request (v12)",
Can we make a test where the string doesn't have \t inside, so the split doesn't work?
Also perhaps a test where there's no \x02\x00?
Sure, but I don't have an example of that. I can manually generate one, but I'm not sure that's a realistic scenario.
Yeah, just manually crafted would be great. The reason I mentioned this is because there are so many traps here with the kafka protocol, I'd like to make the code as defensive as possible.
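As a sketch of the kind of manually crafted cases meant here, the snippet below exercises a hypothetical helper, topicBeforeTerminator, built on strings.Cut (Go 1.18+). It is an assumption for illustration, not the extraction code from the PR. Note that strings.Split with a non-empty separator always returns at least one element, so a len(...) > 0 guard after such a split never fails; the missing-separator case has to be covered by tests instead.

```go
package main

import (
	"fmt"
	"strings"
)

// topicBeforeTerminator is a hypothetical helper: it returns everything
// before the first \x02\x00 sequence, or the whole input when the
// sequence is absent.
func topicBeforeTerminator(raw string) string {
	name, _, _ := strings.Cut(raw, "\x02\x00")
	return name
}

func main() {
	cases := []string{
		"orders\x02\x00",         // terminator present
		"orders",                 // no terminator at all
		"orders\x02\x00trailing", // data after the terminator
		"\x02\x00",               // terminator only
		"",                       // empty input
	}
	for _, raw := range cases {
		fmt.Printf("%q -> %q\n", raw, topicBeforeTerminator(raw))
	}
}
```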
I ended up testing the function that extracts the topic itself. Do you think that is enough to cover this?
expected: &KafkaInfo{
ClientID: "consumer-frauddetectionservice-1",
Operation: Fetch,
Topic: "",
What do you think about making the topic equal to * when we have a UUID? It seems similar to what we do for HTTP...
This will make it clear that we are dealing with the new protocol when we debug this in production.
It will display process * as the span name. Are we OK with that?
I think so. In OTel, * usually stands for something with high cardinality, so a UUID fits that, I think. It will also help us tell apart the cases where we failed to read the topic from the cases where we knew we couldn't read it.
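The convention agreed on here could be sketched roughly as follows. The function fetchTopic, the constant names, and the version threshold constant are illustrative assumptions; only the "version 13 and later carry a UUID instead of a name" behaviour and the * placeholder come from this thread.

```go
package main

import "fmt"

const (
	// firstUUIDFetchVersion is the fetch request version from which the
	// topic is identified by a 16-byte UUID instead of a name.
	firstUUIDFetchVersion = 13
	// uuidTopicPlaceholder marks "topic known to be unreadable", as
	// opposed to the empty string, which means parsing failed.
	uuidTopicPlaceholder = "*"
)

// fetchTopic is a hypothetical helper that picks the topic to report for
// a fetch request of the given API version.
func fetchTopic(apiVersion int16, parsedName string) string {
	if apiVersion >= firstUUIDFetchVersion {
		return uuidTopicPlaceholder
	}
	return parsedName
}

func main() {
	fmt.Printf("%q\n", fetchTopic(11, "orders")) // "orders"
	fmt.Printf("%q\n", fetchTopic(15, ""))       // "*"
}
```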
LGTM!
Since version 13 of the fetch request, Kafka fetch operations no longer supply the topic name; instead they supply a UUID (16 bytes), which seems to be auto-generated.
https://kafka.apache.org/protocol.html (search for "Version: 15"). They also seem to have removed replica_id from the fetch request. Since the UUID carries no meaning for users, I think we should simply report [UUID] and leave it at that. I'm not sure we can find the topic name reliably for fetch requests, since it is likely only communicated on subscribe events.