diff --git a/pkg/internal/ebpf/common/kafka_detect_transform.go b/pkg/internal/ebpf/common/kafka_detect_transform.go
index 6a88bb3b8..700fb3708 100644
--- a/pkg/internal/ebpf/common/kafka_detect_transform.go
+++ b/pkg/internal/ebpf/common/kafka_detect_transform.go
@@ -3,6 +3,7 @@ package ebpfcommon
import (
"encoding/binary"
"errors"
+ "regexp"
"unsafe"
trace2 "go.opentelemetry.io/otel/trace"
@@ -45,6 +46,8 @@ func (k Operation) String() string {
const KafkaMinLength = 14
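+// topicRegex extracts the topic name from compact (flexible) encodings, where
+// it appears as \x02\t<topic>\x02 right after the topic UUID (see getTopicName).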
+var topicRegex = regexp.MustCompile("\x02\t(.*)\x02")
+
// ProcessKafkaRequest processes a TCP packet and returns error if the packet is not a valid Kafka request.
// Otherwise, return kafka.Info with the processed data.
func ProcessPossibleKafkaEvent(pkt []byte, rpkt []byte) (*KafkaInfo, error) {
@@ -52,10 +55,10 @@ func ProcessPossibleKafkaEvent(pkt []byte, rpkt []byte) (*KafkaInfo, error) {
if err != nil {
k, err = ProcessKafkaRequest(rpkt)
}
-
return k, err
}
+// https://kafka.apache.org/protocol.html
func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) {
k := &KafkaInfo{}
if len(pkt) < KafkaMinLength {
@@ -81,7 +84,7 @@ func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) {
return k, err
}
- topic, err := getTopicName(pkt, offset)
+ topic, err := getTopicName(pkt, offset, k.Operation, header.APIVersion)
if err != nil {
return k, err
}
@@ -110,11 +113,11 @@ func isValidKafkaHeader(header *Header) bool {
}
switch Operation(header.APIKey) {
case Fetch:
- if header.APIVersion > 11 {
+ if header.APIVersion > 16 { // latest: Fetch Request (Version: 16)
return false
}
case Produce:
- if header.APIVersion == 0 || header.APIVersion > 8 {
+ if header.APIVersion == 0 || header.APIVersion > 10 { // latest: Produce Request (Version: 10)
return false
}
default:
@@ -182,31 +185,65 @@ func isValidClientID(buffer []byte, realClientIDSize int) bool {
return isValidKafkaString(buffer, len(buffer), realClientIDSize, true)
}
-func getTopicName(pkt []byte, offset int) (string, error) {
+func getTopicName(pkt []byte, offset int, op Operation, apiVersion int16) (string, error) {
+ if apiVersion >= 13 { // Fetch v13+ identifies topics by UUID instead of name, so there is no name to parse
+ return "*", nil
+ }
+
offset += 4
if offset > len(pkt) {
return "", errors.New("invalid buffer length")
}
- topicNameSize := int16(binary.BigEndian.Uint16(pkt[offset:]))
- if topicNameSize <= 0 || topicNameSize > 255 {
- return "", errors.New("invalid topic name size")
+ topicNameSize, err := getTopicNameSize(pkt, offset, op, apiVersion)
+ if err != nil {
+ return "", err
}
offset += 2
if offset > len(pkt) {
return "", nil
}
- maxLen := offset + int(topicNameSize)
+ maxLen := offset + topicNameSize
if len(pkt) < maxLen {
maxLen = len(pkt)
}
topicName := pkt[offset:maxLen]
+ if op == Fetch && apiVersion > 11 {
+ // topic name has the following format: uuid\x00\x02\tTOPIC\x02\x00
+ topicName = []byte(extractTopic(string(topicName)))
+ }
if isValidKafkaString(topicName, len(topicName), int(topicNameSize), false) {
return string(topicName), nil
}
return "", errors.New("invalid topic name")
}
+func extractTopic(input string) string {
+ matches := topicRegex.FindStringSubmatch(input)
+ if len(matches) > 1 {
+ return matches[1]
+ }
+ return ""
+}
+
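+// getTopicNameSize returns the length of the topic name: a compact string
+// (unsigned varint of length+1) for flexible versions (Produce v8+, Fetch v12+),
+// an int16 length prefix otherwise.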
+func getTopicNameSize(pkt []byte, offset int, op Operation, apiVersion int16) (int, error) {
+ topicNameSize := 0
+ if (op == Produce && apiVersion > 7) || (op == Fetch && apiVersion > 11) { // topic is a compact string
+ var err error
+ topicNameSize, err = readUnsignedVarint(pkt[offset+1:])
+ if err != nil {
+ return 0, err
+ }
+ topicNameSize-- // compact string lengths are encoded as N+1
+ } else {
+ topicNameSize = int(binary.BigEndian.Uint16(pkt[offset:]))
+ }
+ if topicNameSize <= 0 {
+ return 0, errors.New("invalid topic name size")
+ }
+ return topicNameSize, nil
+}
+
func getTopicOffsetFromProduceOperation(header *Header, pkt []byte, offset *int) (bool, error) {
if header.APIVersion >= 3 {
if len(pkt) < *offset+2 {
@@ -235,7 +272,9 @@ func getTopicOffsetFromProduceOperation(header *Header, pkt []byte, offset *int)
if timeoutMS < 0 {
return false, nil
}
- *offset += 4
+ if header.APIVersion <= 7 {
+ *offset += 4
+ }
return true, nil
}
@@ -243,6 +282,10 @@ func getTopicOffsetFromProduceOperation(header *Header, pkt []byte, offset *int)
func getTopicOffsetFromFetchOperation(header *Header) int {
offset := 3 * 4 // 3 * sizeof(int32)
+ if header.APIVersion >= 15 {
+ offset -= 4 // no replica id
+ }
+
if header.APIVersion >= 3 {
offset += 4 // max_bytes
if header.APIVersion >= 4 {
@@ -256,6 +299,24 @@ func getTopicOffsetFromFetchOperation(header *Header) int {
return offset
}
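+// readUnsignedVarint decodes Kafka's unsigned varint encoding: each byte
+// carries 7 data bits, low bits first, and the high bit marks a continuation
+// byte. For example, {0x8E, 0x02} decodes to 270.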
+func readUnsignedVarint(data []byte) (int, error) {
+ value := 0
+ i := 0
+ for idx := 0; idx < len(data); idx++ {
+ b := data[idx]
+ if (b & 0x80) == 0 {
+ value |= int(b) << i
+ return value, nil
+ }
+ value |= int(b&0x7F) << i
+ i += 7
+ if i > 28 {
+ return 0, errors.New("illegal varint")
+ }
+ }
+ return 0, errors.New("data ended before varint was complete")
+}
+
func TCPToKafkaToSpan(trace *TCPRequestInfo, data *KafkaInfo) request.Span {
peer := ""
hostname := ""
diff --git a/pkg/internal/ebpf/common/kafka_detect_transform_test.go b/pkg/internal/ebpf/common/kafka_detect_transform_test.go
index 56562ef57..d4b774319 100644
--- a/pkg/internal/ebpf/common/kafka_detect_transform_test.go
+++ b/pkg/internal/ebpf/common/kafka_detect_transform_test.go
@@ -1,6 +1,7 @@
package ebpfcommon
import (
+ "errors"
"testing"
"github.com/stretchr/testify/assert"
@@ -13,7 +14,7 @@ func TestProcessKafkaRequest(t *testing.T) {
expected *KafkaInfo
}{
{
- name: "Fetch request",
+ name: "Fetch request (v11)",
input: []byte{0, 0, 0, 94, 0, 1, 0, 11, 0, 0, 0, 224, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 1, 244, 0, 0, 0, 1, 6, 64, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 19, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0},
expected: &KafkaInfo{
ClientID: "sarama",
@@ -23,7 +24,27 @@ func TestProcessKafkaRequest(t *testing.T) {
},
},
{
- name: "Produce request",
+ name: "Fetch request (v12)",
+ input: []byte{0, 0, 0, 52, 0, 1, 0, 12, 0, 0, 1, 3, 0, 12, 99, 111, 110, 115, 117, 109, 101, 114, 45, 49, 45, 49, 0, 255, 255, 255, 255, 0, 0, 1, 244, 0, 0, 0, 1, 3, 32, 0, 0, 0, 30, 37, 158, 231, 0, 0, 0, 156, 1, 1, 1, 0, 53, 99, 48, 57, 45, 52, 52, 48, 48, 45, 98, 54, 101, 101, 45, 56, 54, 102, 97, 102, 101, 102, 57, 52, 102, 101, 98, 0, 2, 9, 109, 121, 45, 116, 111, 112, 105, 99, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 0, 0, 0, 0, 1, 0, 0, 0, 101, 121, 12, 118, 97, 108, 117, 101, 51, 0, 30, 0, 0},
+ expected: &KafkaInfo{
+ ClientID: "consumer-1-1",
+ Operation: Fetch,
+ Topic: "my-topic",
+ TopicOffset: 51,
+ },
+ },
+ {
+ name: "Fetch request (v15)",
+ input: []byte{0, 0, 0, 68, 0, 1, 0, 15, 0, 0, 38, 94, 0, 32, 99, 111, 110, 115, 117, 109, 101, 114, 45, 102, 114, 97, 117, 100, 100, 101, 116, 101, 99, 116, 105, 111, 110, 115, 101, 114, 118, 105, 99, 101, 45, 49, 0, 0, 0, 1, 244, 0, 0, 0, 1, 3, 32, 0, 0, 0, 33, 62, 224, 94, 0, 0, 30, 44, 1, 1, 1, 0, 1, 70, 99, 111, 110, 115, 117, 109, 101, 114, 45, 102, 114, 97, 117, 100, 100, 101, 116, 101, 99, 116, 105, 111, 110, 115, 101, 114, 118, 105, 99, 101, 45, 49, 45, 50, 51, 48, 98, 51, 55, 101, 100, 45, 98, 101, 57, 102, 45, 52, 97, 53, 99, 45, 97, 52},
+ expected: &KafkaInfo{
+ ClientID: "consumer-frauddetectionservice-1",
+ Operation: Fetch,
+ Topic: "*",
+ TopicOffset: 67,
+ },
+ },
+ {
+ name: "Produce request (v7)",
input: []byte{0, 0, 0, 123, 0, 0, 0, 7, 0, 0, 0, 2, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 39, 16, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 72, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 60, 0, 0, 0, 0, 2, 249, 236, 167, 144, 0, 0, 0, 0, 0, 0, 0, 0, 1, 143, 191, 130, 165, 117, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 1, 20, 0, 0, 0, 1, 8, 100, 97, 116, 97, 0},
expected: &KafkaInfo{
ClientID: "sarama",
@@ -32,6 +53,16 @@ func TestProcessKafkaRequest(t *testing.T) {
TopicOffset: 28,
},
},
+ {
+ name: "Produce request (v9)",
+ input: []byte{0, 0, 0, 124, 0, 0, 0, 9, 0, 0, 0, 8, 0, 10, 112, 114, 111, 100, 117, 99, 101, 114, 45, 49, 0, 0, 0, 1, 0, 0, 117, 48, 2, 9, 109, 121, 45, 116, 111, 112, 105, 99, 2, 0, 0, 0, 0, 78, 103, 0, 0, 0, 1, 2, 0, 0, 9, 109, 121, 45, 116, 111, 112, 105, 99, 193, 136, 51, 44, 67, 57, 71, 124, 178, 93, 33, 21, 191, 31, 138, 233, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 2, 0, 0, 0, 1, 2, 0, 0, 0, 1, 1, 0, 128, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 16, 0, 0, 0, 4, 0, 0, 17},
+ expected: &KafkaInfo{
+ ClientID: "producer-1",
+ Operation: Produce,
+ Topic: "my-topic",
+ TopicOffset: 28,
+ },
+ },
{
name: "Invalid request",
input: []byte{0, 0, 0, 1, 0, 0, 0, 7, 0, 0, 0, 2, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 39, 16, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 72},
@@ -141,3 +172,91 @@ func TestIsValidKafkaHeader(t *testing.T) {
result = isValidKafkaHeader(header)
assert.False(t, result)
}
+
+func TestReadUnsignedVarint(t *testing.T) {
+ tests := []struct {
+ name string
+ data []byte
+ expected int
+ err error
+ }{
+ {
+ name: "Valid varint",
+ data: []byte{0x8E, 0x02},
+ expected: 270,
+ err: nil,
+ },
+ {
+ name: "Valid varint with multiple bytes",
+ data: []byte{0x8E, 0x8E, 0x02},
+ expected: 34574,
+ err: nil,
+ },
+ {
+ name: "Illegal varint",
+ data: []byte{0x8E, 0x8E, 0x8E, 0x8E, 0x8E, 0x8E, 0x8E, 0x8E},
+ expected: 0,
+ err: errors.New("illegal varint"),
+ },
+ {
+ name: "Incomplete varint",
+ data: []byte{0x8E},
+ expected: 0,
+ err: errors.New("data ended before varint was complete"),
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result, err := readUnsignedVarint(tt.data)
+ assert.Equal(t, tt.expected, result)
+ assert.Equal(t, tt.err, err)
+ })
+ }
+}
+
+func TestExtractTopic(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ output string
+ }{
+ {
+ name: "Valid topic",
+ input: "c09-4400-b6ee-86fafef94feb\x00\x02\tmy-topic\x02\x00\x00\x00\x00\x00\x00",
+ output: "my-topic",
+ },
+ {
+ name: "Valid topic (without uuid)",
+ input: "\x00\x02\tmy_topic\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x00\x00",
+ output: "my_topic",
+ },
+ {
+ name: "Invalid format (without \t)",
+ input: "\x00\x02my_topic\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x00\x00",
+ output: "",
+ },
+ {
+ name: "Invalid format (without \x02)",
+ input: "\x00\tmy_topic\x00\x00\x00\x00\x00\x00\x00\x00",
+ output: "",
+ },
+ {
+ name: "No topic",
+ input: "\x02\t\x02",
+ output: "",
+ },
+ {
+ name: "Invalid input",
+ input: "invalid",
+ output: "",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := extractTopic(tt.input)
+ assert.Equal(t, tt.output, result)
+ })
+ }
+}
diff --git a/pkg/internal/export/otel/traces.go b/pkg/internal/export/otel/traces.go
index 9b74118e1..73b066b06 100644
--- a/pkg/internal/export/otel/traces.go
+++ b/pkg/internal/export/otel/traces.go
@@ -617,6 +617,9 @@ func TraceName(span *request.Span) string {
}
return span.Method
case request.EventTypeKafkaClient:
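+ // fall back to the operation name when no topic was detected for the span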
+ if span.Path == "" {
+ return span.Method
+ }
return fmt.Sprintf("%s %s", span.Path, span.Method)
}
return ""
diff --git a/test/integration/components/javakafka/Dockerfile b/test/integration/components/javakafka/Dockerfile
new file mode 100644
index 000000000..5a7a8e7fe
--- /dev/null
+++ b/test/integration/components/javakafka/Dockerfile
@@ -0,0 +1,45 @@
+FROM ghcr.io/graalvm/native-image:ol8-java17-22 AS javabuilder
+
+# Install tar and gzip to extract the Maven binaries
+RUN microdnf update \
+ && microdnf install --nodocs \
+ tar \
+ gzip \
+ && microdnf clean all \
+ && rm -rf /var/cache/yum
+
+# Install Maven
+ARG USER_HOME_DIR="/cache"
+ARG MAVEN_DOWNLOAD_URL=https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz
+
+RUN mkdir -p /usr/share/maven /usr/share/maven/ref \
+ && curl -L -o /tmp/apache-maven.tar.gz ${MAVEN_DOWNLOAD_URL} \
+ && tar -xzf /tmp/apache-maven.tar.gz -C /usr/share/maven --strip-components=1 \
+ && rm -f /tmp/apache-maven.tar.gz \
+ && ln -s /usr/share/maven/bin/mvn /usr/bin/mvn
+
+ENV JAVA_HOME /usr/lib64/graalvm/graalvm22-ce-java17
+ENV MAVEN_HOME /usr/share/maven
+ENV MAVEN_CONFIG "$USER_HOME_DIR/.m2"
+
+# Set the working directory for the build
+WORKDIR /build
+
+# Copy the source code into the image for building
+COPY test/integration/components/javakafka/src src/
+COPY test/integration/components/javakafka/pom.xml pom.xml
+
+RUN java -version
+RUN mvn -version
+
+# Build
+RUN mvn -Pnative native:compile
+
+# The App Image
+FROM debian:bookworm-slim
+
+EXPOSE 8080
+
+# Copy the native executable into the container image
+COPY --from=javabuilder /build/target/javakafka ./javakafka
+ENTRYPOINT ["/javakafka"]
\ No newline at end of file
diff --git a/test/integration/components/javakafka/pom.xml b/test/integration/components/javakafka/pom.xml
new file mode 100644
index 000000000..2aecd8dec
--- /dev/null
+++ b/test/integration/components/javakafka/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>de.fstab.demo</groupId>
+    <artifactId>javakafka</artifactId>
+    <version>1.0.0</version>
+
+    <properties>
+        <java.version>17</java.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <release>${java.version}</release>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.graalvm.buildtools</groupId>
+                <artifactId>native-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.2.0</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>Kafka</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/test/integration/components/javakafka/src/main/java/Kafka.java b/test/integration/components/javakafka/src/main/java/Kafka.java
new file mode 100644
index 000000000..47d74d1d6
--- /dev/null
+++ b/test/integration/components/javakafka/src/main/java/Kafka.java
@@ -0,0 +1,80 @@
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+
+public class Kafka {
+
+ public static void main(String[] args) {
+ try {
+ // Producer
+ Properties producerProps = new Properties();
+ producerProps.put("bootstrap.servers", "kafka:9092");
+ producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ producerProps.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner"); // Explicit partitioner
+ producerProps.put("inter.broker.protocol.version", "2.8");
+
+ Thread producerThread = new Thread(() -> {
+ KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
+ for (int i = 0; i < 100; i++) {
+ producer.send(new ProducerRecord<>("my-topic", "key", "value" + i));
+ System.out.println("Produced message: " + "value" + i);
+ }
+ producer.close();
+ });
+
+ // Consumer
+ Properties consumerProps = new Properties();
+ consumerProps.put("bootstrap.servers", "kafka:9092");
+ consumerProps.put("group.id", "1");
+ consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerProps.put("auto.offset.reset", "earliest");
+
+ Thread consumerThread = new Thread(() -> {
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
+ consumer.subscribe(Collections.singletonList("my-topic"));
+
+ while (true) {
+ System.out.println("Polling for new messages...");
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, String> record : records) {
+ System.out.printf("Consumed message: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
+ }
+ }
+ });
+
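+ // simple HTTP endpoint; the OATS test input generates traffic by calling /message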
+ HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
+ server.createContext("/message", new HttpHandler() {
+ @Override
+ public void handle(HttpExchange exchange) throws IOException {
+ String response = "OK";
+ exchange.sendResponseHeaders(200, response.length());
+ OutputStream os = exchange.getResponseBody();
+ os.write(response.getBytes());
+ os.close();
+ }
+ });
+
+ producerThread.start();
+ consumerThread.start();
+ server.start();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/integration/components/pythonkafka/main.py b/test/integration/components/pythonkafka/main.py
index 61029c39f..980177106 100644
--- a/test/integration/components/pythonkafka/main.py
+++ b/test/integration/components/pythonkafka/main.py
@@ -5,8 +5,12 @@
from kafka import errors as Errors
from kafka.admin import NewTopic
from http.server import BaseHTTPRequestHandler, HTTPServer
+import logging
+logger = logging.getLogger(__name__)
+logging.basicConfig(level=logging.INFO)
+
class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
@@ -85,6 +89,7 @@ def main():
topic = NewTopic(name='my-topic',
num_partitions=1,
replication_factor=1)
+ logger.info(f"Creating topic: {topic}")
admin.create_topics([topic])
break
except Errors.TopicAlreadyExistsError:
diff --git a/test/oats/kafka/docker-compose-beyla-java-kafka.yml b/test/oats/kafka/docker-compose-beyla-java-kafka.yml
new file mode 100644
index 000000000..8dbaef17e
--- /dev/null
+++ b/test/oats/kafka/docker-compose-beyla-java-kafka.yml
@@ -0,0 +1,60 @@
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:7.6.1
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ ports:
+ - "2181:2181"
+
+ kafka:
+ image: confluentinc/cp-kafka:7.6.1
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ ports:
+ - "9092:9092"
+ - "9093:9093"
+ testserver:
+ build:
+ context: ../../..
+ dockerfile: ./test/integration/components/javakafka/Dockerfile
+ image: javakafka
+ ports:
+ - "8080:8080"
+ depends_on:
+ kafka:
+ condition: service_started
+ # eBPF auto instrumenter
+ autoinstrumenter:
+ build:
+ context: ../../..
+ dockerfile: ./test/integration/components/beyla/Dockerfile
+ command:
+ - --config=/configs/instrumenter-config-traces.yml
+ volumes:
+ - {{ .ConfigDir }}:/configs
+ - ./testoutput/run:/var/run/beyla
+ - ../../../testoutput:/coverage
+ privileged: true # in some environments (not GH Pull Requests) you can set it to false and then cap_add: [ SYS_ADMIN ]
+ network_mode: "service:testserver"
+ pid: "service:testserver"
+ environment:
+ GOCOVERDIR: "/coverage"
+ BEYLA_PRINT_TRACES: "true"
+ #BEYLA_EXECUTABLE_NAME: "java"
+ BEYLA_OPEN_PORT: {{ .ApplicationPort }}
+ BEYLA_SERVICE_NAMESPACE: "integration-test"
+ BEYLA_METRICS_INTERVAL: "10ms"
+ BEYLA_BPF_BATCH_TIMEOUT: "10ms"
+ BEYLA_LOG_LEVEL: "DEBUG"
+ OTEL_EXPORTER_OTLP_ENDPOINT: "http://collector:4318"
+ depends_on:
+ testserver:
+ condition: service_started
\ No newline at end of file
diff --git a/test/oats/kafka/docker-compose-beyla-kafka.yml b/test/oats/kafka/docker-compose-beyla-python-kafka.yml
similarity index 100%
rename from test/oats/kafka/docker-compose-beyla-kafka.yml
rename to test/oats/kafka/docker-compose-beyla-python-kafka.yml
diff --git a/test/oats/kafka/yaml/oats_kafka.yaml b/test/oats/kafka/yaml/oats_java_kafka.yaml
similarity index 96%
rename from test/oats/kafka/yaml/oats_kafka.yaml
rename to test/oats/kafka/yaml/oats_java_kafka.yaml
index 8c23acbbb..54ae69d83 100644
--- a/test/oats/kafka/yaml/oats_kafka.yaml
+++ b/test/oats/kafka/yaml/oats_java_kafka.yaml
@@ -1,7 +1,7 @@
docker-compose:
generator: generic
files:
- - ../docker-compose-beyla-kafka.yml
+ - ../docker-compose-beyla-java-kafka.yml
input:
- path: '/message'
@@ -35,3 +35,4 @@ expected:
value: "== 0"
- promql: 'messaging_process_duration_bucket{le="10"}'
value: "> 0"
+
diff --git a/test/oats/kafka/yaml/oats_python_kafka.yaml b/test/oats/kafka/yaml/oats_python_kafka.yaml
new file mode 100644
index 000000000..b190cc8eb
--- /dev/null
+++ b/test/oats/kafka/yaml/oats_python_kafka.yaml
@@ -0,0 +1,37 @@
+docker-compose:
+ generator: generic
+ files:
+ - ../docker-compose-beyla-python-kafka.yml
+input:
+ - path: '/message'
+
+interval: 500ms
+expected:
+ traces:
+ - traceql: '{ .messaging.operation.type = "process" }'
+ spans:
+ - name: 'my-topic process'
+ attributes:
+ messaging.destination.name: my-topic
+ messaging.operation.type: process
+ messaging.system: kafka
+ - traceql: '{ .messaging.operation.type = "publish" }'
+ spans:
+ - name: 'my-topic publish'
+ attributes:
+ messaging.destination.name: my-topic
+ messaging.operation.type: publish
+ messaging.system: kafka
+ metrics:
+ - promql: 'messaging_publish_duration_count{messaging_system="kafka", messaging_destination_name="my-topic"}'
+ value: "> 0"
+ - promql: 'messaging_publish_duration_bucket{le="0"}'
+ value: "== 0"
+ - promql: 'messaging_publish_duration_bucket{le="10"}'
+ value: "> 0"
+ - promql: 'messaging_process_duration_count{messaging_system="kafka", messaging_destination_name="my-topic"}'
+ value: "> 0"
+ - promql: 'messaging_process_duration_bucket{le="0"}'
+ value: "== 0"
+ - promql: 'messaging_process_duration_bucket{le="10"}'
+ value: "> 0"