Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Different handling of traceparent #455

Merged
merged 19 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 81 additions & 19 deletions bpf/go_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include "bpf_dbg.h"
#include "http_trace.h"
#include "ringbuf.h"
#include "tracing.h"
#include "trace_util.h"
#include "go_traceparent.h"

char __license[] SEC("license") = "Dual MIT/GPL";

Expand All @@ -31,17 +34,6 @@ char __license[] SEC("license") = "Dual MIT/GPL";
// This element is created in the function start probe and stored in the ongoing_http_requests hashmaps.
// Then it is retrieved in the return uprobes and used to know the HTTP call duration as well as its
// attributes (method, path, and status code).
typedef struct func_invocation_t {
u64 start_monotime_ns;
struct pt_regs regs; // we store registers on invocation to be able to fetch the arguments at return
} func_invocation;

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, func_invocation);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} newproc1 SEC(".maps");

typedef struct goroutine_metadata_t {
u64 parent;
Expand All @@ -56,22 +48,19 @@ struct {
__uint(pinning, LIBBPF_PIN_BY_NAME);
} ongoing_goroutines SEC(".maps");


// Shared structure that keeps track of ongoing server requests, HTTP or gRPC
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, func_invocation);
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: pointer to the goroutine
__type(value, tp_info_t); // value: traceparent info
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} ongoing_server_requests SEC(".maps");

} go_trace_map SEC(".maps");

static __always_inline u64 find_parent_goroutine(void *goroutine_addr) {
void *r_addr = goroutine_addr;
int attempts = 0;
do {
func_invocation *p_inv = bpf_map_lookup_elem(&ongoing_server_requests, &r_addr);
void *p_inv = bpf_map_lookup_elem(&go_trace_map, &r_addr);
if (!p_inv) { // not this goroutine running the server request processing
// Let's find the parent scope
goroutine_metadata *g_metadata = bpf_map_lookup_elem(&ongoing_goroutines, &r_addr);
Expand All @@ -92,4 +81,77 @@ static __always_inline u64 find_parent_goroutine(void *goroutine_addr) {
return 0;
}

static __always_inline void decode_go_traceparent(unsigned char *buf, unsigned char *trace_id, unsigned char *span_id) {
unsigned char *t_id = buf + 2 + 1; // strlen(ver) + strlen("-")
unsigned char *s_id = buf + 2 + 1 + 32 + 1; // strlen(ver) + strlen("-") + strlen(trace_id) + strlen("-")

decode_hex(trace_id, t_id, TRACE_ID_CHAR_LEN);
decode_hex(span_id, s_id, SPAN_ID_CHAR_LEN);
}

static __always_inline void server_trace_parent(void *goroutine_addr, tp_info_t *tp, void *req_header) {
// Get traceparent from the Request.Header
void *traceparent_ptr = extract_traceparent_from_req_headers(req_header);
if (traceparent_ptr != NULL) {
unsigned char buf[W3C_VAL_LENGTH];
long res = bpf_probe_read(buf, sizeof(buf), traceparent_ptr);
if (res < 0) {
bpf_printk("can't copy traceparent header");
urand_bytes(tp->trace_id, TRACE_ID_SIZE_BYTES);
*((u64 *)tp->parent_id) = 0;
} else {
bpf_dbg_printk("Decoding traceparent from headers %s", buf);
decode_go_traceparent(buf, tp->trace_id, tp->parent_id);
}
} else {
bpf_dbg_printk("No traceparent in headers, generating");
urand_bytes(tp->trace_id, TRACE_ID_SIZE_BYTES);
*((u64 *)tp->parent_id) = 0;
}

urand_bytes(tp->span_id, SPAN_ID_SIZE_BYTES);
bpf_map_update_elem(&go_trace_map, &goroutine_addr, tp, BPF_ANY);
}

static __always_inline void client_trace_parent(void *goroutine_addr, tp_info_t *tp_i, void *req_header) {
// Get traceparent from the Request.Header
u8 found_trace_id = 0;

if (req_header) {
void *traceparent_ptr = extract_traceparent_from_req_headers(req_header);
if (traceparent_ptr != NULL) {
unsigned char buf[W3C_VAL_LENGTH];
long res = bpf_probe_read(buf, sizeof(buf), traceparent_ptr);
if (res < 0) {
bpf_printk("can't copy traceparent header");
} else {
found_trace_id = 1;
decode_go_traceparent(buf, tp_i->trace_id, tp_i->span_id);
}
}
}

if (!found_trace_id) {
tp_info_t *tp = 0;

u64 parent_id = find_parent_goroutine(goroutine_addr);

if (parent_id) {// we found a parent request
tp = (tp_info_t *)bpf_map_lookup_elem(&go_trace_map, &parent_id);
}

if (tp) {
bpf_dbg_printk("Found parent request trace_parent %llx", tp);
*((u64 *)tp_i->trace_id) = *((u64 *)tp->trace_id);
*((u64 *)(tp_i->trace_id + 8)) = *((u64 *)(tp->trace_id + 8));
*((u64 *)tp_i->parent_id) = *((u64 *)tp->span_id);
} else {
urand_bytes(tp_i->trace_id, TRACE_ID_SIZE_BYTES);
}

urand_bytes(tp_i->span_id, SPAN_ID_SIZE_BYTES);
}
}


#endif // GO_COMMON_H
111 changes: 73 additions & 38 deletions bpf/go_grpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,41 @@
#include "go_common.h"
#include "go_traceparent.h"

typedef struct grpc_srv_func_invocation {
u64 start_monotime_ns;
u64 stream;
tp_info_t tp;
} grpc_srv_func_invocation_t;

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, u16);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_grpc_request_status SEC(".maps");

typedef struct grpc_client_func_invocation {
u64 start_monotime_ns;
u64 cc;
u64 ctx;
u64 method;
u64 method_len;
} grpc_client_func_invocation_t;

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, func_invocation);
__type(value, grpc_client_func_invocation_t);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_grpc_client_requests SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, grpc_srv_func_invocation_t);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_grpc_server_requests SEC(".maps");

