diff --git a/Makefile b/Makefile index dd7903a84..18f00e1bc 100644 --- a/Makefile +++ b/Makefile @@ -287,13 +287,18 @@ oats-test-redis-other-langs: oats-prereq mkdir -p test/oats/redis_other_langs/$(TEST_OUTPUT)/run cd test/oats/redis_other_langs && TESTCASE_BASE_PATH=./yaml $(GINKGO) -v -r +.PHONY: oats-test-kafka +oats-test-kafka: oats-prereq + mkdir -p test/oats/kafka/$(TEST_OUTPUT)/run + cd test/oats/kafka && TESTCASE_TIMEOUT=120s TESTCASE_BASE_PATH=./yaml $(GINKGO) -v -r + .PHONY: oats-test -oats-test: oats-test-sql oats-test-sql-statement oats-test-sql-other-langs oats-test-redis-other-langs +oats-test: oats-test-sql oats-test-sql-statement oats-test-sql-other-langs oats-test-redis-other-langs oats-test-kafka $(MAKE) itest-coverage-data .PHONY: oats-test-debug oats-test-debug: oats-prereq - cd test/oats/sql_statement && TESTCASE_BASE_PATH=./yaml TESTCASE_MANUAL_DEBUG=true TESTCASE_TIMEOUT=1h $(GINKGO) -v -r + cd test/oats/kafka && TESTCASE_BASE_PATH=./yaml TESTCASE_MANUAL_DEBUG=true TESTCASE_TIMEOUT=1h $(GINKGO) -v -r .PHONY: drone drone: diff --git a/bpf/http_types.h b/bpf/http_types.h index f397617d7..1b01a97a2 100644 --- a/bpf/http_types.h +++ b/bpf/http_types.h @@ -15,7 +15,7 @@ #define KPROBES_LARGE_RESPONSE_LEN 100000 // 100K and above we try to track the response actual time with kretprobes #define K_TCP_MAX_LEN 256 -#define K_TCP_RES_LEN 24 +#define K_TCP_RES_LEN 128 #define CONN_INFO_FLAG_TRACE 0x1 diff --git a/docs/sources/metrics.md b/docs/sources/metrics.md index ad8b1efa0..6a9e8c452 100644 --- a/docs/sources/metrics.md +++ b/docs/sources/metrics.md @@ -25,25 +25,25 @@ The following table describes the exported metrics in both OpenTelemetry and Pro | Application | `rpc.server.duration` | `rpc_server_duration_seconds` | Histogram | seconds | Duration of RPC service calls from the server side | | Application | `sql.client.duration` | `sql_client_duration_seconds` | Histogram | seconds | Duration of SQL client operations (Experimental) | | 
Application | `redis.client.duration` | `redis_client_duration_seconds` | Histogram | seconds | Duration of Redis client operations (Experimental) | +| Application | `messaging.publish.duration` | `messaging_publish_duration_seconds` | Histogram | seconds | Duration of Messaging (Kafka) publish operations (Experimental) | +| Application | `messaging.process.duration` | `messaging_process_duration_seconds` | Histogram | seconds | Duration of Messaging (Kafka) process operations (Experimental) | | Network | `beyla.network.flow.bytes` | `beyla_network_flow_bytes` | Counter | bytes | Bytes submitted from a source network endpoint to a destination network endpoint | Beyla can also export [Span metrics](/docs/tempo/latest/metrics-generator/span_metrics/) and -[Service graph metrics](/docs/tempo/latest/metrics-generator/service-graph-view/), which can be enabled via the +[Service graph metrics](/docs/tempo/latest/metrics-generator/service-graph-view/), which you can enable via the [features]({{< relref "./configure/options.md" >}}) configuration option. ## Attributes of Beyla metrics -For the sake of brevity, only the OTEL `dot.notation` of the metrics and attributes is listed, but -the metrics and attributes are exposed `underscore_notation` when a Prometheus exporter is used. +For the sake of brevity, the metrics and attributes in this list use the OTEL `dot.notation`. When using the Prometheus exporter, the metrics use `underscore_notation`. -In order to hide attributes that are shown by default, or show attributes that are hidden by -default, check the `attributes`->`select` section in the [configuration documentation]({{< relref "./configure/options.md" >}}). +In order to configure which attributes to show or which attributes to hide, check the `attributes`->`select` section in the [configuration documentation]({{< relref "./configure/options.md" >}}). 
| Metrics | Name | Default | |--------------------------------|-----------------------------|-----------------------------------------------| | Application (all) | `http.request.method` | shown | | Application (all) | `http.response.status_code` | shown | -| Application (all) | `http.route` | shown if `routes` configuration is defined | +| Application (all) | `http.route` | shown if `routes` configuration section exists | | Application (all) | `k8s.daemonset.name` | shown if `attributes.kubernetes.enable` | | Application (all) | `k8s.deployment.name` | shown if `attributes.kubernetes.enable` | | Application (all) | `k8s.namespace.name` | shown if `attributes.kubernetes.enable` | @@ -66,9 +66,11 @@ default, check the `attributes`->`select` section in the [configuration document | `beyla.network.flow.bytes` | `beyla.ip` | hidden | | `db.client.operation.duration` | `db.operation.name` | shown | | `db.client.operation.duration` | `db.collection.name` | hidden | +| `messaging.publish.duration` | `messaging.system` | shown | +| `messaging.publish.duration` | `messaging.destination.name`| shown | | `beyla.network.flow.bytes` | `direction` | hidden | | `beyla.network.flow.bytes` | `dst.address` | hidden | -| `beyla.network.flow.bytes` | `dst.cidr` | shown if the `cidrs` configuration is defined | +| `beyla.network.flow.bytes` | `dst.cidr` | shown if the `cidrs` configuration section exists | | `beyla.network.flow.bytes` | `dst.name` | hidden | | `beyla.network.flow.bytes` | `dst.port` | hidden | | `beyla.network.flow.bytes` | `iface` | hidden | @@ -87,7 +89,7 @@ default, check the `attributes`->`select` section in the [configuration document | `beyla.network.flow.bytes` | `k8s.src.owner.type` | hidden | | `beyla.network.flow.bytes` | `k8s.src.type` | hidden | | `beyla.network.flow.bytes` | `src.address` | hidden | -| `beyla.network.flow.bytes` | `src.cidr` | shown if the `cidrs` configuration is defined | +| `beyla.network.flow.bytes` | `src.cidr` | shown if the `cidrs` 
configuration section exists | | `beyla.network.flow.bytes` | `src.name` | hidden | | `beyla.network.flow.bytes` | `src.port` | hidden | | `beyla.network.flow.bytes` | `transport` | hidden | diff --git a/pkg/internal/ebpf/common/bpf_bpfel_arm64.go b/pkg/internal/ebpf/common/bpf_bpfel_arm64.go index 230440b6b..09335506f 100644 --- a/pkg/internal/ebpf/common/bpf_bpfel_arm64.go +++ b/pkg/internal/ebpf/common/bpf_bpfel_arm64.go @@ -178,7 +178,7 @@ type bpfTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/common/bpf_bpfel_arm64.o b/pkg/internal/ebpf/common/bpf_bpfel_arm64.o index c5f7cfe87..2cb134e60 100644 Binary files a/pkg/internal/ebpf/common/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/common/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/common/bpf_bpfel_x86.go b/pkg/internal/ebpf/common/bpf_bpfel_x86.go index 907d81b25..2ec81db71 100644 --- a/pkg/internal/ebpf/common/bpf_bpfel_x86.go +++ b/pkg/internal/ebpf/common/bpf_bpfel_x86.go @@ -178,7 +178,7 @@ type bpfTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/common/bpf_bpfel_x86.o b/pkg/internal/ebpf/common/bpf_bpfel_x86.o index c5f7cfe87..2cb134e60 100644 Binary files a/pkg/internal/ebpf/common/bpf_bpfel_x86.o and b/pkg/internal/ebpf/common/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/common/kafka_detect_transform.go b/pkg/internal/ebpf/common/kafka_detect_transform.go new file mode 100644 index 000000000..956398acb --- /dev/null +++ b/pkg/internal/ebpf/common/kafka_detect_transform.go @@ -0,0 +1,225 @@ +package ebpfcommon + +import ( + "encoding/binary" + "errors" + + "github.com/grafana/beyla/pkg/internal/request" +) + +type Operation int8 + +const ( + Produce Operation = 0 + Fetch Operation = 1 +) + +type Header struct { + MessageSize int32 + 
APIKey int16 + APIVersion int16 + CorrelationID int32 + ClientIDSize int16 +} + +type KafkaInfo struct { + Operation Operation + Topic string + ClientID string + TopicOffset int +} + +func (k Operation) String() string { + switch k { + case Produce: + return request.MessagingPublish + case Fetch: + return request.MessagingProcess + default: + return "unknown" + } +} + +const KafaMinLength = 14 + +// ProcessKafkaRequest processes a TCP packet and returns error if the packet is not a valid Kafka request. +// Otherwise, return kafka.Info with the processed data. +func ProcessPossibleKafkaEvent(pkt []byte, rpkt []byte) (*KafkaInfo, error) { + k, err := ProcessKafkaRequest(pkt) + if err != nil { + k, err = ProcessKafkaRequest(rpkt) + } + + return k, err +} + +func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) { + k := &KafkaInfo{} + if len(pkt) < KafaMinLength { + return k, errors.New("packet too short") + } + + header := &Header{ + MessageSize: int32(binary.BigEndian.Uint32(pkt[0:4])), + APIKey: int16(binary.BigEndian.Uint16(pkt[4:6])), + APIVersion: int16(binary.BigEndian.Uint16(pkt[6:8])), + CorrelationID: int32(binary.BigEndian.Uint32(pkt[8:12])), + ClientIDSize: int16(binary.BigEndian.Uint16(pkt[12:14])), + } + + if !isValidKafkaHeader(header) { + return k, errors.New("invalid Kafka request header") + } + + offset := KafaMinLength + if header.ClientIDSize > 0 { + clientID := pkt[offset : offset+int(header.ClientIDSize)] + if !isValidClientID(clientID, int(header.ClientIDSize)) { + return k, errors.New("invalid client ID") + } + offset += int(header.ClientIDSize) + k.ClientID = string(clientID) + } else if header.ClientIDSize < -1 { + return k, errors.New("invalid client ID size") + } + + switch Operation(header.APIKey) { + case Produce: + ok, err := getTopicOffsetFromProduceOperation(header, pkt, &offset) + if !ok || err != nil { + return k, err + } + k.Operation = Produce + k.TopicOffset = offset + case Fetch: + offset += 
getTopicOffsetFromFetchOperation(header) + k.Operation = Fetch + k.TopicOffset = offset + default: + return k, errors.New("invalid Kafka operation") + } + topic, err := getTopicName(pkt, offset) + if err != nil { + return k, err + } + k.Topic = topic + return k, nil +} + +func isValidKafkaHeader(header *Header) bool { + if header.MessageSize < int32(KafaMinLength) || header.APIVersion < 0 { + return false + } + switch Operation(header.APIKey) { + case Fetch: + if header.APIVersion > 11 { + return false + } + case Produce: + if header.APIVersion == 0 || header.APIVersion > 8 { + return false + } + default: + return false + } + if header.CorrelationID < 0 { + return false + } + return header.ClientIDSize >= -1 +} + +// nolint:cyclop +func isValidKafkaString(buffer []byte, maxBufferSize, realSize int, printableOk bool) bool { + for j := 0; j < maxBufferSize; j++ { + if j >= realSize { + break + } + ch := buffer[j] + if ('a' <= ch && ch <= 'z') || ('A' <= ch && ch <= 'Z') || ('0' <= ch && ch <= '9') || ch == '.' 
|| ch == '_' || ch == '-' { + continue + } + if printableOk && (ch >= ' ' && ch <= '~') { + continue + } + return false + } + return true +} + +func isValidClientID(buffer []byte, realClientIDSize int) bool { + return isValidKafkaString(buffer, len(buffer), realClientIDSize, true) +} + +func getTopicName(pkt []byte, offset int) (string, error) { + offset += 4 + if offset > len(pkt) { + return "", errors.New("invalid buffer length") + } + topicNameSize := int16(binary.BigEndian.Uint16(pkt[offset:])) + if topicNameSize <= 0 || topicNameSize > 255 { + return "", errors.New("invalid topic name size") + } + offset += 2 + + if offset > len(pkt) { + return "", nil + } + maxLen := offset + int(topicNameSize) + if len(pkt) < maxLen { + maxLen = len(pkt) + } + topicName := pkt[offset:maxLen] + if isValidKafkaString(topicName, len(topicName), int(topicNameSize), false) { + return string(topicName), nil + } + return "", errors.New("invalid topic name") +} + +func getTopicOffsetFromProduceOperation(header *Header, pkt []byte, offset *int) (bool, error) { + if header.APIVersion >= 3 { + if len(pkt) < *offset+2 { + return false, errors.New("packet too short") + } + transactionalIDSize := int16(binary.BigEndian.Uint16(pkt[*offset:])) + *offset += 2 + if transactionalIDSize > 0 { + *offset += int(transactionalIDSize) + } + } + + if len(pkt) < *offset+2 { + return false, errors.New("packet too short") + } + acks := int16(binary.BigEndian.Uint16(pkt[*offset:])) + if acks < -1 || acks > 1 { + return false, nil + } + *offset += 2 + + if len(pkt) < *offset+4 { + return false, errors.New("packet too short") + } + timeoutMS := int32(binary.BigEndian.Uint32(pkt[*offset:])) + if timeoutMS < 0 { + return false, nil + } + *offset += 4 + + return true, nil +} + +func getTopicOffsetFromFetchOperation(header *Header) int { + offset := 3 * 4 // 3 * sizeof(int32) + + if header.APIVersion >= 3 { + offset += 4 // max_bytes + if header.APIVersion >= 4 { + offset++ // isolation_level + if 
header.APIVersion >= 7 { + offset += 2 * 4 // session_id + session_epoch + } + } + } + + return offset +} diff --git a/pkg/internal/ebpf/common/kafka_detect_transform_test.go b/pkg/internal/ebpf/common/kafka_detect_transform_test.go new file mode 100644 index 000000000..56562ef57 --- /dev/null +++ b/pkg/internal/ebpf/common/kafka_detect_transform_test.go @@ -0,0 +1,143 @@ +package ebpfcommon + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestProcessKafkaRequest(t *testing.T) { + tests := []struct { + name string + input []byte + expected *KafkaInfo + }{ + { + name: "Fetch request", + input: []byte{0, 0, 0, 94, 0, 1, 0, 11, 0, 0, 0, 224, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 1, 244, 0, 0, 0, 1, 6, 64, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 19, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0}, + expected: &KafkaInfo{ + ClientID: "sarama", + Operation: Fetch, + Topic: "important", + TopicOffset: 45, + }, + }, + { + name: "Produce request", + input: []byte{0, 0, 0, 123, 0, 0, 0, 7, 0, 0, 0, 2, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 39, 16, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 72, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 60, 0, 0, 0, 0, 2, 249, 236, 167, 144, 0, 0, 0, 0, 0, 0, 0, 0, 1, 143, 191, 130, 165, 117, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 1, 20, 0, 0, 0, 1, 8, 100, 97, 116, 97, 0}, + expected: &KafkaInfo{ + ClientID: "sarama", + Operation: Produce, + Topic: "important", + TopicOffset: 28, + }, + }, + { + name: "Invalid request", + input: []byte{0, 0, 0, 1, 0, 0, 0, 7, 0, 0, 0, 2, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 39, 16, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 72}, + 
expected: &KafkaInfo{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, _ := ProcessKafkaRequest(tt.input) + assert.Equal(t, tt.expected, res) + }) + } +} + +func TestGetTopicOffsetFromProduceOperation(t *testing.T) { + header := &Header{ + APIVersion: 3, + } + pkt := []byte{0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00} + offset := 0 + + success, err := getTopicOffsetFromProduceOperation(header, pkt, &offset) + + assert.True(t, success) + assert.NoError(t, err) + assert.Equal(t, 10, offset) + + header.APIVersion = 2 + pkt = []byte{0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00} + offset = 0 + + success, err = getTopicOffsetFromProduceOperation(header, pkt, &offset) + + assert.False(t, success) + assert.NoError(t, err) + assert.Equal(t, 0, offset) + + pkt = []byte{0x00} + offset = 0 + + success, err = getTopicOffsetFromProduceOperation(header, pkt, &offset) + + assert.False(t, success) + assert.Error(t, err) + assert.Equal(t, 0, offset) +} + +func TestGetTopicOffsetFromFetchOperation(t *testing.T) { + header := &Header{ + APIVersion: 3, + } + + offset := getTopicOffsetFromFetchOperation(header) + expectedOffset := 3*4 + 4 + assert.Equal(t, expectedOffset, offset) + + header.APIVersion = 4 + offset = getTopicOffsetFromFetchOperation(header) + expectedOffset = 3*4 + 5 + assert.Equal(t, expectedOffset, offset) + + header.APIVersion = 7 + offset = getTopicOffsetFromFetchOperation(header) + expectedOffset = 3*4 + 4 + 1 + 2*4 + assert.Equal(t, expectedOffset, offset) + + header.APIVersion = 2 + offset = getTopicOffsetFromFetchOperation(header) + expectedOffset = 3 * 4 + assert.Equal(t, expectedOffset, offset) +} + +func TestIsValidKafkaHeader(t *testing.T) { + header := &Header{ + MessageSize: 100, + APIVersion: 3, + APIKey: 1, + CorrelationID: 123, + ClientIDSize: 0, + } + + result := isValidKafkaHeader(header) + assert.True(t, result) + + header.MessageSize = 10 + result = isValidKafkaHeader(header) + 
assert.False(t, result) + + header.MessageSize = 100 + header.APIVersion = -1 + result = isValidKafkaHeader(header) + assert.False(t, result) + + header.APIVersion = 3 + header.APIKey = 2 + result = isValidKafkaHeader(header) + assert.False(t, result) + + header.APIKey = 1 + header.CorrelationID = -1 + result = isValidKafkaHeader(header) + assert.False(t, result) + + header.CorrelationID = 123 + header.ClientIDSize = -2 + result = isValidKafkaHeader(header) + assert.False(t, result) +} diff --git a/pkg/internal/ebpf/common/tcp_detect_transform.go b/pkg/internal/ebpf/common/tcp_detect_transform.go index cd849576e..c3e66e9e2 100644 --- a/pkg/internal/ebpf/common/tcp_detect_transform.go +++ b/pkg/internal/ebpf/common/tcp_detect_transform.go @@ -49,6 +49,11 @@ func ReadTCPRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, error) return TCPToRedisToSpan(&event, op, text, status), false, nil } + default: + k, err := ProcessPossibleKafkaEvent(b, event.Rbuf[:]) + if err == nil { + return TCPToKafkaToSpan(&event, k), false, nil + } } return request.Span{}, true, nil // ignore if we couldn't parse it @@ -147,3 +152,37 @@ func isHTTP2(data []uint8, event *TCPRequestInfo) bool { return false } + +func TCPToKafkaToSpan(trace *TCPRequestInfo, data *KafkaInfo) request.Span { + peer := "" + hostname := "" + hostPort := 0 + + if trace.ConnInfo.S_port != 0 || trace.ConnInfo.D_port != 0 { + peer, hostname = trace.reqHostInfo() + hostPort = int(trace.ConnInfo.D_port) + } + return request.Span{ + Type: request.EventTypeKafkaClient, + Method: data.Operation.String(), + OtherNamespace: data.ClientID, + Path: data.Topic, + Peer: peer, + Host: hostname, + HostPort: hostPort, + ContentLength: 0, + RequestStart: int64(trace.StartMonotimeNs), + Start: int64(trace.StartMonotimeNs), + End: int64(trace.EndMonotimeNs), + Status: 0, + TraceID: trace2.TraceID(trace.Tp.TraceId), + SpanID: trace2.SpanID(trace.Tp.SpanId), + ParentSpanID: trace2.SpanID(trace.Tp.ParentId), + Flags: 
trace.Tp.Flags, + Pid: request.PidInfo{ + HostPID: trace.Pid.HostPid, + UserPID: trace.Pid.UserPid, + Namespace: trace.Pid.Ns, + }, + } +} diff --git a/pkg/internal/ebpf/common/tcp_detect_transform_test.go b/pkg/internal/ebpf/common/tcp_detect_transform_test.go index d39672272..a47ebc0cd 100644 --- a/pkg/internal/ebpf/common/tcp_detect_transform_test.go +++ b/pkg/internal/ebpf/common/tcp_detect_transform_test.go @@ -154,6 +154,24 @@ func TestRedisParsing(t *testing.T) { } +func TestTCPReqKafkaParsing(t *testing.T) { + // kafka message + b := []byte{0, 0, 0, 94, 0, 1, 0, 11, 0, 0, 0, 224, 0, 6, 115, 97, 114, 97, 109, 97, 255, 255, 255, 255, 0, 0, 1, 244, 0, 0, 0, 1, 6, 64, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 1, 0, 9, 105, 109, 112, 111, 114, 116, 97, 110, 116, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 19, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0} + r := makeTCPReq(string(b), tcpSend, 343534, 8080, 2000) + k, err := ProcessKafkaRequest(b) + assert.NoError(t, err) + s := TCPToKafkaToSpan(&r, k) + assert.NotNil(t, s) + assert.NotEmpty(t, s.Host) + assert.NotEmpty(t, s.Peer) + assert.Equal(t, s.HostPort, 8080) + assert.Greater(t, s.End, s.Start) + assert.Equal(t, "process", s.Method) + assert.Equal(t, "important", s.Path) + assert.Equal(t, "sarama", s.OtherNamespace) + assert.Equal(t, request.EventTypeKafkaClient, s.Type) +} + const charset = "\\0\\1\\2abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" func randomString(length int) string { diff --git a/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.go b/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.go index fce226a52..7c3ad1361 100644 --- a/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.go +++ b/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.go @@ -136,7 +136,7 @@ type bpfTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.o 
b/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.o index 002321955..43bbd5d60 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/httpfltr/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.go b/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.go index b30cb0c78..1f23bc508 100644 --- a/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.go +++ b/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.go @@ -136,7 +136,7 @@ type bpfTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.o b/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.o index 0da03861f..2c6c93da7 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.o and b/pkg/internal/ebpf/httpfltr/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.go b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.go index de56798c0..c709ffc38 100644 --- a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.go +++ b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.go @@ -136,7 +136,7 @@ type bpf_debugTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.o index f4b3fe8ff..fe06e19d6 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.go b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.go index c23c0bc5c..9584ce8b9 100644 --- a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.go +++ b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.go @@ -136,7 +136,7 @@ type bpf_debugTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 
Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.o index 41d9851fd..69a50f1c0 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/httpfltr/bpf_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.go b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.go index 8ab0ba31a..51aeb7dc2 100644 --- a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.go +++ b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.go @@ -136,7 +136,7 @@ type bpf_tpTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.o b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.o index 8450dcb41..fedde5ef8 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.o and b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.go b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.go index 5c6bddddc..7f73cf426 100644 --- a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.go +++ b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.go @@ -136,7 +136,7 @@ type bpf_tpTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.o b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.o index 9332c3396..43a5d628a 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.o and b/pkg/internal/ebpf/httpfltr/bpf_tp_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.go b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.go index ad40eff15..4714b91dd 100644 --- a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.go +++ b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.go @@ -136,7 +136,7 
@@ type bpf_tp_debugTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.o b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.o index 74f991b46..1be0b0dfc 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.o and b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.go b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.go index 1c73e9d35..69309a73c 100644 --- a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.go +++ b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.go @@ -136,7 +136,7 @@ type bpf_tp_debugTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.o b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.o index fc6348b47..c0e984923 100644 Binary files a/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.o and b/pkg/internal/ebpf/httpfltr/bpf_tp_debug_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.go b/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.go index 162c2f2ac..0e53768d5 100644 --- a/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.go +++ b/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.go @@ -114,7 +114,7 @@ type bpfTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.o b/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.o index 84dbfd3f6..48c8081ed 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.o and b/pkg/internal/ebpf/httpssl/bpf_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.go b/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.go index 67b2dcb32..441f37e7f 
100644 --- a/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.go +++ b/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.go @@ -114,7 +114,7 @@ type bpfTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.o b/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.o index de51b74a5..daab9ae63 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.o and b/pkg/internal/ebpf/httpssl/bpf_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.go b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.go index 380750c59..208bd8b67 100644 --- a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.go +++ b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.go @@ -114,7 +114,7 @@ type bpf_debugTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.o b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.o index fedfae101..62aae3bca 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.o and b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.go b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.go index aa6c1e724..e70b76e2b 100644 --- a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.go +++ b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.go @@ -114,7 +114,7 @@ type bpf_debugTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.o b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.o index d5d2d8959..22f6b3f31 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.o and b/pkg/internal/ebpf/httpssl/bpf_debug_bpfel_x86.o differ diff --git 
a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.go b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.go index 613e81aa0..2c4d3d58f 100644 --- a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.go +++ b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.go @@ -114,7 +114,7 @@ type bpf_tpTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.o b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.o index 900ac22a5..2bbbbe61e 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.o and b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.go b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.go index fbb1742e0..ac9938d52 100644 --- a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.go +++ b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.go @@ -114,7 +114,7 @@ type bpf_tpTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.o b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.o index 24c2a4e34..cc7493ad3 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.o and b/pkg/internal/ebpf/httpssl/bpf_tp_bpfel_x86.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.go b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.go index 28d94b03b..ddf5126e1 100644 --- a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.go +++ b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.go @@ -114,7 +114,7 @@ type bpf_tp_debugTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.o b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.o index 3605298a2..0cfdb455d 100644 Binary files 
a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.o and b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_arm64.o differ diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.go b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.go index 12f16aa44..22bf69131 100644 --- a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.go +++ b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.go @@ -114,7 +114,7 @@ type bpf_tp_debugTcpReqT struct { StartMonotimeNs uint64 EndMonotimeNs uint64 Buf [256]uint8 - Rbuf [24]uint8 + Rbuf [128]uint8 Len uint32 RespLen uint32 Ssl uint8 diff --git a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.o b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.o index 74de3c02e..c29209fe5 100644 Binary files a/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.o and b/pkg/internal/ebpf/httpssl/bpf_tp_debug_bpfel_x86.o differ diff --git a/pkg/internal/export/attributes/attr_defs.go b/pkg/internal/export/attributes/attr_defs.go index 258a09cb1..06c800534 100644 --- a/pkg/internal/export/attributes/attr_defs.go +++ b/pkg/internal/export/attributes/attr_defs.go @@ -204,6 +204,20 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { attr.DBSystem: true, }, }, + MessagingPublishDuration.Section: { + SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes}, + Attributes: map[attr.Name]Default{ + attr.MessagingSystem: true, + attr.MessagingDestination: true, + }, + }, + MessagingProcessDuration.Section: { + SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes}, + Attributes: map[attr.Name]Default{ + attr.MessagingSystem: true, + attr.MessagingDestination: true, + }, + }, Traces.Section: { Attributes: map[attr.Name]Default{ attr.DBQueryText: false, diff --git a/pkg/internal/export/attributes/metric.go b/pkg/internal/export/attributes/metric.go index 4e3683ea1..ec6697d79 100644 --- a/pkg/internal/export/attributes/metric.go +++ b/pkg/internal/export/attributes/metric.go @@ -61,6 +61,16 @@ var ( Prom: 
"db_client_operation_duration_seconds", OTEL: "db.client.operation.duration", } + MessagingPublishDuration = Name{ + Section: "messaging.publish.duration", + Prom: "messaging_publish_duration_seconds", + OTEL: "messaging.publish.duration", + } + MessagingProcessDuration = Name{ + Section: "messaging.process.duration", + Prom: "messaging_process_duration_seconds", + OTEL: "messaging.process.duration", + } ) // normalizeMetric will facilitate the user-input in the attributes.enable section. diff --git a/pkg/internal/export/attributes/names/attrs.go b/pkg/internal/export/attributes/names/attrs.go index 6030492d9..ad6397844 100644 --- a/pkg/internal/export/attributes/names/attrs.go +++ b/pkg/internal/export/attributes/names/attrs.go @@ -48,6 +48,9 @@ const ( RPCSystem = Name(semconv.RPCSystemKey) RPCGRPCStatusCode = Name(semconv.RPCGRPCStatusCodeKey) HTTPRoute = Name(semconv.HTTPRouteKey) + MessagingOpType = Name("messaging.operation.type") + MessagingSystem = Name(semconv.MessagingSystemKey) + MessagingDestination = Name(semconv.MessagingDestinationNameKey) K8sNamespaceName = Name("k8s.namespace.name") K8sPodName = Name("k8s.pod.name") diff --git a/pkg/internal/export/debug/debug.go b/pkg/internal/export/debug/debug.go index b1bdc9342..df99d72c9 100644 --- a/pkg/internal/export/debug/debug.go +++ b/pkg/internal/export/debug/debug.go @@ -72,6 +72,8 @@ func spanType(span *request.Span) string { return "SQL" case request.EventTypeRedisClient: return "REDIS" + case request.EventTypeKafkaClient: + return "KAFKA" } return "" diff --git a/pkg/internal/export/otel/metrics.go b/pkg/internal/export/otel/metrics.go index 18091a7bc..28d44032e 100644 --- a/pkg/internal/export/otel/metrics.go +++ b/pkg/internal/export/otel/metrics.go @@ -165,6 +165,8 @@ type MetricsReporter struct { attrGRPCServer []attributes.Field[*request.Span, attribute.KeyValue] attrGRPCClient []attributes.Field[*request.Span, attribute.KeyValue] attrDBClient []attributes.Field[*request.Span, 
attribute.KeyValue] + attrMessagingPublish []attributes.Field[*request.Span, attribute.KeyValue] + attrMessagingProcess []attributes.Field[*request.Span, attribute.KeyValue] attrHTTPRequestSize []attributes.Field[*request.Span, attribute.KeyValue] attrHTTPClientRequestSize []attributes.Field[*request.Span, attribute.KeyValue] } @@ -181,6 +183,8 @@ type Metrics struct { grpcDuration instrument.Float64Histogram grpcClientDuration instrument.Float64Histogram dbClientDuration instrument.Float64Histogram + msgPublishDuration instrument.Float64Histogram + msgProcessDuration instrument.Float64Histogram httpRequestSize instrument.Float64Histogram httpClientRequestSize instrument.Float64Histogram // trace span metrics @@ -246,6 +250,10 @@ func newMetricsReporter( request.SpanOTELGetters, mr.attributes.For(attributes.RPCClientDuration)) mr.attrDBClient = attributes.OpenTelemetryGetters( request.SpanOTELGetters, mr.attributes.For(attributes.DBClientDuration)) + mr.attrMessagingPublish = attributes.OpenTelemetryGetters( + request.SpanOTELGetters, mr.attributes.For(attributes.MessagingPublishDuration)) + mr.attrMessagingProcess = attributes.OpenTelemetryGetters( + request.SpanOTELGetters, mr.attributes.For(attributes.MessagingProcessDuration)) mr.reporters = NewReporterPool(cfg.ReportersCacheLen, func(id svc.UID, v *Metrics) { @@ -285,6 +293,8 @@ func (mr *MetricsReporter) otelMetricOptions(mlog *slog.Logger) []metric.Option metric.WithView(otelHistogramConfig(attributes.RPCServerDuration.OTEL, mr.cfg.Buckets.DurationHistogram, useExponentialHistograms)), metric.WithView(otelHistogramConfig(attributes.RPCClientDuration.OTEL, mr.cfg.Buckets.DurationHistogram, useExponentialHistograms)), metric.WithView(otelHistogramConfig(attributes.DBClientDuration.OTEL, mr.cfg.Buckets.DurationHistogram, useExponentialHistograms)), + metric.WithView(otelHistogramConfig(attributes.MessagingPublishDuration.OTEL, mr.cfg.Buckets.DurationHistogram, useExponentialHistograms)), + 
metric.WithView(otelHistogramConfig(attributes.MessagingProcessDuration.OTEL, mr.cfg.Buckets.DurationHistogram, useExponentialHistograms)), metric.WithView(otelHistogramConfig(attributes.HTTPServerRequestSize.OTEL, mr.cfg.Buckets.RequestSizeHistogram, useExponentialHistograms)), metric.WithView(otelHistogramConfig(attributes.HTTPClientRequestSize.OTEL, mr.cfg.Buckets.RequestSizeHistogram, useExponentialHistograms)), } @@ -341,6 +351,14 @@ func (mr *MetricsReporter) setupOtelMeters(m *Metrics, meter instrument.Meter) e if err != nil { return fmt.Errorf("creating db client duration histogram metric: %w", err) } + m.msgPublishDuration, err = meter.Float64Histogram(attributes.MessagingPublishDuration.OTEL, instrument.WithUnit("s")) + if err != nil { + return fmt.Errorf("creating messaging client publish duration histogram metric: %w", err) + } + m.msgProcessDuration, err = meter.Float64Histogram(attributes.MessagingProcessDuration.OTEL, instrument.WithUnit("s")) + if err != nil { + return fmt.Errorf("creating messaging client process duration histogram metric: %w", err) + } m.httpRequestSize, err = meter.Float64Histogram(attributes.HTTPServerRequestSize.OTEL, instrument.WithUnit("By")) if err != nil { return fmt.Errorf("creating http size histogram metric: %w", err) @@ -646,6 +664,7 @@ func withAttributes(span *request.Span, getters []attributes.Field[*request.Span return instrument.WithAttributeSet(attribute.NewSet(attributes...)) } +// nolint:cyclop func (r *Metrics) record(span *request.Span, mr *MetricsReporter) { t := span.Timings() duration := t.End.Sub(t.RequestStart).Seconds() @@ -672,6 +691,15 @@ func (r *Metrics) record(span *request.Span, mr *MetricsReporter) { case request.EventTypeRedisClient, request.EventTypeSQLClient: r.dbClientDuration.Record(r.ctx, duration, withAttributes(span, mr.attrDBClient)) + case request.EventTypeKafkaClient: + switch span.Method { + case request.MessagingPublish: + r.msgPublishDuration.Record(r.ctx, duration, + 
withAttributes(span, mr.attrMessagingPublish)) + case request.MessagingProcess: + r.msgProcessDuration.Record(r.ctx, duration, + withAttributes(span, mr.attrMessagingProcess)) + } } } diff --git a/pkg/internal/export/otel/traces.go b/pkg/internal/export/otel/traces.go index 9664494e8..099096b09 100644 --- a/pkg/internal/export/otel/traces.go +++ b/pkg/internal/export/otel/traces.go @@ -30,7 +30,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.19.0" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" trace2 "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -519,6 +519,13 @@ func SpanKindString(span *request.Span) string { return "SPAN_KIND_SERVER" case request.EventTypeHTTPClient, request.EventTypeGRPCClient, request.EventTypeSQLClient, request.EventTypeRedisClient: return "SPAN_KIND_CLIENT" + case request.EventTypeKafkaClient: + switch span.Method { + case request.MessagingPublish: + return "SPAN_KIND_PRODUCER" + case request.MessagingProcess: + return "SPAN_KIND_CONSUMER" + } } return "SPAN_KIND_INTERNAL" } @@ -600,6 +607,14 @@ func traceAttributes(span *request.Span, optionalAttrs map[attr.Name]struct{}) [ } } } + case request.EventTypeKafkaClient: + operation := request.MessagingOperationType(span.Method) + attrs = []attribute.KeyValue{ + semconv.MessagingSystemKafka, + semconv.MessagingDestinationName(span.Path), + semconv.MessagingClientID(span.OtherNamespace), + operation, + } } return attrs @@ -632,6 +647,8 @@ func TraceName(span *request.Span) string { return "REDIS" } return span.Method + case request.EventTypeKafkaClient: + return fmt.Sprintf("%s %s", span.Path, span.Method) } return "" } diff --git a/pkg/internal/export/otel/traces_test.go b/pkg/internal/export/otel/traces_test.go index 21704af1a..e5a8afde0 100644 --- a/pkg/internal/export/otel/traces_test.go +++ 
b/pkg/internal/export/otel/traces_test.go @@ -19,7 +19,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - semconv "go.opentelemetry.io/otel/semconv/v1.19.0" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" "go.opentelemetry.io/otel/trace" "github.com/grafana/beyla/pkg/internal/export/attributes" @@ -551,6 +551,24 @@ func TestGenerateTracesAttributes(t *testing.T) { ensureTraceStrAttr(t, attrs, semconv.DBSystemKey, "other_sql") ensureTraceStrAttr(t, attrs, attribute.Key(attr.DBQueryText), "SELECT password FROM credentials WHERE username=\"bill\"") }) + t.Run("test Kafka trace generation", func(t *testing.T) { + span := request.Span{Type: request.EventTypeKafkaClient, Method: "process", Path: "important-topic", OtherNamespace: "test"} + traces := GenerateTraces(&span, map[attr.Name]struct{}{}) + + assert.Equal(t, 1, traces.ResourceSpans().Len()) + assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len()) + assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().Len()) + spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + + assert.NotEmpty(t, spans.At(0).SpanID().String()) + assert.NotEmpty(t, spans.At(0).TraceID().String()) + + attrs := spans.At(0).Attributes() + ensureTraceStrAttr(t, attrs, attribute.Key(attr.MessagingOpType), "process") + ensureTraceStrAttr(t, attrs, semconv.MessagingDestinationNameKey, "important-topic") + ensureTraceStrAttr(t, attrs, semconv.MessagingClientIDKey, "test") + + }) } func TestAttrsToMap(t *testing.T) { diff --git a/pkg/internal/export/prom/prom.go b/pkg/internal/export/prom/prom.go index 58ba40068..ef1094582 100644 --- a/pkg/internal/export/prom/prom.go +++ b/pkg/internal/export/prom/prom.go @@ -138,6 +138,8 @@ type metricsReporter struct { grpcDuration *expire.Expirer[prometheus.Histogram] grpcClientDuration *expire.Expirer[prometheus.Histogram] dbClientDuration *expire.Expirer[prometheus.Histogram] + 
msgPublishDuration *expire.Expirer[prometheus.Histogram] + msgProcessDuration *expire.Expirer[prometheus.Histogram] httpRequestSize *expire.Expirer[prometheus.Histogram] httpClientRequestSize *expire.Expirer[prometheus.Histogram] @@ -147,6 +149,8 @@ type metricsReporter struct { attrGRPCDuration []attributes.Field[*request.Span, string] attrGRPCClientDuration []attributes.Field[*request.Span, string] attrDBClientDuration []attributes.Field[*request.Span, string] + attrMsgPublishDuration []attributes.Field[*request.Span, string] + attrMsgProcessDuration []attributes.Field[*request.Span, string] attrHTTPRequestSize []attributes.Field[*request.Span, string] attrHTTPClientRequestSize []attributes.Field[*request.Span, string] @@ -220,6 +224,10 @@ func newReporter( attrsProvider.For(attributes.RPCClientDuration)) attrDBClientDuration := attributes.PrometheusGetters(request.SpanPromGetters, attrsProvider.For(attributes.DBClientDuration)) + attrMessagingPublishDuration := attributes.PrometheusGetters(request.SpanPromGetters, + attrsProvider.For(attributes.MessagingPublishDuration)) + attrMessagingProcessDuration := attributes.PrometheusGetters(request.SpanPromGetters, + attrsProvider.For(attributes.MessagingProcessDuration)) clock := expire.NewCachedClock(timeNow) // If service name is not explicitly set, we take the service name as set by the @@ -235,6 +243,8 @@ func newReporter( attrGRPCDuration: attrGRPCDuration, attrGRPCClientDuration: attrGRPCClientDuration, attrDBClientDuration: attrDBClientDuration, + attrMsgPublishDuration: attrMessagingPublishDuration, + attrMsgProcessDuration: attrMessagingProcessDuration, attrHTTPRequestSize: attrHTTPRequestSize, attrHTTPClientRequestSize: attrHTTPClientRequestSize, beylaInfo: expire.NewExpirer[prometheus.Gauge](prometheus.NewGaugeVec(prometheus.GaugeOpts{ @@ -290,6 +300,22 @@ func newReporter( NativeHistogramMaxBucketNumber: defaultHistogramMaxBucketNumber, NativeHistogramMinResetDuration: defaultHistogramMinResetDuration, }, 
labelNames(attrDBClientDuration)).MetricVec, clock.Time, cfg.TTL), + msgPublishDuration: expire.NewExpirer[prometheus.Histogram](prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: attributes.MessagingPublishDuration.Prom, + Help: "duration of messaging client publish operations, in seconds", + Buckets: cfg.Buckets.DurationHistogram, + NativeHistogramBucketFactor: defaultHistogramBucketFactor, + NativeHistogramMaxBucketNumber: defaultHistogramMaxBucketNumber, + NativeHistogramMinResetDuration: defaultHistogramMinResetDuration, + }, labelNames(attrMessagingPublishDuration)).MetricVec, clock.Time, cfg.TTL), + msgProcessDuration: expire.NewExpirer[prometheus.Histogram](prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: attributes.MessagingProcessDuration.Prom, + Help: "duration of messaging client process operations, in seconds", + Buckets: cfg.Buckets.DurationHistogram, + NativeHistogramBucketFactor: defaultHistogramBucketFactor, + NativeHistogramMaxBucketNumber: defaultHistogramMaxBucketNumber, + NativeHistogramMinResetDuration: defaultHistogramMinResetDuration, + }, labelNames(attrMessagingProcessDuration)).MetricVec, clock.Time, cfg.TTL), httpRequestSize: expire.NewExpirer[prometheus.Histogram](prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: attributes.HTTPServerRequestSize.Prom, Help: "size, in bytes, of the HTTP request body as received at the server side", @@ -370,6 +396,8 @@ func newReporter( mr.httpClientDuration, mr.grpcClientDuration, mr.dbClientDuration, + mr.msgProcessDuration, + mr.msgPublishDuration, mr.httpRequestSize, mr.httpDuration, mr.grpcDuration) @@ -451,6 +479,17 @@ func (r *metricsReporter) observe(span *request.Span) { r.dbClientDuration.WithLabelValues( labelValues(span, r.attrDBClientDuration)..., ).Observe(duration) + case request.EventTypeKafkaClient: + switch span.Method { + case request.MessagingPublish: + r.msgPublishDuration.WithLabelValues( + labelValues(span, r.attrMsgPublishDuration)..., + 
).Observe(duration) + case request.MessagingProcess: + r.msgProcessDuration.WithLabelValues( + labelValues(span, r.attrMsgProcessDuration)..., + ).Observe(duration) + } } } if r.cfg.SpanMetricsEnabled() { diff --git a/pkg/internal/request/metric_attributes.go b/pkg/internal/request/metric_attributes.go index 7521fe3d1..46ae7ed41 100644 --- a/pkg/internal/request/metric_attributes.go +++ b/pkg/internal/request/metric_attributes.go @@ -90,6 +90,10 @@ func DBOperationName(val string) attribute.KeyValue { return attribute.Key(attr.DBOperation).String(val) } +func MessagingOperationType(val string) attribute.KeyValue { + return attribute.Key(attr.MessagingOpType).String(val) +} + func SpanHost(span *Span) string { if span.HostName != "" { return span.HostName diff --git a/pkg/internal/request/span.go b/pkg/internal/request/span.go index 6c66484da..7c944f204 100644 --- a/pkg/internal/request/span.go +++ b/pkg/internal/request/span.go @@ -21,6 +21,7 @@ const ( EventTypeGRPCClient EventTypeSQLClient EventTypeRedisClient + EventTypeKafkaClient ) type IgnoreMode uint8 @@ -30,6 +31,11 @@ const ( IgnoreTraces ) +const ( + MessagingPublish = "publish" + MessagingProcess = "process" +) + type converter struct { clock func() time.Time monoClock func() time.Duration diff --git a/pkg/internal/request/span_getters.go b/pkg/internal/request/span_getters.go index 69c738fc8..4a84af943 100644 --- a/pkg/internal/request/span_getters.go +++ b/pkg/internal/request/span_getters.go @@ -40,6 +40,20 @@ func SpanOTELGetters(name attr.Name) (attributes.Getter[*Span, attribute.KeyValu getter = func(s *Span) attribute.KeyValue { return semconv.ServiceName(s.ServiceID.Name) } case attr.DBOperation: getter = func(span *Span) attribute.KeyValue { return DBOperationName(span.Method) } + case attr.MessagingSystem: + getter = func(span *Span) attribute.KeyValue { + if span.Type == EventTypeKafkaClient { + return semconv.MessagingSystem("kafka") + } + return semconv.MessagingSystem("unknown") + } + case 
attr.MessagingDestination: + getter = func(span *Span) attribute.KeyValue { + if span.Type == EventTypeKafkaClient { + return semconv.MessagingDestinationName(span.Path) + } + return semconv.MessagingDestinationName("") + } } // default: unlike the Prometheus getters, we don't check here for service name nor k8s metadata // because they are already attributes of the Resource instead of the attributes. @@ -91,6 +105,20 @@ func SpanPromGetters(attrName attr.Name) (attributes.Getter[*Span, string], bool } return "" } + case attr.MessagingSystem: + getter = func(span *Span) string { + if span.Type == EventTypeKafkaClient { + return "kafka" + } + return "unknown" + } + case attr.MessagingDestination: + getter = func(span *Span) string { + if span.Type == EventTypeKafkaClient { + return span.Path + } + return "" + } // resource metadata values below. Unlike OTEL, they are included here because they // belong to the metric, instead of the Resource case attr.ServiceName: diff --git a/test/integration/components/pythonkafka/Dockerfile b/test/integration/components/pythonkafka/Dockerfile new file mode 100644 index 000000000..58136b894 --- /dev/null +++ b/test/integration/components/pythonkafka/Dockerfile @@ -0,0 +1,7 @@ +# Dockerfile that will build a container that runs python +FROM python:3.11 +EXPOSE 8080 +RUN apt update +RUN pip install kafka-python +COPY main.py /main.py +CMD ["python", "main.py"] \ No newline at end of file diff --git a/test/integration/components/pythonkafka/main.py b/test/integration/components/pythonkafka/main.py new file mode 100644 index 000000000..61029c39f --- /dev/null +++ b/test/integration/components/pythonkafka/main.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +import threading, time + +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer +from kafka import errors as Errors +from kafka.admin import NewTopic +from http.server import BaseHTTPRequestHandler, HTTPServer + + +class Producer(threading.Thread): + def __init__(self): + 
threading.Thread.__init__(self) + self.stop_event = threading.Event() + + def stop(self): + self.stop_event.set() + + def run(self): + while True: + try: + producer = KafkaProducer(bootstrap_servers='kafka:9092') + + while not self.stop_event.is_set(): + producer.send('my-topic', b"test") + producer.send('my-topic', b"\xc2Hola, mundo!") + time.sleep(1) + + producer.close() + break + except Exception as e: + print(f"Producer error occurred: {e}") + time.sleep(1) + + +class Consumer(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + self.stop_event = threading.Event() + + def stop(self): + self.stop_event.set() + + def run(self): + while True: + try: + consumer = KafkaConsumer(bootstrap_servers='kafka:9092', + auto_offset_reset='latest', + group_id='1', + consumer_timeout_ms=1000) + consumer.subscribe(['my-topic']) + + while not self.stop_event.is_set(): + for message in consumer: + print(message) + if self.stop_event.is_set(): + break + + consumer.close() + break + except Exception as e: + print(f"Consumer error occurred: {e}") + time.sleep(1) + +class RequestHandler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == '/message': + self.send_response(200) + self.end_headers() + else: + self.send_response(404) + self.end_headers() + +def run_server(server_class=HTTPServer, handler_class=RequestHandler, port=8080): + server_address = ('', port) + httpd = server_class(server_address, handler_class) + print(f'Starting httpd server on port {port}') + httpd.serve_forever() + +def main(): + # Create 'my-topic' Kafka topic + while True: + try: + admin = KafkaAdminClient(bootstrap_servers='kafka:9092') + + topic = NewTopic(name='my-topic', + num_partitions=1, + replication_factor=1) + admin.create_topics([topic]) + break + except Errors.TopicAlreadyExistsError: + break + except Exception as e: + print(f"Admin error occurred: {e}") + time.sleep(1) + + tasks = [ + Producer(), + Consumer() + ] + + # Start threads of a publisher/producer and 
a subscriber/consumer to 'my-topic' Kafka topic + for t in tasks: + t.start() + + run_server() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test/oats/kafka/configs/grafana-datasources.yaml b/test/oats/kafka/configs/grafana-datasources.yaml new file mode 100644 index 000000000..c7b118ad5 --- /dev/null +++ b/test/oats/kafka/configs/grafana-datasources.yaml @@ -0,0 +1,41 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + uid: prometheus + url: http://prometheus:9090 + jsonData: + exemplarTraceIdDestinations: + - name: trace_id + datasourceUid: tempo + + - name: Tempo + type: tempo + uid: tempo + url: http://tempo:3200 + jsonData: + tracesToLogs: + datasourceUid: 'loki' + mappedTags: [{ key: 'service.name', value: 'job' }] + mapTagNamesEnabled: true + filterByTraceID: true + serviceMap: + datasourceUid: 'prometheus' + search: + hide: false + nodeGraph: + enabled: true + lokiSearch: + datasourceUid: 'loki' + + - name: Loki + type: loki + uid: loki + url: http://loki:3100 + jsonData: + derivedFields: + - name: 'trace_id' + matcherRegex: '"traceid":"(\w+)"' + url: '$${__value.raw}' + datasourceUid: 'tempo' diff --git a/test/oats/kafka/configs/instrumenter-config-traces.yml b/test/oats/kafka/configs/instrumenter-config-traces.yml new file mode 100644 index 000000000..a58cbf63b --- /dev/null +++ b/test/oats/kafka/configs/instrumenter-config-traces.yml @@ -0,0 +1,7 @@ +routes: + patterns: + - /create-trace + - /smoke + - /greeting + - /sqltest + unmatched: path diff --git a/test/oats/kafka/configs/otelcol-config.yaml b/test/oats/kafka/configs/otelcol-config.yaml new file mode 100644 index 000000000..b830bc605 --- /dev/null +++ b/test/oats/kafka/configs/otelcol-config.yaml @@ -0,0 +1,43 @@ +receivers: + otlp: + protocols: + grpc: + http: + +processors: + batch: + +exporters: + prometheusremotewrite: + endpoint: http://prometheus:9090/api/v1/write + add_metric_suffixes: true + otlp: + endpoint: tempo:4317 + tls: + 
insecure: true + loki: + endpoint: http://loki:3100/loki/api/v1/push + logging/metrics: + verbosity: detailed + logging/traces: + verbosity: detailed + logging/logs: + verbosity: detailed + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + #exporters: [otlp] + exporters: [otlp,logging/traces] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheusremotewrite] + #exporters: [prometheusremotewrite,logging/metrics] + logs: + receivers: [otlp] + processors: [batch] + exporters: [loki] + #exporters: [loki,logging/logs] diff --git a/test/oats/kafka/configs/prometheus-config.yml b/test/oats/kafka/configs/prometheus-config.yml new file mode 100644 index 000000000..cc4215ccc --- /dev/null +++ b/test/oats/kafka/configs/prometheus-config.yml @@ -0,0 +1,13 @@ +global: + evaluation_interval: 30s + scrape_interval: 5s +scrape_configs: + - job_name: otel + honor_labels: true + static_configs: + - targets: + - 'otelcol:9464' + - job_name: otel-collector + static_configs: + - targets: + - 'otelcol:8888' diff --git a/test/oats/kafka/configs/tempo-config.yaml b/test/oats/kafka/configs/tempo-config.yaml new file mode 100644 index 000000000..392dbaf4d --- /dev/null +++ b/test/oats/kafka/configs/tempo-config.yaml @@ -0,0 +1,27 @@ +server: + http_listen_port: 3200 + grpc_listen_port: 9096 + +distributor: + receivers: + otlp: + protocols: + grpc: + +storage: + trace: + backend: local + wal: + path: /tmp/tempo/wal + local: + path: /tmp/tempo/blocks + +#metrics_generator: +# storage: +# path: /tmp/tempo/generator/wal +# remote_write: +# - url: http://localhost:9090/api/v1/write +# send_exemplars: true + +#overrides: +# metrics_generator_processors: [span-metrics] diff --git a/test/oats/kafka/docker-compose-beyla-kafka.yml b/test/oats/kafka/docker-compose-beyla-kafka.yml new file mode 100644 index 000000000..491a566e9 --- /dev/null +++ b/test/oats/kafka/docker-compose-beyla-kafka.yml @@ -0,0 +1,70 @@ +services: + zookeeper: + restart: always + 
container_name: kafka-like-zookeeper + image: docker.io/bitnami/zookeeper:3.8 + ports: + - "2181:2181" + volumes: + - "zookeeper-volume:/bitnami" + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + kafka: + restart: always + container_name: kafka-like + image: docker.io/bitnami/kafka:3.3 + ports: + - "9093:9093" + - "9092:9092" + volumes: + - "kafka-volume:/bitnami" + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT + - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 + - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://kafka:9093 + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT + depends_on: + - zookeeper + # Simple python HTTP server, which exposes one endpoint /message that does Kafka query + testserver: + build: + context: ../../integration/components/pythonkafka + dockerfile: Dockerfile + image: pythonkafka + ports: + - "8080:8080" + depends_on: + kafka: + condition: service_started + # eBPF auto instrumenter + autoinstrumenter: + build: + context: ../../.. 
+ dockerfile: ./test/integration/components/beyla/Dockerfile + command: + - --config=/configs/instrumenter-config-traces.yml + volumes: + - {{ .ConfigDir }}:/configs + - ./testoutput/run:/var/run/beyla + - ../../../testoutput:/coverage + privileged: true # in some environments (not GH Pull Requests) you can set it to false and then cap_add: [ SYS_ADMIN ] + network_mode: "service:testserver" + pid: "service:testserver" + environment: + GOCOVERDIR: "/coverage" + BEYLA_PRINT_TRACES: "true" + BEYLA_OPEN_PORT: {{ .ApplicationPort }} + BEYLA_SERVICE_NAMESPACE: "integration-test" + BEYLA_METRICS_INTERVAL: "10ms" + BEYLA_BPF_BATCH_TIMEOUT: "10ms" + BEYLA_LOG_LEVEL: "DEBUG" + OTEL_EXPORTER_OTLP_ENDPOINT: "http://collector:4318" + depends_on: + testserver: + condition: service_started +volumes: + kafka-volume: + zookeeper-volume: \ No newline at end of file diff --git a/test/oats/kafka/docker-compose-generic-template.yml b/test/oats/kafka/docker-compose-generic-template.yml new file mode 100644 index 000000000..80214399d --- /dev/null +++ b/test/oats/kafka/docker-compose-generic-template.yml @@ -0,0 +1,37 @@ +version: "3.9" +services: + grafana: + image: grafana/grafana:10.0.5 + volumes: + - "{{ .ConfigDir }}/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/grafana-datasources.yaml" + ports: + - "{{ .GrafanaHTTPPort }}:3000" + prometheus: + image: prom/prometheus:v2.47.0 + command: + - --web.enable-remote-write-receiver + - --enable-feature=exemplar-storage + - --enable-feature=native-histograms + - --config.file=/etc/prometheus/prometheus.yml + ports: + - "{{ .PrometheusHTTPPort }}:9090" + tempo: + image: grafana/tempo:2.2.3 + volumes: + - "{{ .ConfigDir }}/tempo-config.yaml:/config.yaml" + command: + - --config.file=/config.yaml + ports: + - "{{ .TempoHTTPPort }}:3200" +# loki: +# image: grafana/loki:2.9.0 +# ports: +# - "{{ .LokiHTTPPort }}:3100" + collector: + image: otel/opentelemetry-collector-contrib:0.85.0 + volumes: + - "{{ .ConfigDir 
}}/otelcol-config.yaml:/config.yaml" + command: + - --config=file:/config.yaml + # we currently don't support this in our dashboards and grafana agent doesn't understand it yet + - --feature-gates=-pkg.translator.prometheus.NormalizeName diff --git a/test/oats/kafka/docker-compose-include-base.yml b/test/oats/kafka/docker-compose-include-base.yml new file mode 100644 index 000000000..7a3f4b3e8 --- /dev/null +++ b/test/oats/kafka/docker-compose-include-base.yml @@ -0,0 +1,3 @@ +include: +{{ range .files }}- {{ . }} +{{ end }} diff --git a/test/oats/kafka/go.mod b/test/oats/kafka/go.mod new file mode 100644 index 000000000..bcb510218 --- /dev/null +++ b/test/oats/kafka/go.mod @@ -0,0 +1,63 @@ +module github.com/grafana/beyla/test/oats + +go 1.22 + +require ( + github.com/grafana/oats v0.0.0-20231025143155-a8ecae950304 // indirect + github.com/onsi/ginkgo/v2 v2.12.1 + github.com/onsi/gomega v1.28.0 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dennwc/varint v1.0.0 // indirect + github.com/go-kit/log v0.2.1 // indirect + github.com/go-logfmt/logfmt v0.6.0 // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/go-cmp v0.5.9 // indirect + github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect + github.com/grafana/dashboard-linter v0.0.0-20230815114304-3c1213ef32d9 // indirect + github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/matttproud/golang_protobuf_extensions 
v1.0.4 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_golang v1.16.0 // indirect + github.com/prometheus/client_model v0.4.0 // indirect + github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/procfs v0.11.0 // indirect + github.com/prometheus/prometheus v0.46.0 // indirect + github.com/stretchr/testify v1.8.4 // indirect + go.opentelemetry.io/collector/pdata v0.66.0 // indirect + go.opentelemetry.io/otel v1.17.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.17.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.17.0 // indirect + go.opentelemetry.io/otel/metric v1.17.0 // indirect + go.opentelemetry.io/otel/sdk v1.17.0 // indirect + go.opentelemetry.io/otel/trace v1.17.0 // indirect + go.opentelemetry.io/proto/otlp v1.0.0 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/goleak v1.2.1 // indirect + go.uber.org/multierr v1.8.0 // indirect + golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect + golang.org/x/net v0.14.0 // indirect + golang.org/x/sys v0.12.0 // indirect + golang.org/x/text v0.12.0 // indirect + golang.org/x/tools v0.12.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230717213848-3f92550aa753 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230717213848-3f92550aa753 // indirect + google.golang.org/grpc v1.58.3 // indirect + google.golang.org/protobuf v1.31.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/test/oats/kafka/go.sum b/test/oats/kafka/go.sum new file mode 100644 index 000000000..5d25f8363 --- /dev/null +++ b/test/oats/kafka/go.sum @@ -0,0 +1,218 @@ 
+github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0 h1:8q4SaHjFsClSvuVne0ID/5Ka8u3fcIHyqkLjcFpNRHQ= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0/go.mod h1:bjGvMhVMb+EEm3VRNQawDMUyMMjo+S5ewNjflkep/0Q= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 h1:vcYCAze6p19qBW7MhZybIsqD8sMV8js0NyQM8JDnVtg= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0/go.mod h1:OQeznEEkTZ9OrhHJoDD8ZDq51FHgXjqtP9z6bEwBq9U= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM= +github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 h1:OBhqkivkhkMqLPymWEppkm7vgPQY2XsHoEkaMQ0AdZY= +github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0/go.mod h1:kgDmCTgBzIEPFElEF+FK0SdjAor06dRq2Go927dnQ6o= +github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= +github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= +github.com/aws/aws-sdk-go v1.44.302 h1:ST3ko6GrJKn3Xi+nAvxjG3uk/V1pW8KC52WLeIxqqNk= +github.com/aws/aws-sdk-go v1.44.302/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= +github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= +github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= +github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= +github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= +github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= +github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE= +github.com/golang/glog v1.1.0/go.mod 
h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA= +github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grafana/dashboard-linter v0.0.0-20230815114304-3c1213ef32d9 h1:NXsChAvUCYFqCTWS3hmN543x/Rk8cDev4s18vaZq6Fc= +github.com/grafana/dashboard-linter v0.0.0-20230815114304-3c1213ef32d9/go.mod h1:4zovusCMljsbpo6ISSN/awpv+bjHhZJphYNLI1+7I74= +github.com/grafana/oats v0.0.0-20231004163855-07f6e62e76a3 h1:Viyp4xu6ltWamZG9bUMR8yfHhsJlC08+wQvt3R33xv4= +github.com/grafana/oats v0.0.0-20231004163855-07f6e62e76a3/go.mod h1:UlueL8BBVkLhE1RwSst5oa5KhO6qnvgoy45bQ//1VdE= +github.com/grafana/oats v0.0.0-20231025143155-a8ecae950304 h1:wnNo6AlWG5IahNGHoi42fPBH6D9F6jz93qj6UfITYGQ= +github.com/grafana/oats v0.0.0-20231025143155-a8ecae950304/go.mod h1:UlueL8BBVkLhE1RwSst5oa5KhO6qnvgoy45bQ//1VdE= 
+github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd h1:PpuIBO5P3e9hpqBD0O/HjhShYuM6XE0i/lbE6J94kww= +github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod 
h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/onsi/ginkgo/v2 v2.12.1 h1:uHNEO1RP2SpuZApSkel9nEh1/Mu+hmQe7Q+Pepg5OYA= +github.com/onsi/ginkgo/v2 v2.12.1/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= +github.com/onsi/gomega v1.28.0 h1:i2rg/p9n/UqIDAMFUJ6qIUUMcsqOuUHgbpbu235Vr1c= +github.com/onsi/gomega v1.28.0/go.mod h1:A1H2JE76sI14WIP57LMKj7FVfCHx3g3BcZVjJG8bjX8= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= +github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= +github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4= +github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI= +github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk= +github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/prometheus/prometheus v0.46.0 h1:9JSdXnsuT6YsbODEhSQMwxNkGwPExfmzqG73vCMk/Kw= +github.com/prometheus/prometheus v0.46.0/go.mod h1:10L5IJE5CEsjee1FnOcVswYXlPIscDWWt3IJ2UDYrz4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark 
v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/pdata v0.66.0 h1:UdE5U6MsDNzuiWaXdjGx2lC3ElVqWmN/hiUE8vyvSuM= +go.opentelemetry.io/collector/pdata v0.66.0/go.mod h1:pqyaznLzk21m+1KL6fwOsRryRELL+zNM0qiVSn0MbVc= +go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM= +go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0 h1:U5GYackKpVKlPrd/5gKMlrTlP2dCESAAFU682VCpieY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0/go.mod h1:aFsJfCEnLzEu9vRRAcUiB/cpRTbVsNdF3OHSPpdjxZQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.17.0 h1:iGeIsSYwpYSvh5UGzWrJfTDJvPjrXtxl3GUppj6IXQU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.17.0/go.mod h1:1j3H3G1SBYpZFti6OI4P0uRQCW20MXkG5v4UWXppLLE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.17.0 h1:kvWMtSUNVylLVrOE4WLUmBtgziYoCIYUNSpTYtMzVJI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.17.0/go.mod h1:SExUrRYIXhDgEKG4tkiQovd2HTaELiHUsuK08s5Nqx4= +go.opentelemetry.io/otel/metric v1.17.0 h1:iG6LGVz5Gh+IuO0jmgvpTB6YVrCGngi8QGm+pMd8Pdc= +go.opentelemetry.io/otel/metric v1.17.0/go.mod h1:h4skoxdZI17AxwITdmdZjjYJQH5nzijUUjm+wtPph5o= +go.opentelemetry.io/otel/sdk v1.17.0 h1:FLN2X66Ke/k5Sg3V623Q7h7nt3cHXaW1FOvKKrW0IpE= +go.opentelemetry.io/otel/sdk v1.17.0/go.mod h1:U87sE0f5vQB7hwUoW98pW5Rz4ZDuCFBZFNUBlSgmDFQ= +go.opentelemetry.io/otel/trace v1.17.0 h1:/SWhSRHmDPOImIAetP1QAeMnZYiQXrTy4fMMYOdSKWQ= +go.opentelemetry.io/otel/trace v1.17.0/go.mod h1:I/4vKTgFclIsXRVucpH25X0mpFSczM7aHeaz0ZBLWjY= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 
h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net 
v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8= +golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools 
v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.12.0 h1:YW6HUoUmYBpwSgyaGaZq1fHjrBjX1rlpZ54T6mu2kss= +golang.org/x/tools v0.12.0/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/genproto v0.0.0-20230717213848-3f92550aa753 h1:+VoAg+OKmWaommL56xmZSE2sUK8A7m6SUO7X89F2tbw= +google.golang.org/genproto v0.0.0-20230717213848-3f92550aa753/go.mod h1:iqkVr8IRpZ53gx1dEnWlCUIEwDWqWARWrbzpasaTNYM= +google.golang.org/genproto/googleapis/api v0.0.0-20230717213848-3f92550aa753 h1:lCbbUxUDD+DiXx9Q6F/ttL0aAu7N2pz8XnmMm8ZW4NE= +google.golang.org/genproto/googleapis/api v0.0.0-20230717213848-3f92550aa753/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230717213848-3f92550aa753 h1:XUODHrpzJEUeWmVo/jfNTLj0YyVveOo28oE6vkFbkO4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230717213848-3f92550aa753/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= +google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= +google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= 
+google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/test/oats/kafka/oats_test.go b/test/oats/kafka/oats_test.go new file mode 100644 index 000000000..e02ff1cee --- /dev/null +++ b/test/oats/kafka/oats_test.go @@ -0,0 +1,44 @@ +package oats + +import ( + "fmt" + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "github.com/grafana/oats/yaml" +) + +func TestYaml(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Yaml Suite") +} + +var _ = Describe("test case", Label("docker", "integration", "slow"), func() { + fmt.Println("First test") + cases, base := yaml.ReadTestCases() + if base != "" { + It("should have at least one test case", func() { + Expect(cases).ToNot(BeEmpty(), "expected at least one test case in %s", base) + }) + } + + configuration, _ := GinkgoConfiguration() + if configuration.ParallelTotal > 1 { + ports := yaml.NewPortAllocator(len(cases)) + for _, c := range cases { + // Ports have to be allocated before we start executing in parallel to avoid taking the same port. + // Even though it sounds unlikely, it happens quite often. + c.PortConfig = ports.AllocatePorts() + } + } + + yaml.VerboseLogging = true + + for _, c := range cases { + Describe(c.Name, Ordered, func() { + yaml.RunTestCase(c) + }) + } +}) diff --git a/test/oats/kafka/yaml/oats_kafka.yaml b/test/oats/kafka/yaml/oats_kafka.yaml new file mode 100644 index 000000000..57ba7c947 --- /dev/null +++ b/test/oats/kafka/yaml/oats_kafka.yaml @@ -0,0 +1,37 @@ +docker-compose: + generator: generic + files: + - ../docker-compose-beyla-kafka.yml +input: + - path: '/message' + +interval: 2s +expected: + traces: + - traceql: '{ .messaging.operation.type = "process" }' + spans: + - name: 'my-topic process' + attributes: + messaging.destination.name: my-topic + messaging.operation.type: process + messaging.system: kafka + - traceql: '{ .messaging.operation.type = "publish" }' + spans: + - name: 'my-topic publish' + attributes: + messaging.destination.name: my-topic + messaging.operation.type: publish + messaging.system: kafka + metrics: + - promql: 'messaging_publish_duration_count{messaging_system="kafka", messaging_destination_name="my-topic"}' + value: "> 0" + - promql: 'messaging_publish_duration_bucket{le="0"}' + value: "== 0" + - promql: 
'messaging_publish_duration_bucket{le="10"}' + value: "> 0" + - promql: 'messaging_process_duration_count{messaging_system="kafka", messaging_destination_name="my-topic"}' + value: "> 0" + - promql: 'messaging_process_duration_bucket{le="0"}' + value: "== 0" + - promql: 'messaging_process_duration_bucket{le="10"}' + value: "> 0"