Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support kafka fetch version 15 #903

Merged
merged 10 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 68 additions & 10 deletions pkg/internal/ebpf/common/kafka_detect_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ebpfcommon
import (
"encoding/binary"
"errors"
"strings"
"unsafe"

trace2 "go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -52,10 +53,10 @@ func ProcessPossibleKafkaEvent(pkt []byte, rpkt []byte) (*KafkaInfo, error) {
if err != nil {
k, err = ProcessKafkaRequest(rpkt)
}

return k, err
}

// https://kafka.apache.org/protocol.html
func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) {
k := &KafkaInfo{}
if len(pkt) < KafkaMinLength {
Expand All @@ -81,7 +82,7 @@ func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) {
return k, err
}

topic, err := getTopicName(pkt, offset)
topic, err := getTopicName(pkt, offset, k.Operation, header.APIVersion)
if err != nil {
return k, err
}
Expand Down Expand Up @@ -110,11 +111,11 @@ func isValidKafkaHeader(header *Header) bool {
}
switch Operation(header.APIKey) {
case Fetch:
if header.APIVersion > 11 {
if header.APIVersion > 16 { // latest: Fetch Request (Version: 16)
return false
}
case Produce:
if header.APIVersion == 0 || header.APIVersion > 8 {
if header.APIVersion == 0 || header.APIVersion > 10 { // latest: Produce Request (Version: 10)
return false
}
default:
Expand Down Expand Up @@ -182,31 +183,64 @@ func isValidClientID(buffer []byte, realClientIDSize int) bool {
return isValidKafkaString(buffer, len(buffer), realClientIDSize, true)
}

// getTopicName extracts the (first) topic name from a raw Kafka Produce or
// Fetch request packet. pkt is the captured request buffer, offset points at
// the topic section (as computed by the per-operation offset helpers), op is
// the request type and apiVersion the request's API version. It returns an
// empty string with a nil error when the packet carries no parseable textual
// name, and an error for malformed buffers.
func getTopicName(pkt []byte, offset int, op Operation, apiVersion int16) (string, error) {
	if apiVersion >= 13 { // topic name is only a UUID, no need to parse it
		return "", nil
	}

	// Step over 4 bytes that precede the name — presumably the topic array
	// length field; TODO confirm against the protocol spec per version.
	offset += 4
	if offset > len(pkt) {
		return "", errors.New("invalid buffer length")
	}
	topicNameSize, err := getTopicNameSize(pkt, offset, op, apiVersion)
	if err != nil {
		return "", err
	}
	offset += 2 // step over the 2-byte (non-compact) size field

	if offset > len(pkt) {
		return "", nil
	}
	// Clamp the read to the end of the buffer: eBPF captures may truncate
	// the packet before the full topic name.
	maxLen := offset + topicNameSize
	if len(pkt) < maxLen {
		maxLen = len(pkt)
	}
	topicName := pkt[offset:maxLen]
	if op == Fetch && apiVersion > 11 { // topic name has the following format: uuid\x00\x02\tTOPIC\x02\x00
		// Fetch v12 wraps the readable name in extra framing: split on '\t'
		// to drop the UUID prefix, then on the "\x02\x00" tail marker.
		topicNames := strings.Split(string(topicName), "\t")
		if len(topicNames) >= 2 {
			topicNames := strings.Split(string(topicNames[1]), "\x02\x00")
			if len(topicNames) > 0 {
				topicName = []byte(topicNames[0])
			}
		} else {
			// NOTE(review): no '\t' separator found — the raw first chunk is
			// used as-is and any trailing "\x02\x00" is NOT trimmed here.
			// Verify against real v12 captures.
			topicName = []byte(topicNames[0])
		}
	}
	if isValidKafkaString(topicName, len(topicName), int(topicNameSize), false) {
		return string(topicName), nil
	}
	return "", errors.New("invalid topic name")
}

func getTopicNameSize(pkt []byte, offset int, op Operation, apiVersion int16) (int, error) {
topicNameSize := 0
if (op == Produce && apiVersion > 7) || (op == Fetch && apiVersion > 11) { // topic is a compact string
var err error
topicNameSize, err = readUnsignedVarint(pkt[offset+1:])
topicNameSize--
if err != nil {
return 0, err
}
} else {
topicNameSize = int(binary.BigEndian.Uint16(pkt[offset:]))
}
if topicNameSize <= 0 || topicNameSize > 255 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the topicNameSize > 255 still a valid check if we are > 11? Variable size int encoding makes me think it can now be larger than 255...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the upper limit

return 0, errors.New("invalid topic name size")
}
return topicNameSize, nil
}

func getTopicOffsetFromProduceOperation(header *Header, pkt []byte, offset *int) (bool, error) {
if header.APIVersion >= 3 {
if len(pkt) < *offset+2 {
Expand Down Expand Up @@ -235,14 +269,20 @@ func getTopicOffsetFromProduceOperation(header *Header, pkt []byte, offset *int)
if timeoutMS < 0 {
return false, nil
}
*offset += 4
if header.APIVersion <= 7 {
*offset += 4
}

return true, nil
}

func getTopicOffsetFromFetchOperation(header *Header) int {
offset := 3 * 4 // 3 * sizeof(int32)

if header.APIVersion >= 15 {
offset -= 4 // no replica id
}

if header.APIVersion >= 3 {
offset += 4 // max_bytes
if header.APIVersion >= 4 {
Expand All @@ -256,6 +296,24 @@ func getTopicOffsetFromFetchOperation(header *Header) int {
return offset
}

// readUnsignedVarint decodes a Kafka unsigned varint (LEB128-style: seven
// payload bits per byte, high bit set on every byte except the last) from
// the start of data. It accepts at most five bytes (a 32-bit value) and
// returns an error for over-long or truncated encodings.
func readUnsignedVarint(data []byte) (int, error) {
	var (
		result int
		shift  int
	)
	for _, b := range data {
		if b&0x80 == 0 {
			// Final byte: fold in its payload and finish.
			return result | int(b)<<shift, nil
		}
		result |= int(b&0x7F) << shift
		shift += 7
		if shift > 28 {
			// A sixth byte would be needed — not a legal 32-bit varint.
			return 0, errors.New("illegal varint")
		}
	}
	return 0, errors.New("data ended before varint was complete")
}

func TCPToKafkaToSpan(trace *TCPRequestInfo, data *KafkaInfo) request.Span {
peer := ""
hostname := ""
Expand Down
34 changes: 32 additions & 2 deletions pkg/internal/ebpf/common/kafka_detect_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestProcessKafkaRequest(t *testing.T) {
expected *KafkaInfo
}{
{
name: "Fetch request",
name: "Fetch request (v11)",
input: []byte{0, 0, 0, 94, 0, 1, 0, 11, 0, 0, 0, 224, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 1, 244, 0, 0, 0, 1, 6, 64, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 19, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0},
expected: &KafkaInfo{
ClientID: "sarama",
Expand All @@ -23,7 +23,27 @@ func TestProcessKafkaRequest(t *testing.T) {
},
},
{
name: "Produce request",
name: "Fetch request (v12)",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make a test where the string doesn't have \t inside, so the split doesn't work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also perhaps a test where there's no \x02\x00?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but I don't have an example of that. I can manually generate one but not sure if that's a realistic scenario.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, just manually crafted would be great. The reason I mentioned this is because there are so many traps here with the kafka protocol, I'd like to make the code as defensive as possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended testing the function to extract the topic itself, do you think is enough to test this?

input: []byte{0, 0, 0, 52, 0, 1, 0, 12, 0, 0, 1, 3, 0, 12, 99, 111, 110, 115, 117, 109, 101, 114, 45, 49, 45, 49, 0, 255, 255, 255, 255, 0, 0, 1, 244, 0, 0, 0, 1, 3, 32, 0, 0, 0, 30, 37, 158, 231, 0, 0, 0, 156, 1, 1, 1, 0, 53, 99, 48, 57, 45, 52, 52, 48, 48, 45, 98, 54, 101, 101, 45, 56, 54, 102, 97, 102, 101, 102, 57, 52, 102, 101, 98, 0, 2, 9, 109, 121, 45, 116, 111, 112, 105, 99, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 0, 0, 0, 0, 1, 0, 0, 0, 101, 121, 12, 118, 97, 108, 117, 101, 51, 0, 30, 0, 0},
expected: &KafkaInfo{
ClientID: "consumer-1-1",
Operation: Fetch,
Topic: "my-topic",
TopicOffset: 51,
},
},
{
name: "Fetch request (v15)",
input: []byte{0, 0, 0, 68, 0, 1, 0, 15, 0, 0, 38, 94, 0, 32, 99, 111, 110, 115, 117, 109, 101, 114, 45, 102, 114, 97, 117, 100, 100, 101, 116, 101, 99, 116, 105, 111, 110, 115, 101, 114, 118, 105, 99, 101, 45, 49, 0, 0, 0, 1, 244, 0, 0, 0, 1, 3, 32, 0, 0, 0, 33, 62, 224, 94, 0, 0, 30, 44, 1, 1, 1, 0, 1, 70, 99, 111, 110, 115, 117, 109, 101, 114, 45, 102, 114, 97, 117, 100, 100, 101, 116, 101, 99, 116, 105, 111, 110, 115, 101, 114, 118, 105, 99, 101, 45, 49, 45, 50, 51, 48, 98, 51, 55, 101, 100, 45, 98, 101, 57, 102, 45, 52, 97, 53, 99, 45, 97, 52},
expected: &KafkaInfo{
ClientID: "consumer-frauddetectionservice-1",
Operation: Fetch,
Topic: "",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about making topic equal to * when we have an UUID? It seems similar to what we do for HTTP...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make it clear that we have new protocol when we debug this in production.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will display process * as span name, are we ok with that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, * usually in OTel means something high cardinality, so UUID will fit that I think. It will also help us spot the difference when we failed to read, compared to what we knew we couldn't read.

TopicOffset: 67,
},
},
{
name: "Produce request (v7)",
input: []byte{0, 0, 0, 123, 0, 0, 0, 7, 0, 0, 0, 2, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 39, 16, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 72, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 60, 0, 0, 0, 0, 2, 249, 236, 167, 144, 0, 0, 0, 0, 0, 0, 0, 0, 1, 143, 191, 130, 165, 117, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 1, 20, 0, 0, 0, 1, 8, 100, 97, 116, 97, 0},
expected: &KafkaInfo{
ClientID: "sarama",
Expand All @@ -32,6 +52,16 @@ func TestProcessKafkaRequest(t *testing.T) {
TopicOffset: 28,
},
},
{
name: "Produce request (v9)",
input: []byte{0, 0, 0, 124, 0, 0, 0, 9, 0, 0, 0, 8, 0, 10, 112, 114, 111, 100, 117, 99, 101, 114, 45, 49, 0, 0, 0, 1, 0, 0, 117, 48, 2, 9, 109, 121, 45, 116, 111, 112, 105, 99, 2, 0, 0, 0, 0, 78, 103, 0, 0, 0, 1, 2, 0, 0, 9, 109, 121, 45, 116, 111, 112, 105, 99, 193, 136, 51, 44, 67, 57, 71, 124, 178, 93, 33, 21, 191, 31, 138, 233, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 2, 0, 0, 0, 1, 2, 0, 0, 0, 1, 1, 0, 128, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 16, 0, 0, 0, 4, 0, 0, 17},
expected: &KafkaInfo{
ClientID: "producer-1",
Operation: Produce,
Topic: "my-topic",
TopicOffset: 28,
},
},
{
name: "Invalid request",
input: []byte{0, 0, 0, 1, 0, 0, 0, 7, 0, 0, 0, 2, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 39, 16, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 72},
Expand Down
3 changes: 3 additions & 0 deletions pkg/internal/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,9 @@ func TraceName(span *request.Span) string {
}
return span.Method
case request.EventTypeKafkaClient:
if span.Path == "" {
return span.Method
}
return fmt.Sprintf("%s %s", span.Path, span.Method)
}
return ""
Expand Down
45 changes: 45 additions & 0 deletions test/integration/components/javakafka/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Build stage: GraalVM native-image toolchain for compiling the Java Kafka
# test client to a standalone native executable.
FROM ghcr.io/graalvm/native-image:ol8-java17-22 AS javabuilder

# Install tar and gzip to extract the Maven binaries
RUN microdnf update \
    && microdnf install --nodocs \
        tar \
        gzip \
    && microdnf clean all \
    && rm -rf /var/cache/yum

# Install Maven
ARG USER_HOME_DIR="/cache"
ARG MAVEN_DOWNLOAD_URL=https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz

RUN mkdir -p /usr/share/maven /usr/share/maven/ref \
    && curl -L -o /tmp/apache-maven.tar.gz ${MAVEN_DOWNLOAD_URL} \
    && tar -xzf /tmp/apache-maven.tar.gz -C /usr/share/maven --strip-components=1 \
    && rm -f /tmp/apache-maven.tar.gz \
    && ln -s /usr/share/maven/bin/mvn /usr/bin/mvn

# Use the modern key=value ENV form (the space-separated form is deprecated).
ENV JAVA_HOME=/usr/lib64/graalvm/graalvm22-ce-java17
ENV MAVEN_HOME=/usr/share/maven
ENV MAVEN_CONFIG="$USER_HOME_DIR/.m2"

# Set the working directory for the build
WORKDIR /build

# Copy the source code into the image for building
COPY test/integration/components/javakafka/src src/
COPY test/integration/components/javakafka/pom.xml pom.xml

# Sanity-check the toolchain before building
RUN java -version
RUN mvn -version

# Build the native executable
RUN mvn -Pnative native:compile

# The App Image: minimal runtime layer holding only the native binary
FROM debian:bookworm-slim

EXPOSE 8080

# Copy the native executable into the container
COPY --from=javabuilder /build/target/javakafka ./javakafka
ENTRYPOINT ["/javakafka"]
52 changes: 52 additions & 0 deletions test/integration/components/javakafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the javakafka integration-test client: a small Kafka
     producer/consumer compiled to a native binary with GraalVM. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>de.fstab.demo</groupId>
    <artifactId>javakafka</artifactId>
    <version>1.0.0</version>

    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>

        <!-- Kafka client library used by the test application. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- Produces the native executable via `mvn -Pnative native:compile`
                 (see the accompanying Dockerfile). -->
            <plugin>
                <groupId>org.graalvm.buildtools</groupId>
                <artifactId>native-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- Entry point class of the test client. -->
                            <mainClass>Kafka</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
Loading
Loading