Add kafka support #890

Merged · 17 commits · Jun 3, 2024
9 changes: 7 additions & 2 deletions Makefile
@@ -287,13 +287,18 @@ oats-test-redis-other-langs: oats-prereq
mkdir -p test/oats/redis_other_langs/$(TEST_OUTPUT)/run
cd test/oats/redis_other_langs && TESTCASE_BASE_PATH=./yaml $(GINKGO) -v -r

+.PHONY: oats-test-kafka
+oats-test-kafka: oats-prereq
+	mkdir -p test/oats/kafka/$(TEST_OUTPUT)/run
+	cd test/oats/kafka && TESTCASE_TIMEOUT=120s TESTCASE_BASE_PATH=./yaml $(GINKGO) -v -r
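# Note: only this target overrides TESTCASE_TIMEOUT (120s); presumably the Kafka broker containers need extra startup time compared to the other oats suites.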

.PHONY: oats-test
-oats-test: oats-test-sql oats-test-sql-statement oats-test-sql-other-langs oats-test-redis-other-langs
+oats-test: oats-test-sql oats-test-sql-statement oats-test-sql-other-langs oats-test-redis-other-langs oats-test-kafka
$(MAKE) itest-coverage-data

.PHONY: oats-test-debug
oats-test-debug: oats-prereq
cd test/oats/sql_statement && TESTCASE_BASE_PATH=./yaml TESTCASE_MANUAL_DEBUG=true TESTCASE_TIMEOUT=1h $(GINKGO) -v -r
+	cd test/oats/kafka && TESTCASE_BASE_PATH=./yaml TESTCASE_MANUAL_DEBUG=true TESTCASE_TIMEOUT=1h $(GINKGO) -v -r

.PHONY: drone
drone:
2 changes: 1 addition & 1 deletion bpf/http_types.h
@@ -15,7 +15,7 @@
#define KPROBES_LARGE_RESPONSE_LEN 100000 // 100K and above we try to track the response actual time with kretprobes

#define K_TCP_MAX_LEN 256
-#define K_TCP_RES_LEN 128
+#define K_TCP_RES_LEN 128
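// Raised from 24, likely so that enough of the response payload is captured for protocol
// detection: the Kafka parser added in this PR also inspects the response buffer
// (ProcessPossibleKafkaEvent falls back to it when the request buffer does not parse).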

#define CONN_INFO_FLAG_TRACE 0x1

18 changes: 10 additions & 8 deletions docs/sources/metrics.md
@@ -25,25 +25,25 @@ The following table describes the exported metrics in both OpenTelemetry and Pro
| Application | `rpc.server.duration` | `rpc_server_duration_seconds` | Histogram | seconds | Duration of RPC service calls from the server side |
| Application | `sql.client.duration` | `sql_client_duration_seconds` | Histogram | seconds | Duration of SQL client operations (Experimental) |
| Application | `redis.client.duration` | `redis_client_duration_seconds` | Histogram | seconds | Duration of Redis client operations (Experimental) |
+| Application | `messaging.publish.duration` | `messaging_publish_duration` | Histogram | seconds | Duration of Messaging (Kafka) publish operations (Experimental) |
+| Application | `messaging.process.duration` | `messaging_process_duration` | Histogram | seconds | Duration of Messaging (Kafka) process operations (Experimental) |
| Network | `beyla.network.flow.bytes` | `beyla_network_flow_bytes` | Counter | bytes | Bytes submitted from a source network endpoint to a destination network endpoint |

Beyla can also export [Span metrics](/docs/tempo/latest/metrics-generator/span_metrics/) and
-[Service graph metrics](/docs/tempo/latest/metrics-generator/service-graph-view/), which can be enabled via the
+[Service graph metrics](/docs/tempo/latest/metrics-generator/service-graph-view/), which you can enable via the
[features]({{< relref "./configure/options.md" >}}) configuration option.

## Attributes of Beyla metrics

-For the sake of brevity, only the OTEL `dot.notation` of the metrics and attributes is listed, but
-the metrics and attributes are exposed `underscore_notation` when a Prometheus exporter is used.
+For the sake of brevity, the metrics and attributes in this list use the OTEL `dot.notation`. When using the Prometheus exporter, the metrics use `underscore_notation`.

-In order to hide attributes that are shown by default, or show attributes that are hidden by
-default, check the `attributes`->`select` section in the [configuration documentation]({{< relref "./configure/options.md" >}}).
+To configure which attributes are shown or hidden, check the `attributes`->`select` section in the [configuration documentation]({{< relref "./configure/options.md" >}}).

| Metrics | Name | Default |
|--------------------------------|-----------------------------|-----------------------------------------------|
| Application (all) | `http.request.method` | shown |
| Application (all) | `http.response.status_code` | shown |
-| Application (all)              | `http.route`                | shown if `routes` configuration is defined    |
+| Application (all)              | `http.route`                | shown if `routes` configuration section exists |
| Application (all) | `k8s.daemonset.name` | shown if `attributes.kubernetes.enable` |
| Application (all) | `k8s.deployment.name` | shown if `attributes.kubernetes.enable` |
| Application (all) | `k8s.namespace.name` | shown if `attributes.kubernetes.enable` |
@@ -66,9 +66,11 @@ default, check the `attributes`->`select` section in the [configuration document
| `beyla.network.flow.bytes` | `beyla.ip` | hidden |
| `db.client.operation.duration` | `db.operation.name` | shown |
| `db.client.operation.duration` | `db.collection.name` | hidden |
+| `messaging.publish.duration`   | `messaging.system`          | shown                                          |
+| `messaging.publish.duration`   | `messaging.destination.name`| shown                                          |
| `beyla.network.flow.bytes` | `direction` | hidden |
| `beyla.network.flow.bytes` | `dst.address` | hidden |
-| `beyla.network.flow.bytes`     | `dst.cidr`                  | shown if the `cidrs` configuration is defined |
+| `beyla.network.flow.bytes`     | `dst.cidr`                  | shown if the `cidrs` configuration section exists |
| `beyla.network.flow.bytes` | `dst.name` | hidden |
| `beyla.network.flow.bytes` | `dst.port` | hidden |
| `beyla.network.flow.bytes` | `iface` | hidden |
@@ -87,7 +89,7 @@ default, check the `attributes`->`select` section in the [configuration document
| `beyla.network.flow.bytes` | `k8s.src.owner.type` | hidden |
| `beyla.network.flow.bytes` | `k8s.src.type` | hidden |
| `beyla.network.flow.bytes` | `src.address` | hidden |
-| `beyla.network.flow.bytes`     | `src.cidr`                  | shown if the `cidrs` configuration is defined |
+| `beyla.network.flow.bytes`     | `src.cidr`                  | shown if the `cidrs` configuration section exists |
| `beyla.network.flow.bytes` | `src.name` | hidden |
| `beyla.network.flow.bytes` | `src.port` | hidden |
| `beyla.network.flow.bytes` | `transport` | hidden |
2 changes: 1 addition & 1 deletion pkg/internal/ebpf/common/bpf_bpfel_arm64.go

Some generated files are not rendered by default.

Binary file modified pkg/internal/ebpf/common/bpf_bpfel_arm64.o
Binary file not shown.
2 changes: 1 addition & 1 deletion pkg/internal/ebpf/common/bpf_bpfel_x86.go

Some generated files are not rendered by default.

Binary file modified pkg/internal/ebpf/common/bpf_bpfel_x86.o
Binary file not shown.
225 changes: 225 additions & 0 deletions pkg/internal/ebpf/common/kafka_detect_transform.go
@@ -0,0 +1,225 @@
package ebpfcommon

import (
"encoding/binary"
"errors"

"github.com/grafana/beyla/pkg/internal/request"
)

type Operation int8

const (
Produce Operation = 0
Fetch Operation = 1
)

type Header struct {
MessageSize int32
APIKey int16
APIVersion int16
CorrelationID int32
ClientIDSize int16
}
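// The five fixed header fields above are read back-to-back as big-endian
// integers: 4 + 2 + 2 + 4 + 2 bytes, 14 in total, which is where
// KafkaMinLength below comes from.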

type KafkaInfo struct {
Operation Operation
Topic string
ClientID string
TopicOffset int
}

func (k Operation) String() string {
switch k {
case Produce:
return request.MessagingPublish
case Fetch:
return request.MessagingProcess
default:
return "unknown"
}
}

// KafkaMinLength is the size in bytes of the fixed Kafka request header.
const KafkaMinLength = 14

// ProcessPossibleKafkaEvent first tries to parse the request buffer as a Kafka
// request and, if that fails, retries with the response buffer. It returns an
// error if neither is a valid Kafka request; otherwise it returns a KafkaInfo
// with the parsed data.
func ProcessPossibleKafkaEvent(pkt []byte, rpkt []byte) (*KafkaInfo, error) {
k, err := ProcessKafkaRequest(pkt)
if err != nil {
k, err = ProcessKafkaRequest(rpkt)
}

return k, err
}

func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) {
k := &KafkaInfo{}
if len(pkt) < KafkaMinLength {
return k, errors.New("packet too short")
}

header := &Header{
MessageSize: int32(binary.BigEndian.Uint32(pkt[0:4])),
APIKey: int16(binary.BigEndian.Uint16(pkt[4:6])),
APIVersion: int16(binary.BigEndian.Uint16(pkt[6:8])),
CorrelationID: int32(binary.BigEndian.Uint32(pkt[8:12])),
ClientIDSize: int16(binary.BigEndian.Uint16(pkt[12:14])),
}

if !isValidKafkaHeader(header) {
return k, errors.New("invalid Kafka request header")
}

offset := KafkaMinLength
if header.ClientIDSize > 0 {
clientID := pkt[offset : offset+int(header.ClientIDSize)]
if !isValidClientID(clientID, int(header.ClientIDSize)) {
return k, errors.New("invalid client ID")
}
offset += int(header.ClientIDSize)
k.ClientID = string(clientID)
} else if header.ClientIDSize < -1 {
return k, errors.New("invalid client ID size")
}

switch Operation(header.APIKey) {
case Produce:
ok, err := getTopicOffsetFromProduceOperation(header, pkt, &offset)
if !ok || err != nil {
return k, err
}
k.Operation = Produce
k.TopicOffset = offset
case Fetch:
offset += getTopicOffsetFromFetchOperation(header)
k.Operation = Fetch
k.TopicOffset = offset
default:
return k, errors.New("invalid Kafka operation")
}
topic, err := getTopicName(pkt, offset)
if err != nil {
return k, err
}
k.Topic = topic
return k, nil
}

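// isValidKafkaHeader accepts only Fetch (API key 1, up to v11) and Produce
// (API key 0, v1-v8) requests with sane sizes, which filters out most
// non-Kafka traffic seen on the TCP probes.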
func isValidKafkaHeader(header *Header) bool {
if header.MessageSize < int32(KafkaMinLength) || header.APIVersion < 0 {
return false
}
switch Operation(header.APIKey) {
case Fetch:
if header.APIVersion > 11 {
return false
}
case Produce:
if header.APIVersion == 0 || header.APIVersion > 8 {
return false
}
default:
return false
}
if header.CorrelationID < 0 {
return false
}
return header.ClientIDSize >= -1
}

// nolint:cyclop
func isValidKafkaString(buffer []byte, maxBufferSize, realSize int, printableOk bool) bool {
for j := 0; j < maxBufferSize; j++ {
if j >= realSize {
break
}
ch := buffer[j]
if ('a' <= ch && ch <= 'z') || ('A' <= ch && ch <= 'Z') || ('0' <= ch && ch <= '9') || ch == '.' || ch == '_' || ch == '-' {
continue
}
if printableOk && (ch >= ' ' && ch <= '~') {
continue
}
return false
}
return true
}

func isValidClientID(buffer []byte, realClientIDSize int) bool {
return isValidKafkaString(buffer, len(buffer), realClientIDSize, true)
}

func getTopicName(pkt []byte, offset int) (string, error) {
offset += 4 // skip the topics array count (int32)
if offset > len(pkt) {
return "", errors.New("invalid buffer length")
}
topicNameSize := int16(binary.BigEndian.Uint16(pkt[offset:]))
if topicNameSize <= 0 || topicNameSize > 255 {
return "", errors.New("invalid topic name size")
}
offset += 2

if offset > len(pkt) {
return "", nil
}
maxLen := offset + int(topicNameSize)
if len(pkt) < maxLen {
maxLen = len(pkt)
}
topicName := pkt[offset:maxLen]
if isValidKafkaString(topicName, len(topicName), int(topicNameSize), false) {
return string(topicName), nil
}
return "", errors.New("invalid topic name")
}

func getTopicOffsetFromProduceOperation(header *Header, pkt []byte, offset *int) (bool, error) {
if header.APIVersion >= 3 {
if len(pkt) < *offset+2 {
return false, errors.New("packet too short")
}
transactionalIDSize := int16(binary.BigEndian.Uint16(pkt[*offset:]))
*offset += 2
if transactionalIDSize > 0 {
*offset += int(transactionalIDSize)
}
}

if len(pkt) < *offset+2 {
return false, errors.New("packet too short")
}
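// Kafka only defines acks of -1 (all in-sync replicas), 0 (no ack) and 1
// (leader only); any other value means this is unlikely to be a Produce request.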
acks := int16(binary.BigEndian.Uint16(pkt[*offset:]))
if acks < -1 || acks > 1 {
return false, nil
}
*offset += 2

if len(pkt) < *offset+4 {
return false, errors.New("packet too short")
}
timeoutMS := int32(binary.BigEndian.Uint32(pkt[*offset:]))
if timeoutMS < 0 {
return false, nil
}
*offset += 4

return true, nil
}

func getTopicOffsetFromFetchOperation(header *Header) int {
offset := 3 * 4 // replica_id + max_wait_ms + min_bytes, three int32 fields

if header.APIVersion >= 3 {
offset += 4 // max_bytes
if header.APIVersion >= 4 {
offset++ // isolation_level
if header.APIVersion >= 7 {
offset += 2 * 4 // session_id + session_epoch
}
}
}

return offset
}
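
To show how the pieces fit together, here is a minimal sketch of a unit test (not part of this PR) that feeds `ProcessKafkaRequest` a hand-assembled Produce v2 request. The client ID `sarama` and topic `my-topic` are illustrative values; the field layout follows exactly what the parser above reads.

```go
package ebpfcommon

import (
	"encoding/binary"
	"testing"
)

func TestProcessKafkaRequest_ProduceV2(t *testing.T) {
	// Fixed 14-byte request header: message_size, api_key, api_version,
	// correlation_id, client_id size.
	pkt := binary.BigEndian.AppendUint32(nil, 36) // message_size
	pkt = binary.BigEndian.AppendUint16(pkt, 0)   // api_key 0 = Produce
	pkt = binary.BigEndian.AppendUint16(pkt, 2)   // api_version (v1-v8 pass isValidKafkaHeader)
	pkt = binary.BigEndian.AppendUint32(pkt, 7)   // correlation_id, must be >= 0
	pkt = binary.BigEndian.AppendUint16(pkt, 6)   // client_id size
	pkt = append(pkt, "sarama"...)                // client_id

	// Produce body fields checked by getTopicOffsetFromProduceOperation
	// (api_version < 3, so there is no transactional_id field).
	pkt = binary.BigEndian.AppendUint16(pkt, 1)    // acks: must be -1, 0 or 1
	pkt = binary.BigEndian.AppendUint32(pkt, 1000) // timeout_ms: must be >= 0

	// getTopicName skips the topics array count, then reads a size-prefixed string.
	pkt = binary.BigEndian.AppendUint32(pkt, 1) // topics array count
	pkt = binary.BigEndian.AppendUint16(pkt, 8) // topic name size
	pkt = append(pkt, "my-topic"...)

	info, err := ProcessKafkaRequest(pkt)
	if err != nil {
		t.Fatal(err)
	}
	if info.Operation != Produce || info.Topic != "my-topic" || info.ClientID != "sarama" {
		t.Fatalf("unexpected parse result: %+v", info)
	}
}
```

Flipping `api_version` to 0, or `api_key` to anything other than 0 or 1, makes `isValidKafkaHeader` reject the packet with the `invalid Kafka request header` error.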