Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial infrastructure for dealing with Kafka protocol in Go #886

Merged
merged 10 commits into from
May 29, 2024
Prev Previous commit
Next Next commit
WIP: update example with fetch
grcevski committed May 28, 2024
commit 46a0bd2f5091de9d321fa508032e7d4f7ea4b97d
47 changes: 26 additions & 21 deletions bpf/go_kafka.c
Original file line number Diff line number Diff line change
@@ -15,14 +15,8 @@
#include "go_common.h"
#include "ringbuf.h"

#define BUF_MAX_LEN 256

typedef struct kafka_client_req {
u8 type; // Must be first
u64 start_monotime_ns;
u64 end_monotime_ns;
u8 buf[BUF_MAX_LEN];
} kafka_client_req_t;
#define KAFKA_API_FETCH 0
#define KAFKA_API_PRODUCE 1

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
@@ -48,7 +42,7 @@ int uprobe_sarama_sendInternal(struct pt_regs *ctx) {

void *b_ptr = GO_PARAM1(ctx);
if (b_ptr) {
bpf_probe_read(&correlation_id, sizeof(u32), b_ptr + 0x2c);
bpf_probe_read(&correlation_id, sizeof(u32), b_ptr + 0x2c); // TODO: Offsets
}

if (correlation_id) {
@@ -66,16 +60,27 @@ int uprobe_sarama_broker_write(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_printk("goroutine_addr %lx", goroutine_addr);

u64 *invocation = bpf_map_lookup_elem(&ongoing_kafka_requests, &goroutine_addr);
u32 *invocation = bpf_map_lookup_elem(&ongoing_kafka_requests, &goroutine_addr);
if (invocation) {
u64 correlation_id = *invocation;
kafka_client_req_t req = {
.type = EVENT_GO_KAFKA,
.start_monotime_ns = bpf_ktime_get_ns(),
};

bpf_probe_read(req.buf, BUF_MAX_LEN, GO_PARAM2(ctx));
bpf_map_update_elem(&kafka_requests, &correlation_id, &req, BPF_ANY);
u8 small_buf[8];
bpf_probe_read(small_buf, 8, GO_PARAM2(ctx));
// the api key is 2 bytes, but num APIs at the moment is max 50.
// instead of reading 2 bytes and then doing ntohs, we just read
// the second byte of the api key, assuming the first is 0.
u8 api_key = small_buf[5];

// We only care about fetch and produce
if (api_key == KAFKA_API_FETCH || api_key == KAFKA_API_PRODUCE) {
u32 correlation_id = *invocation;
kafka_client_req_t req = {
.type = EVENT_GO_KAFKA,
.start_monotime_ns = bpf_ktime_get_ns(),
};

bpf_probe_read(req.buf, KAFKA_MAX_LEN, GO_PARAM2(ctx));
bpf_map_update_elem(&kafka_requests, &correlation_id, &req, BPF_ANY);
}

}

bpf_map_delete_elem(&ongoing_kafka_requests, &goroutine_addr);
@@ -94,7 +99,7 @@ int uprobe_sarama_response_promise_handle(struct pt_regs *ctx) {
if (p) {
u32 correlation_id = 0;

bpf_probe_read(&correlation_id, sizeof(u32), p + 0x18);
bpf_probe_read(&correlation_id, sizeof(u32), p + 0x18); // TODO: Offsets
if (correlation_id) {
kafka_client_req_t *req = bpf_map_lookup_elem(&kafka_requests, &correlation_id);

@@ -105,11 +110,11 @@ int uprobe_sarama_response_promise_handle(struct pt_regs *ctx) {
if (trace) {
bpf_dbg_printk("Sending trace");

bpf_memcpy(trace, req, sizeof(kafka_client_req_t));
__builtin_memcpy(trace, req, sizeof(kafka_client_req_t));
bpf_ringbuf_submit(trace, get_flags());
}
}

bpf_map_delete_elem(&kafka_requests, &correlation_id);
}
}
2 changes: 1 addition & 1 deletion bpf/http_sock.h
Original file line number Diff line number Diff line change
@@ -593,7 +593,7 @@ static __always_inline void handle_buf_with_connection(pid_connection_info_t *pi

bpf_dbg_printk("=== http_buffer_event len=%d pid=%d still_reading=%d ===", bytes_len, pid_from_pid_tgid(bpf_get_current_pid_tgid()), still_reading(info));

if (packet_type == PACKET_TYPE_REQUEST && (info->status == 0)) {
if (packet_type == PACKET_TYPE_REQUEST && (info->status == 0)) {
http_connection_metadata_t *meta = connection_meta(pid_conn, direction, PACKET_TYPE_REQUEST);

get_or_create_trace_info(meta, pid_conn->pid, &pid_conn->conn, u_buf, bytes_len, capture_header_buffer);
8 changes: 8 additions & 0 deletions bpf/http_trace.h
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
#define HOST_LEN 64 // can be a fully qualified DNS name
#define TRACEPARENT_LEN 55
#define SQL_MAX_LEN 500
#define KAFKA_MAX_LEN 256

// Trace of an HTTP call invocation. It is instantiated by the return uprobe and forwarded to the
// user space through the events ringbuffer.
@@ -52,4 +53,11 @@ typedef struct sql_request_trace_t {
pid_info pid;
} __attribute__((packed)) sql_request_trace;

typedef struct kafka_client_req {
u8 type; // Must be first
u64 start_monotime_ns;
u64 end_monotime_ns;
u8 buf[KAFKA_MAX_LEN];
} __attribute__((packed)) kafka_client_req_t;

#endif //HTTP_TRACE_H