Skip to content

Commit

Permalink
Different handling of traceparent (#455)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Nov 22, 2023
1 parent 1b094ce commit 760d3ad
Show file tree
Hide file tree
Showing 95 changed files with 2,792 additions and 1,154 deletions.
102 changes: 82 additions & 20 deletions bpf/go_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include "bpf_dbg.h"
#include "http_trace.h"
#include "ringbuf.h"
#include "tracing.h"
#include "trace_util.h"
#include "go_traceparent.h"

char __license[] SEC("license") = "Dual MIT/GPL";

Expand All @@ -31,17 +34,6 @@ char __license[] SEC("license") = "Dual MIT/GPL";
// This element is created in the function start probe and stored in the ongoing_http_requests hashmaps.
// Then it is retrieved in the return uprobes and used to know the HTTP call duration as well as its
// attributes (method, path, and status code).
typedef struct func_invocation_t {
u64 start_monotime_ns;
struct pt_regs regs; // we store registers on invocation to be able to fetch the arguments at return
} func_invocation;

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, func_invocation);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} newproc1 SEC(".maps");

typedef struct goroutine_metadata_t {
u64 parent;
Expand All @@ -56,25 +48,22 @@ struct {
__uint(pinning, LIBBPF_PIN_BY_NAME);
} ongoing_goroutines SEC(".maps");


// Shared structure that keeps track of ongoing server requests, HTTP or gRPC
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, func_invocation);
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: pointer to the goroutine
__type(value, tp_info_t); // value: traceparent info
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} ongoing_server_requests SEC(".maps");

} go_trace_map SEC(".maps");