// To be Injected from the user space during the eBPF program load & initialization

volatile const u64 grpc_stream_st_ptr_pos;
Expand All @@ -46,19 +67,31 @@ volatile const u64 grpc_client_target_ptr_pos;
volatile const u64 grpc_stream_ctx_ptr_pos;
volatile const u64 value_context_val_ptr_pos;


SEC("uprobe/server_handleStream")
int uprobe_server_handleStream(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/server_handleStream === ");
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

func_invocation invocation = {
void *stream_ptr = GO_PARAM4(ctx);

grpc_srv_func_invocation_t invocation = {
.start_monotime_ns = bpf_ktime_get_ns(),
.regs = *ctx
.stream = (u64)stream_ptr,
.tp = {0}
};

if (bpf_map_update_elem(&ongoing_server_requests, &goroutine_addr, &invocation, BPF_ANY)) {
if (stream_ptr) {
void *ctx_ptr = 0;
// Read the embedded context object ptr
bpf_probe_read(&ctx_ptr, sizeof(ctx_ptr), (void *)(stream_ptr + grpc_stream_ctx_ptr_pos + sizeof(void *)));

if (ctx_ptr) {
server_trace_parent(goroutine_addr, &invocation.tp, (void *)(ctx_ptr + value_context_val_ptr_pos + sizeof(void *)));
}
}

if (bpf_map_update_elem(&ongoing_grpc_server_requests, &goroutine_addr, &invocation, BPF_ANY)) {
bpf_dbg_printk("can't update grpc map element");
}

Expand All @@ -72,9 +105,9 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

func_invocation *invocation =
bpf_map_lookup_elem(&ongoing_server_requests, &goroutine_addr);
bpf_map_delete_elem(&ongoing_server_requests, &goroutine_addr);
grpc_srv_func_invocation_t *invocation =
bpf_map_lookup_elem(&ongoing_grpc_server_requests, &goroutine_addr);
bpf_map_delete_elem(&ongoing_grpc_server_requests, &goroutine_addr);
if (invocation == NULL) {
bpf_dbg_printk("can't read grpc invocation metadata");
return 0;
Expand All @@ -87,7 +120,7 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
return 0;
}

void *stream_ptr = GO_PARAM4(&(invocation->regs));
void *stream_ptr = (void *)invocation->stream;
bpf_dbg_printk("stream_ptr %lx, method pos %lx", stream_ptr, grpc_stream_method_ptr_pos);

http_request_trace *trace = bpf_ringbuf_reserve(&events, sizeof(http_request_trace), 0);
Expand All @@ -97,7 +130,6 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
}
task_pid(&trace->pid);
trace->type = EVENT_GRPC_REQUEST;
trace->id = (u64)goroutine_addr;
trace->start_monotime_ns = invocation->start_monotime_ns;
trace->status = *status;

Expand Down Expand Up @@ -151,17 +183,7 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
}
}

