Add support for segment.io kafka-go library (#1006)
grcevski authored Jul 11, 2024
1 parent 224e973 commit 890ebbf
Showing 117 changed files with 2,148 additions and 94 deletions.
6 changes: 4 additions & 2 deletions .gitignore
@@ -75,6 +75,8 @@ test/integration/components/gohttp2/client/client
test/integration/components/gohttp2/server/http2srv
beyla.sln
test/integration/components/gokafka/gokafka
test/integration/components/gokafka/vendor
test/integration/components/goredis/vendor
test/integration/components/gokafka/vendor/*
test/integration/components/gokafka-seg/vendor/*
test/integration/components/gokafka-seg/gokafka
test/integration/components/goredis/vendor/*
test/integration/components/goredis/goredis
1 change: 1 addition & 0 deletions README.md
@@ -96,6 +96,7 @@ The Go instrumentation is limited to certain specific libraries.
| [Go x/net/http2](https://golang.org/x/net/http2) ||
| [Go-Redis v9](github.com/redis/go-redis) ||
| [Sarama Kafka](github.com/IBM/sarama) ||
| [Kafka-Go](https://github.com/segmentio/kafka-go) ||

HTTPS instrumentation is limited to Go programs and libraries/languages using libssl3.
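For context, the code paths the new probes hook are the standard kafka-go producer and consumer entry points. A minimal sketch of such an application (illustrative only, not part of this commit; broker address, topic, and group names are placeholders):

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Produce path: (*kafka.Writer).WriteMessages is one of the hooked entry points.
    w := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), Topic: "example-topic"}
    if err := w.WriteMessages(context.Background(), kafka.Message{Value: []byte("ping")}); err != nil {
        log.Fatal(err)
    }
    w.Close()

    // Fetch path: reads through kafka.Reader end up in the hooked reader code.
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "example-topic",
        GroupID: "example-group",
    })
    msg, err := r.ReadMessage(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("got: %s", msg.Value)
    r.Close()
}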

290 changes: 290 additions & 0 deletions bpf/go_kafka_go.c
@@ -0,0 +1,290 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "utils.h"
#include "bpf_dbg.h"
#include "go_common.h"
#include "ringbuf.h"

#define KAFKA_API_FETCH 1
#define KAFKA_API_PRODUCE 0

volatile const u64 kafka_go_writer_topic_pos;
volatile const u64 kafka_go_protocol_conn_pos;
volatile const u64 kafka_go_reader_topic_pos;

typedef struct produce_req {
    u64 msg_ptr;
    u64 conn_ptr;
    u64 start_monotime_ns;
} produce_req_t;

typedef struct topic {
    char name[MAX_TOPIC_NAME_LEN];
    tp_info_t tp;
} topic_t;

struct {
    __uint(type, BPF_MAP_TYPE_LRU_HASH);
    __type(key, void *); // w_ptr
    __type(value, tp_info_t); // traceparent
    __uint(max_entries, MAX_CONCURRENT_REQUESTS);
} produce_traceparents SEC(".maps");

struct {
    __uint(type, BPF_MAP_TYPE_LRU_HASH);
    __type(key, void *); // goroutine
    __type(value, topic_t); // topic info
    __uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_produce_topics SEC(".maps");

struct {
    __uint(type, BPF_MAP_TYPE_LRU_HASH);
    __type(key, void *); // msg ptr
    __type(value, topic_t); // topic info
    __uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_produce_messages SEC(".maps");

struct {
    __uint(type, BPF_MAP_TYPE_LRU_HASH);
    __type(key, void *); // goroutine
    __type(value, produce_req_t); // rw ptr + start time
    __uint(max_entries, MAX_CONCURRENT_REQUESTS);
} produce_requests SEC(".maps");

struct {
    __uint(type, BPF_MAP_TYPE_LRU_HASH);
    __type(key, void *); // goroutine
    __type(value, kafka_go_req_t); // rw ptr + start time
    __uint(max_entries, MAX_CONCURRENT_REQUESTS);
} fetch_requests SEC(".maps");

// Code for the produce messages path
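// Flow (produce): writer_write_messages stores a traceparent keyed by the
// Writer pointer; writer_produce reads the topic name and re-keys topic+tp by
// goroutine; client_roundTrip re-keys it by message pointer so that
// protocol_RoundTrip can record the start timestamp and its return probe can
// emit the final event.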
SEC("uprobe/writer_write_messages")
int uprobe_writer_write_messages(struct pt_regs *ctx) {
    void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
    void *w_ptr = (void *)GO_PARAM1(ctx);
    bpf_dbg_printk("=== uprobe/kafka-go writer_write_messages %llx w_ptr %llx === ", goroutine_addr, w_ptr);

    tp_info_t tp = {};

    // There are no HTTP/gRPC request headers to look up here, so pass 0 as the last argument.
    client_trace_parent(goroutine_addr, &tp, 0);

    bpf_map_update_elem(&produce_traceparents, &w_ptr, &tp, BPF_ANY);
    return 0;
}

SEC("uprobe/writer_produce")
int uprobe_writer_produce(struct pt_regs *ctx) {
    void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
    bpf_dbg_printk("=== uprobe/kafka-go writer_produce %llx === ", goroutine_addr);

    void *w_ptr = (void *)GO_PARAM1(ctx);

    if (w_ptr) {
        void *topic_ptr = 0;
        bpf_probe_read_user(&topic_ptr, sizeof(void *), w_ptr + kafka_go_writer_topic_pos);

        bpf_dbg_printk("topic_ptr %llx", topic_ptr);
        if (topic_ptr) {
            topic_t topic = {};

            tp_info_t *tp = bpf_map_lookup_elem(&produce_traceparents, &w_ptr);
            if (tp) {
                bpf_dbg_printk("found existing traceparent %llx", tp);
                __builtin_memcpy(&topic.tp, tp, sizeof(tp_info_t));
            } else {
                urand_bytes(topic.tp.trace_id, TRACE_ID_SIZE_BYTES);
                urand_bytes(topic.tp.span_id, SPAN_ID_SIZE_BYTES);
            }

            bpf_probe_read_user(&topic.name, sizeof(topic.name), topic_ptr);
            bpf_map_update_elem(&ongoing_produce_topics, &goroutine_addr, &topic, BPF_ANY);
        }
        bpf_map_delete_elem(&produce_traceparents, &w_ptr);
    }

    return 0;
}

SEC("uprobe/client_roundTrip")
int uprobe_client_roundTrip(struct pt_regs *ctx) {
    void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
    bpf_dbg_printk("=== uprobe/kafka-go client_roundTrip %llx === ", goroutine_addr);

    topic_t *topic_ptr = bpf_map_lookup_elem(&ongoing_produce_topics, &goroutine_addr);

    if (topic_ptr) {
        void *msg_ptr = (void *)GO_PARAM7(ctx);
        bpf_dbg_printk("msg ptr %llx", msg_ptr);
        if (msg_ptr) {
            topic_t topic;
            __builtin_memcpy(&topic, topic_ptr, sizeof(topic_t));
            bpf_map_update_elem(&ongoing_produce_messages, &msg_ptr, &topic, BPF_ANY);
        }
    }

    bpf_map_delete_elem(&ongoing_produce_topics, &goroutine_addr);
    return 0;
}

SEC("uprobe/protocol_RoundTrip")
int uprobe_protocol_roundtrip(struct pt_regs *ctx) {
    bpf_dbg_printk("=== uprobe/kafka-go protocol_RoundTrip === ");
    void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
    void *rw_ptr = (void *)GO_PARAM2(ctx);
    void *msg_ptr = (void *)GO_PARAM8(ctx);
    bpf_dbg_printk("goroutine_addr %lx, rw ptr %llx, msg_ptr %llx", goroutine_addr, rw_ptr, msg_ptr);

    if (rw_ptr) {
        topic_t *topic_ptr = bpf_map_lookup_elem(&ongoing_produce_messages, &msg_ptr);
        bpf_dbg_printk("Found topic %llx", topic_ptr);
        if (topic_ptr) {
            produce_req_t p = {
                .conn_ptr = ((u64)rw_ptr) + kafka_go_protocol_conn_pos,
                .msg_ptr = (u64)msg_ptr,
                .start_monotime_ns = bpf_ktime_get_ns(),
            };

            bpf_map_update_elem(&produce_requests, &goroutine_addr, &p, BPF_ANY);
        }
    }

    return 0;
}

SEC("uprobe/protocol_RoundTrip_ret")
int uprobe_protocol_roundtrip_ret(struct pt_regs *ctx) {
    void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
    bpf_dbg_printk("=== uprobe/protocol_RoundTrip ret %llx === ", goroutine_addr);

    produce_req_t *p_ptr = bpf_map_lookup_elem(&produce_requests, &goroutine_addr);

    bpf_dbg_printk("p_ptr %llx", p_ptr);

    if (p_ptr) {
        void *msg_ptr = (void *)p_ptr->msg_ptr;
        topic_t *topic_ptr = bpf_map_lookup_elem(&ongoing_produce_messages, &msg_ptr);

        bpf_dbg_printk("goroutine_addr %lx, conn ptr %llx, msg_ptr = %llx, topic_ptr = %llx", goroutine_addr, p_ptr->conn_ptr, p_ptr->msg_ptr, topic_ptr);

        if (topic_ptr) {
            kafka_go_req_t *trace = bpf_ringbuf_reserve(&events, sizeof(kafka_go_req_t), 0);
            if (trace) {
                trace->type = EVENT_GO_KAFKA_SEG;
                trace->op = KAFKA_API_PRODUCE;
                trace->start_monotime_ns = p_ptr->start_monotime_ns;
                trace->end_monotime_ns = bpf_ktime_get_ns();

                void *conn_ptr = 0;
                bpf_probe_read(&conn_ptr, sizeof(conn_ptr), (void *)(p_ptr->conn_ptr + 8)); // find conn
                bpf_dbg_printk("conn ptr %llx", conn_ptr);
                if (conn_ptr) {
                    get_conn_info(conn_ptr, &trace->conn);
                }

                __builtin_memcpy(trace->topic, topic_ptr->name, MAX_TOPIC_NAME_LEN);
                __builtin_memcpy(&trace->tp, &(topic_ptr->tp), sizeof(tp_info_t));
                task_pid(&trace->pid);
                bpf_ringbuf_submit(trace, get_flags());
            }
        }
        bpf_map_delete_elem(&ongoing_produce_messages, &msg_ptr);
    }

    bpf_map_delete_elem(&produce_requests, &goroutine_addr);

    return 0;
}


// Code for the fetch messages path
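// Flow (fetch): reader_read captures the topic name and connection info keyed
// by goroutine; reader_send_message stamps the start timestamp; the reader_read
// return probe sets the end timestamp and submits the event.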
SEC("uprobe/reader_read")
int uprobe_reader_read(struct pt_regs *ctx) {
    void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
    void *r_ptr = (void *)GO_PARAM1(ctx);
    void *conn = (void *)GO_PARAM5(ctx);
    bpf_dbg_printk("=== uprobe/kafka-go reader_read %llx r_ptr %llx === ", goroutine_addr, r_ptr);

    if (r_ptr) {
        kafka_go_req_t r = {
            .type = EVENT_GO_KAFKA_SEG,
            .op = KAFKA_API_FETCH,
            .start_monotime_ns = 0,
        };

        void *topic_ptr = 0;
        bpf_probe_read_user(&topic_ptr, sizeof(void *), r_ptr + kafka_go_reader_topic_pos);

        bpf_dbg_printk("topic_ptr %llx", topic_ptr);
        if (topic_ptr) {
            bpf_probe_read_user(&r.topic, sizeof(r.topic), topic_ptr);
        }

        if (conn) {
            void *conn_ptr = 0;
            bpf_probe_read(&conn_ptr, sizeof(conn_ptr), (void *)(conn + 8)); // find conn
            bpf_dbg_printk("conn ptr %llx", conn_ptr);
            if (conn_ptr) {
                get_conn_info(conn_ptr, &r.conn);
            }
        }

        bpf_map_update_elem(&fetch_requests, &goroutine_addr, &r, BPF_ANY);
    }

    return 0;
}

SEC("uprobe/reader_send_message")
int uprobe_reader_send_message(struct pt_regs *ctx) {
    void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
    bpf_dbg_printk("=== uprobe/kafka-go reader_send_message %llx === ", goroutine_addr);

    kafka_go_req_t *req = (kafka_go_req_t *)bpf_map_lookup_elem(&fetch_requests, &goroutine_addr);
    bpf_dbg_printk("Found req_ptr %llx", req);

    if (req) {
        req->start_monotime_ns = bpf_ktime_get_ns();
    }

    return 0;
}

SEC("uprobe/reader_read")
int uprobe_reader_read_ret(struct pt_regs *ctx) {
    void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
    bpf_dbg_printk("=== uprobe/kafka-go reader_read ret %llx === ", goroutine_addr);

    kafka_go_req_t *req = (kafka_go_req_t *)bpf_map_lookup_elem(&fetch_requests, &goroutine_addr);
    bpf_dbg_printk("Found req_ptr %llx", req);

    if (req) {
        if (req->start_monotime_ns) {
            kafka_go_req_t *trace = bpf_ringbuf_reserve(&events, sizeof(kafka_go_req_t), 0);
            if (trace) {
                __builtin_memcpy(trace, req, sizeof(kafka_go_req_t));
                trace->end_monotime_ns = bpf_ktime_get_ns();
                task_pid(&trace->pid);
                bpf_ringbuf_submit(trace, get_flags());
            }
        } else {
            bpf_dbg_printk("Found request with no start time, ignoring...");
        }
    }

    bpf_map_delete_elem(&fetch_requests, &goroutine_addr);

    return 0;
}
File renamed without changes.
1 change: 1 addition & 0 deletions bpf/http_trace.c
@@ -18,3 +18,4 @@ const sql_request_trace *unused_3 __attribute__((unused));
const tcp_req_t *unused_5 __attribute__((unused));
const kafka_client_req_t *unused_6 __attribute__((unused));
const redis_client_req_t *unused_7 __attribute__((unused));
const kafka_go_req_t *unused_8 __attribute__((unused));
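// (Like the other unused_* declarations above, this reference presumably keeps
// kafka_go_req_t in the generated BTF so the Go-side bindings get a matching type.)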
12 changes: 12 additions & 0 deletions bpf/http_trace.h
@@ -25,6 +25,7 @@
#define SQL_MAX_LEN 500
#define KAFKA_MAX_LEN 256
#define REDIS_MAX_LEN 256
#define MAX_TOPIC_NAME_LEN 64

// Trace of an HTTP call invocation. It is instantiated by the return uprobe and forwarded to the
// user space through the events ringbuffer.
@@ -63,6 +64,17 @@ typedef struct kafka_client_req {
    pid_info pid;
} __attribute__((packed)) kafka_client_req_t;

typedef struct kafka_go_req {
    u8 type; // Must be first
    u64 start_monotime_ns;
    u64 end_monotime_ns;
    u8 topic[MAX_TOPIC_NAME_LEN];
    connection_info_t conn __attribute__ ((aligned (8)));
    tp_info_t tp;
    pid_info pid;
    u8 op;
} __attribute__((packed)) kafka_go_req_t;

typedef struct redis_client_req {
    u8 type; // Must be first
    u64 start_monotime_ns;
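On the user-space side, the fixed-size, NUL-padded topic field of kafka_go_req_t has to be turned back into a string before it can be attached to a span. A minimal sketch of such a helper (hypothetical; the actual decoding code in the tracer is not shown in this commit), with 64 matching MAX_TOPIC_NAME_LEN above:

package ebpfcommon // hypothetical package name

import "bytes"

// topicToString trims the NUL padding from the fixed-size topic buffer
// copied out of the kafka_go_req_t record.
func topicToString(raw [64]byte) string {
    if i := bytes.IndexByte(raw[:], 0); i >= 0 {
        return string(raw[:i])
    }
    return string(raw[:])
}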
1 change: 1 addition & 0 deletions bpf/ringbuf.h
@@ -15,6 +15,7 @@
#define EVENT_TCP_REQUEST 8
#define EVENT_GO_KAFKA 9
#define EVENT_GO_REDIS 10
#define EVENT_GO_KAFKA_SEG 11 // the segment.io version (kafka-go) has a different event format

// setting here the following map definitions without pinning them to a global namespace
// would lead that services running both HTTP and GRPC server would duplicate
10 changes: 10 additions & 0 deletions configs/offsets/kafkago/go.mod
@@ -0,0 +1,10 @@
module kafkago_off

go 1.22.2

require github.com/segmentio/kafka-go v0.4.47

require (
	github.com/klauspost/compress v1.15.9 // indirect
	github.com/pierrec/lz4/v4 v4.1.15 // indirect
)
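This module pins segmentio/kafka-go v0.4.47, presumably so the offsets tooling can compute the struct-field offsets injected into the BPF program (kafka_go_writer_topic_pos and friends). Conceptually, those constants are what unsafe.Offsetof reports for the relevant fields; an illustrative sketch (not part of the commit):

package main

import (
    "fmt"
    "unsafe"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Offset of the exported Topic field inside kafka.Writer. Unexported fields
    // (e.g. inside the reader) need tooling with access to the package internals.
    fmt.Println("Writer.Topic offset:", unsafe.Offsetof(kafka.Writer{}.Topic))
}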