Support kafka fetch version 15 #903

Merged: 10 commits, Jun 12, 2024

Changes from all commits
81 changes: 71 additions & 10 deletions pkg/internal/ebpf/common/kafka_detect_transform.go
@@ -3,6 +3,7 @@ package ebpfcommon
import (
"encoding/binary"
"errors"
"regexp"
"unsafe"

trace2 "go.opentelemetry.io/otel/trace"
@@ -45,17 +46,19 @@ func (k Operation) String() string {

const KafkaMinLength = 14

var topicRegex = regexp.MustCompile("\x02\t(.*)\x02")

// ProcessPossibleKafkaEvent processes a TCP packet and returns an error if the packet is not
// a valid Kafka request; otherwise it returns KafkaInfo with the processed data. The request
// buffer is tried first, then the response buffer.
func ProcessPossibleKafkaEvent(pkt []byte, rpkt []byte) (*KafkaInfo, error) {
k, err := ProcessKafkaRequest(pkt)
if err != nil {
k, err = ProcessKafkaRequest(rpkt)
}

return k, err
}

// https://kafka.apache.org/protocol.html
func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) {
k := &KafkaInfo{}
if len(pkt) < KafkaMinLength {
@@ -81,7 +84,7 @@ func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) {
return k, err
}

- topic, err := getTopicName(pkt, offset)
topic, err := getTopicName(pkt, offset, k.Operation, header.APIVersion)
if err != nil {
return k, err
}
@@ -110,11 +113,11 @@ func isValidKafkaHeader(header *Header) bool {
}
switch Operation(header.APIKey) {
case Fetch:
- if header.APIVersion > 11 {
if header.APIVersion > 16 { // latest: Fetch Request (Version: 16)
return false
}
case Produce:
- if header.APIVersion == 0 || header.APIVersion > 8 {
if header.APIVersion == 0 || header.APIVersion > 10 { // latest: Produce Request (Version: 10)
return false
}
default:
@@ -182,31 +185,65 @@ func isValidClientID(buffer []byte, realClientIDSize int) bool {
return isValidKafkaString(buffer, len(buffer), realClientIDSize, true)
}

- func getTopicName(pkt []byte, offset int) (string, error) {
func getTopicName(pkt []byte, offset int, op Operation, apiVersion int16) (string, error) {
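// Fetch v13+ identifies topics by UUID (topic_id, KIP-516) rather than by name.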
if apiVersion >= 13 { // topic name is only a UUID, no need to parse it
return "*", nil
}

offset += 4
if offset > len(pkt) {
return "", errors.New("invalid buffer length")
}
- topicNameSize := int16(binary.BigEndian.Uint16(pkt[offset:]))
- if topicNameSize <= 0 || topicNameSize > 255 {
- return "", errors.New("invalid topic name size")
topicNameSize, err := getTopicNameSize(pkt, offset, op, apiVersion)
if err != nil {
return "", err
}
offset += 2

if offset > len(pkt) {
return "", nil
}
- maxLen := offset + int(topicNameSize)
maxLen := offset + topicNameSize
if len(pkt) < maxLen {
maxLen = len(pkt)
}
topicName := pkt[offset:maxLen]
if op == Fetch && apiVersion > 11 {
// topic name has the following format: uuid\x00\x02\tTOPIC\x02\x00
topicName = []byte(extractTopic(string(topicName)))
}
if isValidKafkaString(topicName, len(topicName), int(topicNameSize), false) {
return string(topicName), nil
}
return "", errors.New("invalid topic name")
}

func extractTopic(input string) string {
matches := topicRegex.FindStringSubmatch(input)
if len(matches) > 1 {
return matches[1]
}
return ""
}
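A minimal sketch of how the pattern behind extractTopic behaves on the kind of v12 wire fragment exercised in the tests below (the standalone main wrapper is illustrative, not part of this PR):

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as topicRegex above: \x02 opens the compact topic array,
// and \t (0x09) is the compact-string length byte for an 8-byte name.
var topicRegex = regexp.MustCompile("\x02\t(.*)\x02")

func main() {
	// v12 Fetch payload fragment: truncated topic UUID, then the topic name.
	raw := "c09-4400-b6ee-86fafef94feb\x00\x02\tmy-topic\x02\x00"
	if m := topicRegex.FindStringSubmatch(raw); len(m) > 1 {
		fmt.Println(m[1]) // my-topic
	}
}
```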

func getTopicNameSize(pkt []byte, offset int, op Operation, apiVersion int16) (int, error) {
topicNameSize := 0
if (op == Produce && apiVersion > 7) || (op == Fetch && apiVersion > 11) { // topic is a compact string
var err error
topicNameSize, err = readUnsignedVarint(pkt[offset+1:])
topicNameSize--
if err != nil {
return 0, err
}
} else {
topicNameSize = int(binary.BigEndian.Uint16(pkt[offset:]))
}
if topicNameSize <= 0 {
return 0, errors.New("invalid topic name size")
}
return topicNameSize, nil
}
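As a worked example of the compact-string branch: in the v12 fixture below, the topic segment is 0x02 (compact-array count, one topic plus one), then 0x09 (compact-string length, eight plus one), then the name bytes. Here is a sketch reusing readUnsignedVarint from this diff; the helper name exampleCompactTopic is hypothetical:

```go
// exampleCompactTopic illustrates the decode above on a crafted segment:
// skip the compact-array byte, varint-decode the length, subtract one.
func exampleCompactTopic() (string, error) {
	seg := []byte{0x02, 0x09, 'm', 'y', '-', 't', 'o', 'p', 'i', 'c', 0x02}
	n, err := readUnsignedVarint(seg[1:]) // n == 9
	if err != nil {
		return "", err
	}
	return string(seg[2 : 2+n-1]), nil // "my-topic"
}
```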

func getTopicOffsetFromProduceOperation(header *Header, pkt []byte, offset *int) (bool, error) {
if header.APIVersion >= 3 {
if len(pkt) < *offset+2 {
@@ -235,14 +272,20 @@ func getTopicOffsetFromProduceOperation(header *Header, pkt []byte, offset *int)
if timeoutMS < 0 {
return false, nil
}
- *offset += 4
if header.APIVersion <= 7 {
*offset += 4
}

return true, nil
}

func getTopicOffsetFromFetchOperation(header *Header) int {
offset := 3 * 4 // 3 * sizeof(int32)

if header.APIVersion >= 15 {
offset -= 4 // no replica id
}

if header.APIVersion >= 3 {
offset += 4 // max_bytes
if header.APIVersion >= 4 {
@@ -256,6 +299,24 @@ func getTopicOffsetFromFetchOperation(header *Header) int {
return offset
}

func readUnsignedVarint(data []byte) (int, error) {
Contributor Author: Do you mind adding unit tests for this function? Encode some larger numbers and make sure they are decoded correctly.

value := 0
i := 0
for idx := 0; idx < len(data); idx++ {
b := data[idx]
if (b & 0x80) == 0 {
value |= int(b) << i
return value, nil
}
value |= int(b&0x7F) << i
i += 7
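// a 32-bit varint uses at most 5 bytes (shifts 0, 7, 14, 21, 28)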
if i > 28 {
return 0, errors.New("illegal varint")
}
}
return 0, errors.New("data ended before varint was complete")
}
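The decoder follows Kafka's unsigned-varint layout: seven payload bits per byte, with the high bit marking continuation. Below is a minimal encoder sketch for crafting test vectors; writeUnsignedVarint is a hypothetical helper, not part of this PR:

```go
// Hypothetical inverse of readUnsignedVarint; writeUnsignedVarint(270)
// yields {0x8E, 0x02}, matching the first vector in the tests below.
func writeUnsignedVarint(value int) []byte {
	var out []byte
	for value >= 0x80 {
		out = append(out, byte(value&0x7F)|0x80) // low 7 bits, continuation bit set
		value >>= 7
	}
	return append(out, byte(value)) // final byte, high bit clear
}
```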

func TCPToKafkaToSpan(trace *TCPRequestInfo, data *KafkaInfo) request.Span {
peer := ""
hostname := ""
123 changes: 121 additions & 2 deletions pkg/internal/ebpf/common/kafka_detect_transform_test.go
@@ -1,6 +1,7 @@
package ebpfcommon

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
@@ -13,7 +14,7 @@ func TestProcessKafkaRequest(t *testing.T) {
expected *KafkaInfo
}{
{
name: "Fetch request",
name: "Fetch request (v11)",
input: []byte{0, 0, 0, 94, 0, 1, 0, 11, 0, 0, 0, 224, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 1, 244, 0, 0, 0, 1, 6, 64, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 19, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0},
expected: &KafkaInfo{
ClientID: "sarama",
@@ -23,7 +24,27 @@ func TestProcessKafkaRequest(t *testing.T) {
},
},
{
name: "Produce request",
name: "Fetch request (v12)",
Contributor Author: Can we make a test where the string doesn't have \t inside, so the split doesn't work?

Contributor Author: Also perhaps a test where there's no \x02\x00?

Contributor: Sure, but I don't have an example of that. I can manually generate one, but I'm not sure that's a realistic scenario.

Contributor Author: Yeah, just manually crafted would be great. The reason I mention this is that there are so many traps in the Kafka protocol; I'd like to make the code as defensive as possible.

Contributor: I ended up testing the topic-extraction function itself. Do you think that's enough to cover this?
input: []byte{0, 0, 0, 52, 0, 1, 0, 12, 0, 0, 1, 3, 0, 12, 99, 111, 110, 115, 117, 109, 101, 114, 45, 49, 45, 49, 0, 255, 255, 255, 255, 0, 0, 1, 244, 0, 0, 0, 1, 3, 32, 0, 0, 0, 30, 37, 158, 231, 0, 0, 0, 156, 1, 1, 1, 0, 53, 99, 48, 57, 45, 52, 52, 48, 48, 45, 98, 54, 101, 101, 45, 56, 54, 102, 97, 102, 101, 102, 57, 52, 102, 101, 98, 0, 2, 9, 109, 121, 45, 116, 111, 112, 105, 99, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 0, 0, 0, 0, 1, 0, 0, 0, 101, 121, 12, 118, 97, 108, 117, 101, 51, 0, 30, 0, 0},
expected: &KafkaInfo{
ClientID: "consumer-1-1",
Operation: Fetch,
Topic: "my-topic",
TopicOffset: 51,
},
},
{
name: "Fetch request (v15)",
input: []byte{0, 0, 0, 68, 0, 1, 0, 15, 0, 0, 38, 94, 0, 32, 99, 111, 110, 115, 117, 109, 101, 114, 45, 102, 114, 97, 117, 100, 100, 101, 116, 101, 99, 116, 105, 111, 110, 115, 101, 114, 118, 105, 99, 101, 45, 49, 0, 0, 0, 1, 244, 0, 0, 0, 1, 3, 32, 0, 0, 0, 33, 62, 224, 94, 0, 0, 30, 44, 1, 1, 1, 0, 1, 70, 99, 111, 110, 115, 117, 109, 101, 114, 45, 102, 114, 97, 117, 100, 100, 101, 116, 101, 99, 116, 105, 111, 110, 115, 101, 114, 118, 105, 99, 101, 45, 49, 45, 50, 51, 48, 98, 51, 55, 101, 100, 45, 98, 101, 57, 102, 45, 52, 97, 53, 99, 45, 97, 52},
expected: &KafkaInfo{
ClientID: "consumer-frauddetectionservice-1",
Operation: Fetch,
Topic: "*",
TopicOffset: 67,
},
},
{
name: "Produce request (v7)",
input: []byte{0, 0, 0, 123, 0, 0, 0, 7, 0, 0, 0, 2, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 39, 16, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 72, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 60, 0, 0, 0, 0, 2, 249, 236, 167, 144, 0, 0, 0, 0, 0, 0, 0, 0, 1, 143, 191, 130, 165, 117, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 1, 20, 0, 0, 0, 1, 8, 100, 97, 116, 97, 0},
expected: &KafkaInfo{
ClientID: "sarama",
@@ -32,6 +53,16 @@ func TestProcessKafkaRequest(t *testing.T) {
TopicOffset: 28,
},
},
{
name: "Produce request (v9)",
input: []byte{0, 0, 0, 124, 0, 0, 0, 9, 0, 0, 0, 8, 0, 10, 112, 114, 111, 100, 117, 99, 101, 114, 45, 49, 0, 0, 0, 1, 0, 0, 117, 48, 2, 9, 109, 121, 45, 116, 111, 112, 105, 99, 2, 0, 0, 0, 0, 78, 103, 0, 0, 0, 1, 2, 0, 0, 9, 109, 121, 45, 116, 111, 112, 105, 99, 193, 136, 51, 44, 67, 57, 71, 124, 178, 93, 33, 21, 191, 31, 138, 233, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 2, 0, 0, 0, 1, 2, 0, 0, 0, 1, 1, 0, 128, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 16, 0, 0, 0, 4, 0, 0, 17},
expected: &KafkaInfo{
ClientID: "producer-1",
Operation: Produce,
Topic: "my-topic",
TopicOffset: 28,
},
},
{
name: "Invalid request",
input: []byte{0, 0, 0, 1, 0, 0, 0, 7, 0, 0, 0, 2, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 39, 16, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 72},
@@ -141,3 +172,91 @@ func TestIsValidKafkaHeader(t *testing.T) {
result = isValidKafkaHeader(header)
assert.False(t, result)
}

func TestReadUnsignedVarint(t *testing.T) {
tests := []struct {
name string
data []byte
expected int
err error
}{
{
name: "Valid varint",
data: []byte{0x8E, 0x02},
expected: 270,
err: nil,
},
{
name: "Valid varint with multiple bytes",
data: []byte{0x8E, 0x8E, 0x02},
expected: 34574,
err: nil,
},
{
name: "Illegal varint",
data: []byte{0x8E, 0x8E, 0x8E, 0x8E, 0x8E, 0x8E, 0x8E, 0x8E},
expected: 0,
err: errors.New("illegal varint"),
},
{
name: "Incomplete varint",
data: []byte{0x8E},
expected: 0,
err: errors.New("data ended before varint was complete"),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := readUnsignedVarint(tt.data)
assert.Equal(t, tt.expected, result)
assert.Equal(t, tt.err, err)
})
}
}

func TestExtractTopic(t *testing.T) {
tests := []struct {
name string
input string
output string
}{
{
name: "Valid topic",
input: "c09-4400-b6ee-86fafef94feb\x00\x02\tmy-topic\x02\x00\x00\x00\x00\x00\x00",
output: "my-topic",
},
{
name: "Valid topic (without uuid)",
input: "\x00\x02\tmy_topic\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x00\x00",
output: "my_topic",
},
{
name: "Invalid format (without \t)",
input: "\x00\x02my_topic\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x00\x00",
output: "",
},
{
name: "Invalid format (without \x02)",
input: "\x00\tmy_topic\x00\x00\x00\x00\x00\x00\x00\x00",
output: "",
},
{
name: "No topic",
input: "\x02\t\x02",
output: "",
},
{
name: "Invalid input",
input: "invalid",
output: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := extractTopic(tt.input)
assert.Equal(t, tt.output, result)
})
}
}
3 changes: 3 additions & 0 deletions pkg/internal/export/otel/traces.go
@@ -617,6 +617,9 @@ func TraceName(span *request.Span) string {
}
return span.Method
case request.EventTypeKafkaClient:
if span.Path == "" {
return span.Method
}
return fmt.Sprintf("%s %s", span.Path, span.Method)
}
return ""
45 changes: 45 additions & 0 deletions test/integration/components/javakafka/Dockerfile
@@ -0,0 +1,45 @@
FROM ghcr.io/graalvm/native-image:ol8-java17-22 AS javabuilder

# Install tar and gzip to extract the Maven binaries
RUN microdnf update \
&& microdnf install --nodocs \
tar \
gzip \
&& microdnf clean all \
&& rm -rf /var/cache/yum

# Install Maven
ARG USER_HOME_DIR="/cache"
ARG MAVEN_DOWNLOAD_URL=https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz

RUN mkdir -p /usr/share/maven /usr/share/maven/ref \
&& curl -L -o /tmp/apache-maven.tar.gz ${MAVEN_DOWNLOAD_URL} \
&& tar -xzf /tmp/apache-maven.tar.gz -C /usr/share/maven --strip-components=1 \
&& rm -f /tmp/apache-maven.tar.gz \
&& ln -s /usr/share/maven/bin/mvn /usr/bin/mvn

ENV JAVA_HOME /usr/lib64/graalvm/graalvm22-ce-java17
ENV MAVEN_HOME /usr/share/maven
ENV MAVEN_CONFIG "$USER_HOME_DIR/.m2"

# Set the working directory to /home/app
WORKDIR /build

# Copy the source code into the image for building
COPY test/integration/components/javakafka/src src/
COPY test/integration/components/javakafka/pom.xml pom.xml

RUN java -version
RUN mvn -version

# Build
RUN mvn -Pnative native:compile

# The App Image
FROM debian:bookworm-slim

EXPOSE 8080

# Copy the native executable into the container
COPY --from=javabuilder /build/target/javakafka ./javakafka
ENTRYPOINT ["/javakafka"]