static __always_inline u64 find_parent_goroutine(void *goroutine_addr) {
void *r_addr = goroutine_addr;
int attempts = 0;
do {
func_invocation *p_inv = bpf_map_lookup_elem(&ongoing_server_requests, &r_addr);
void *p_inv = bpf_map_lookup_elem(&go_trace_map, &r_addr);
if (!p_inv) { // not this goroutine running the server request processing
// Let's find the parent scope
goroutine_metadata *g_metadata = bpf_map_lookup_elem(&ongoing_goroutines, &r_addr);
goroutine_metadata *g_metadata = (goroutine_metadata *)bpf_map_lookup_elem(&ongoing_goroutines, &r_addr);
if (g_metadata) {
// Lookup now to see if the parent was a request
r_addr = (void *)g_metadata->parent;
Expand All @@ -92,4 +81,77 @@ static __always_inline u64 find_parent_goroutine(void *goroutine_addr) {
return 0;
}

static __always_inline void decode_go_traceparent(unsigned char *buf, unsigned char *trace_id, unsigned char *span_id) {
unsigned char *t_id = buf + 2 + 1; // strlen(ver) + strlen("-")
unsigned char *s_id = buf + 2 + 1 + 32 + 1; // strlen(ver) + strlen("-") + strlen(trace_id) + strlen("-")

decode_hex(trace_id, t_id, TRACE_ID_CHAR_LEN);
decode_hex(span_id, s_id, SPAN_ID_CHAR_LEN);
}

static __always_inline void server_trace_parent(void *goroutine_addr, tp_info_t *tp, void *req_header) {
// Get traceparent from the Request.Header
void *traceparent_ptr = extract_traceparent_from_req_headers(req_header);
if (traceparent_ptr != NULL) {
unsigned char buf[W3C_VAL_LENGTH];
long res = bpf_probe_read(buf, sizeof(buf), traceparent_ptr);
if (res < 0) {
bpf_dbg_printk("can't copy traceparent header");
urand_bytes(tp->trace_id, TRACE_ID_SIZE_BYTES);
*((u64 *)tp->parent_id) = 0;
} else {
bpf_dbg_printk("Decoding traceparent from headers %s", buf);
decode_go_traceparent(buf, tp->trace_id, tp->parent_id);
}
} else {
bpf_dbg_printk("No traceparent in headers, generating");
urand_bytes(tp->trace_id, TRACE_ID_SIZE_BYTES);
*((u64 *)tp->parent_id) = 0;
}

urand_bytes(tp->span_id, SPAN_ID_SIZE_BYTES);
bpf_map_update_elem(&go_trace_map, &goroutine_addr, tp, BPF_ANY);
}

static __always_inline void client_trace_parent(void *goroutine_addr, tp_info_t *tp_i, void *req_header) {
// Get traceparent from the Request.Header
u8 found_trace_id = 0;

if (req_header) {
void *traceparent_ptr = extract_traceparent_from_req_headers(req_header);
if (traceparent_ptr != NULL) {
unsigned char buf[W3C_VAL_LENGTH];
long res = bpf_probe_read(buf, sizeof(buf), traceparent_ptr);
if (res < 0) {
bpf_dbg_printk("can't copy traceparent header");
} else {
found_trace_id = 1;
decode_go_traceparent(buf, tp_i->trace_id, tp_i->span_id);
}
}
}

if (!found_trace_id) {
tp_info_t *tp = 0;

u64 parent_id = find_parent_goroutine(goroutine_addr);

if (parent_id) {// we found a parent request
tp = (tp_info_t *)bpf_map_lookup_elem(&go_trace_map, &parent_id);
}

if (tp) {
bpf_dbg_printk("Found parent request trace_parent %llx", tp);
*((u64 *)tp_i->trace_id) = *((u64 *)tp->trace_id);
*((u64 *)(tp_i->trace_id + 8)) = *((u64 *)(tp->trace_id + 8));
*((u64 *)tp_i->parent_id) = *((u64 *)tp->span_id);
} else {
urand_bytes(tp_i->trace_id, TRACE_ID_SIZE_BYTES);
}

urand_bytes(tp_i->span_id, SPAN_ID_SIZE_BYTES);
}
}


#endif // GO_COMMON_H
112 changes: 74 additions & 38 deletions bpf/go_grpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,41 @@
#include "go_common.h"
#include "go_traceparent.h"

typedef struct grpc_srv_func_invocation {
u64 start_monotime_ns;
u64 stream;
tp_info_t tp;
} grpc_srv_func_invocation_t;

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, u16);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_grpc_request_status SEC(".maps");

typedef struct grpc_client_func_invocation {
u64 start_monotime_ns;
u64 cc;
u64 ctx;
u64 method;
u64 method_len;
} grpc_client_func_invocation_t;

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, func_invocation);
__type(value, grpc_client_func_invocation_t);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_grpc_client_requests SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, grpc_srv_func_invocation_t);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_grpc_server_requests SEC(".maps");

// To be Injected from the user space during the eBPF program load & initialization

volatile const u64 grpc_stream_st_ptr_pos;
Expand All @@ -46,19 +67,31 @@ volatile const u64 grpc_client_target_ptr_pos;
volatile const u64 grpc_stream_ctx_ptr_pos;
volatile const u64 value_context_val_ptr_pos;


SEC("uprobe/server_handleStream")
int uprobe_server_handleStream(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/server_handleStream === ");
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

func_invocation invocation = {
void *stream_ptr = GO_PARAM4(ctx);

grpc_srv_func_invocation_t invocation = {
.start_monotime_ns = bpf_ktime_get_ns(),
.regs = *ctx
.stream = (u64)stream_ptr,
.tp = {0}
};

if (bpf_map_update_elem(&ongoing_server_requests, &goroutine_addr, &invocation, BPF_ANY)) {
if (stream_ptr) {
void *ctx_ptr = 0;
// Read the embedded context object ptr
bpf_probe_read(&ctx_ptr, sizeof(ctx_ptr), (void *)(stream_ptr + grpc_stream_ctx_ptr_pos + sizeof(void *)));

if (ctx_ptr) {
server_trace_parent(goroutine_addr, &invocation.tp, (void *)(ctx_ptr + value_context_val_ptr_pos + sizeof(void *)));
}
}

if (bpf_map_update_elem(&ongoing_grpc_server_requests, &goroutine_addr, &invocation, BPF_ANY)) {
bpf_dbg_printk("can't update grpc map element");
}

Expand All @@ -72,9 +105,9 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

func_invocation *invocation =
bpf_map_lookup_elem(&ongoing_server_requests, &goroutine_addr);
bpf_map_delete_elem(&ongoing_server_requests, &goroutine_addr);
grpc_srv_func_invocation_t *invocation =
bpf_map_lookup_elem(&ongoing_grpc_server_requests, &goroutine_addr);
bpf_map_delete_elem(&ongoing_grpc_server_requests, &goroutine_addr);
if (invocation == NULL) {
bpf_dbg_printk("can't read grpc invocation metadata");
return 0;
Expand All @@ -87,7 +120,7 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
return 0;
}

void *stream_ptr = GO_PARAM4(&(invocation->regs));
void *stream_ptr = (void *)invocation->stream;
bpf_dbg_printk("stream_ptr %lx, method pos %lx", stream_ptr, grpc_stream_method_ptr_pos);

http_request_trace *trace = bpf_ringbuf_reserve(&events, sizeof(http_request_trace), 0);
Expand All @@ -97,7 +130,6 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
}
task_pid(&trace->pid);
trace->type = EVENT_GRPC_REQUEST;
trace->id = (u64)goroutine_addr;
trace->start_monotime_ns = invocation->start_monotime_ns;
trace->status = *status;

Expand Down Expand Up @@ -151,17 +183,7 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
}
}

void *ctx_ptr = 0;
// Read the embedded context object ptr
bpf_probe_read(&ctx_ptr, sizeof(ctx_ptr), (void *)(stream_ptr + grpc_stream_ctx_ptr_pos + sizeof(void *)));

if (ctx_ptr) {
void *tp_ptr = extract_traceparent_from_req_headers((void *)(ctx_ptr + value_context_val_ptr_pos + sizeof(void *)));
if (tp_ptr) {
bpf_probe_read(trace->traceparent, sizeof(trace->traceparent), tp_ptr);
bpf_dbg_printk("traceparent %s", trace->traceparent);
}
}
trace->tp = invocation->tp;

trace->end_monotime_ns = bpf_ktime_get_ns();
// submit the completed trace via ringbuffer
Expand Down Expand Up @@ -198,17 +220,24 @@ int uprobe_transport_writeStatus(struct pt_regs *ctx) {
}

/* GRPC client */

SEC("uprobe/ClientConn_Invoke")
int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.Invoke === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

func_invocation invocation = {
void *cc_ptr = GO_PARAM1(ctx);
void *ctx_ptr = GO_PARAM3(ctx);
void *method_ptr = GO_PARAM4(ctx);
void *method_len = GO_PARAM5(ctx);

grpc_client_func_invocation_t invocation = {
.start_monotime_ns = bpf_ktime_get_ns(),
.regs = *ctx,
.cc = (u64)cc_ptr,
.ctx = (u64)ctx_ptr,
.method = (u64)method_ptr,
.method_len = (u64)method_len,
};

// Write event
Expand All @@ -226,7 +255,7 @@ int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

func_invocation *invocation =
grpc_client_func_invocation_t *invocation =
bpf_map_lookup_elem(&ongoing_grpc_client_requests, &goroutine_addr);
bpf_map_delete_elem(&ongoing_grpc_client_requests, &goroutine_addr);

Expand All @@ -242,7 +271,6 @@ int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
}

task_pid(&trace->pid);
trace->id = find_parent_goroutine(goroutine_addr);
trace->type = EVENT_GRPC_CLIENT;
trace->start_monotime_ns = invocation->start_monotime_ns;
trace->go_start_monotime_ns = invocation->start_monotime_ns;
Expand All @@ -251,9 +279,9 @@ int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
// Read arguments from the original set of registers

// Get client request value pointers
void *cc_ptr = GO_PARAM1(&(invocation->regs));
void *method_ptr = GO_PARAM4(&(invocation->regs));
void *method_len = GO_PARAM5(&(invocation->regs));
void *cc_ptr = (void *)invocation->cc;
void *method_ptr = (void *)invocation->method;
void *method_len = (void *)invocation->method_len;
void *err = (void *)GO_PARAM1(ctx);

bpf_dbg_printk("method ptr = %lx, method_len = %d", method_ptr, method_len);
Expand All @@ -272,19 +300,27 @@ int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
return 0;
}