void *ctx_ptr = 0;
// Read the embedded context object ptr
bpf_probe_read(&ctx_ptr, sizeof(ctx_ptr), (void *)(stream_ptr + grpc_stream_ctx_ptr_pos + sizeof(void *)));

if (ctx_ptr) {
void *tp_ptr = extract_traceparent_from_req_headers((void *)(ctx_ptr + value_context_val_ptr_pos + sizeof(void *)));
if (tp_ptr) {
bpf_probe_read(trace->traceparent, sizeof(trace->traceparent), tp_ptr);
bpf_dbg_printk("traceparent %s", trace->traceparent);
}
}
trace->tp = invocation->tp;

trace->end_monotime_ns = bpf_ktime_get_ns();
// submit the completed trace via ringbuffer
Expand Down Expand Up @@ -198,17 +220,24 @@ int uprobe_transport_writeStatus(struct pt_regs *ctx) {
}

/* GRPC client */

SEC("uprobe/ClientConn_Invoke")
int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.Invoke === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

func_invocation invocation = {
void *cc_ptr = GO_PARAM1(ctx);
void *ctx_ptr = GO_PARAM3(ctx);
void *method_ptr = GO_PARAM4(ctx);
void *method_len = GO_PARAM5(ctx);

grpc_client_func_invocation_t invocation = {
.start_monotime_ns = bpf_ktime_get_ns(),
.regs = *ctx,
.cc = (u64)cc_ptr,
.ctx = (u64)ctx_ptr,
.method = (u64)method_ptr,
.method_len = (u64)method_len,
};

// Write event
Expand All @@ -226,7 +255,7 @@ int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

func_invocation *invocation =
grpc_client_func_invocation_t *invocation =
bpf_map_lookup_elem(&ongoing_grpc_client_requests, &goroutine_addr);
bpf_map_delete_elem(&ongoing_grpc_client_requests, &goroutine_addr);

Expand All @@ -242,7 +271,6 @@ int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
}

task_pid(&trace->pid);
trace->id = find_parent_goroutine(goroutine_addr);
trace->type = EVENT_GRPC_CLIENT;
trace->start_monotime_ns = invocation->start_monotime_ns;
trace->go_start_monotime_ns = invocation->start_monotime_ns;
Expand All @@ -251,9 +279,9 @@ int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
// Read arguments from the original set of registers

// Get client request value pointers
void *cc_ptr = GO_PARAM1(&(invocation->regs));
void *method_ptr = GO_PARAM4(&(invocation->regs));
void *method_len = GO_PARAM5(&(invocation->regs));
void *cc_ptr = (void *)invocation->cc;
void *method_ptr = (void *)invocation->method;
void *method_len = (void *)invocation->method_len;
void *err = (void *)GO_PARAM1(ctx);

bpf_dbg_printk("method ptr = %lx, method_len = %d", method_ptr, method_len);
Expand All @@ -272,19 +300,26 @@ int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
return 0;
}

void *ctx_ptr = GO_PARAM3(&(invocation->regs));
void *val_ptr = 0;
// Read the embedded val object ptr from ctx
bpf_probe_read(&val_ptr, sizeof(val_ptr), (void *)(ctx_ptr + value_context_val_ptr_pos + sizeof(void *)));
void *ctx_ptr = (void *)invocation->ctx;

tp_info_t tp = {0};

if (val_ptr) {
void *tp_ptr = extract_traceparent_from_req_headers((void *)(val_ptr)); // embedded metadata.rawMD is at 0 offset
if (tp_ptr) {
bpf_probe_read(trace->traceparent, sizeof(trace->traceparent), tp_ptr);
bpf_dbg_printk("traceparent %s", trace->traceparent);
if (ctx_ptr) {
void *val_ptr = 0;
// Read the embedded val object ptr from ctx
bpf_probe_read(&val_ptr, sizeof(val_ptr), (void *)(ctx_ptr + value_context_val_ptr_pos + sizeof(void *)));

if (val_ptr) {
client_trace_parent(goroutine_addr, &tp, (void *)(val_ptr));
} else {
bpf_dbg_printk("No val_ptr %llx", val_ptr);
}
} else {
bpf_dbg_printk("No ctx_ptr %llx", ctx_ptr);
}

trace->tp = tp;

trace->status = (err) ? 2 : 0; // Getting the gRPC client status is complex, if there's an error we set Code.Unknown = 2

// submit the completed trace via ringbuffer
Expand Down
Loading