void *ctx_ptr = GO_PARAM3(&(invocation->regs));
void *val_ptr = 0;
// Read the embedded val object ptr from ctx
bpf_probe_read(&val_ptr, sizeof(val_ptr), (void *)(ctx_ptr + value_context_val_ptr_pos + sizeof(void *)));
void *ctx_ptr = (void *)invocation->ctx;

tp_info_t tp = {0};

if (val_ptr) {
void *tp_ptr = extract_traceparent_from_req_headers((void *)(val_ptr)); // embedded metadata.rawMD is at 0 offset
if (tp_ptr) {
bpf_probe_read(trace->traceparent, sizeof(trace->traceparent), tp_ptr);
bpf_dbg_printk("traceparent %s", trace->traceparent);
if (ctx_ptr) {
void *val_ptr = 0;
// Read the embedded val object ptr from ctx
bpf_probe_read(&val_ptr, sizeof(val_ptr), (void *)(ctx_ptr + value_context_val_ptr_pos + sizeof(void *)));

if (val_ptr) {
client_trace_parent(goroutine_addr, &tp, (void *)(val_ptr));
} else {
bpf_dbg_printk("No val_ptr %llx", val_ptr);
}
} else {
// it's OK sending empty tp for a client, the userspace id generator will make random trace_id, span_id
bpf_dbg_printk("No ctx_ptr %llx", ctx_ptr);
}

trace->tp = tp;

trace->status = (err) ? 2 : 0; // Getting the gRPC client status is complex, if there's an error we set Code.Unknown = 2

// submit the completed trace via ringbuffer
Expand Down
Loading

0 comments on commit 760d3ad

Please sign in to comment.