Skip to content

Commit

Permalink
More fixes for Go connection fetching (#1036)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Jul 20, 2024
1 parent c8e6a58 commit 805db03
Show file tree
Hide file tree
Showing 74 changed files with 913 additions and 337 deletions.
18 changes: 17 additions & 1 deletion bpf/go_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, connection_info_t);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
__uint(max_entries, MAX_CONCURRENT_SHARED_REQUESTS);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} ongoing_server_connections SEC(".maps");

struct {
Expand Down Expand Up @@ -273,4 +274,19 @@ static __always_inline u8 get_conn_info(void *conn_ptr, connection_info_t *info)
return 0;
}

static __always_inline void* unwrap_tls_conn_info(void *conn_ptr, void *tls_state) {
if (conn_ptr && tls_state) {
void *c_ptr = 0;
bpf_probe_read(&c_ptr, sizeof(c_ptr), (void *)(conn_ptr)); // unwrap conn

bpf_dbg_printk("unwrapped conn ptr %llx", c_ptr);

if (c_ptr) {
return c_ptr + 8;
}
}

return conn_ptr;
}

#endif // GO_COMMON_H
145 changes: 127 additions & 18 deletions bpf/go_grpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ typedef struct grpc_client_func_invocation {
u64 flags;
} grpc_client_func_invocation_t;

typedef struct grpc_transports {
u8 type;
connection_info_t conn;
} grpc_transports_t;

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: pointer to the transport pointer
__type(value, grpc_transports_t);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_grpc_transports SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: goroutine
__type(value, void *); // the transport *
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_grpc_operate_headers SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: pointer to the request goroutine
Expand Down Expand Up @@ -72,6 +91,9 @@ struct {
} ongoing_grpc_header_writes SEC(".maps");


#define TRANSPORT_HTTP2 1
#define TRANSPORT_HANDLER 2

// To be Injected from the user space during the eBPF program load & initialization

volatile const u64 grpc_stream_st_ptr_pos;
Expand All @@ -84,6 +106,7 @@ volatile const u64 grpc_stream_ctx_ptr_pos;
volatile const u64 value_context_val_ptr_pos;
volatile const u64 grpc_st_conn_pos;
volatile const u64 grpc_t_conn_pos;
volatile const u64 grpc_t_scheme_pos;

// Context propagation
volatile const u64 http2_client_next_id_pos;
Expand All @@ -104,7 +127,7 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {
grpc_srv_func_invocation_t invocation = {
.start_monotime_ns = bpf_ktime_get_ns(),
.stream = (u64)stream_ptr,
.tp = {0}
.tp = {0},
};

if (stream_ptr) {
Expand All @@ -124,6 +147,69 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {
return 0;
}

// Sets up the connection info to be grabbed and mapped over the transport to operateHeaders
SEC("uprobe/netFdReadGRPC")
int uprobe_netFdReadGRPC(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/proc netFD read goroutine %lx === ", goroutine_addr);

void *tr = bpf_map_lookup_elem(&ongoing_grpc_operate_headers, &goroutine_addr);
bpf_dbg_printk("tr %llx", tr);
if (tr) {
grpc_transports_t *t = bpf_map_lookup_elem(&ongoing_grpc_transports, tr);
bpf_dbg_printk("t %llx", t);
if (t) {
void *fd_ptr = GO_PARAM1(ctx);
get_conn_info_from_fd(fd_ptr, &t->conn); // ok to not check the result, we leave it as 0
}
}

return 0;
}

// Handles finding the connection information for http2 servers in grpc
SEC("uprobe/http2Server_operateHeaders")
int uprobe_http2Server_operateHeaders(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
void *tr = GO_PARAM1(ctx);
bpf_dbg_printk("=== uprobe/http2Server_operateHeaders tr %llx goroutine %lx === ", tr, goroutine_addr);

grpc_transports_t t = {
.type = TRANSPORT_HTTP2,
.conn = {0},
};

bpf_map_update_elem(&ongoing_grpc_operate_headers, &goroutine_addr, &tr, BPF_ANY);
bpf_map_update_elem(&ongoing_grpc_transports, &tr, &t, BPF_ANY);

return 0;
}

// Handles finding the connection information for grpc ServeHTTP
SEC("uprobe/serverHandlerTransport_HandleStreams")
int uprobe_server_handler_transport_handle_streams(struct pt_regs *ctx) {
void *tr = GO_PARAM1(ctx);
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_printk("=== uprobe/serverHandlerTransport_HandleStreams tr %llx goroutine %lx === ", tr, goroutine_addr);

void *parent_go = (void *)find_parent_goroutine(goroutine_addr);
if (parent_go) {
bpf_dbg_printk("found parent goroutine for transport handler [%llx]", parent_go);
connection_info_t *conn = bpf_map_lookup_elem(&ongoing_server_connections, &parent_go);
bpf_dbg_printk("conn %llx", conn);
if (conn) {
grpc_transports_t t = {
.type = TRANSPORT_HANDLER,
};
__builtin_memcpy(&t.conn, conn, sizeof(connection_info_t));

bpf_map_update_elem(&ongoing_grpc_transports, &tr, &t, BPF_ANY);
}
}

return 0;
}

SEC("uprobe/server_handleStream")
int uprobe_server_handleStream_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/server_handleStream return === ");
Expand All @@ -138,10 +224,11 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
goto done;
}

u16 *status = bpf_map_lookup_elem(&ongoing_grpc_request_status, &goroutine_addr);
if (status == NULL) {
u16 *status_ptr = bpf_map_lookup_elem(&ongoing_grpc_request_status, &goroutine_addr);
u16 status = 0;
if (status_ptr != NULL) {
bpf_dbg_printk("can't read grpc invocation status");
goto done;
status = *status_ptr;
}

void *stream_ptr = (void *)invocation->stream;
Expand All @@ -155,7 +242,7 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
task_pid(&trace->pid);
trace->type = EVENT_GRPC_REQUEST;
trace->start_monotime_ns = invocation->start_monotime_ns;
trace->status = *status;
trace->status = status;
trace->content_length = 0;
trace->method[0] = 0;

Expand All @@ -181,19 +268,18 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {

bpf_dbg_printk("st_ptr %llx", st_ptr);
if (st_ptr) {
void *conn_ptr = st_ptr + grpc_st_conn_pos;
bpf_dbg_printk("conn_ptr %llx", conn_ptr);
if (conn_ptr) {
void *conn_conn_ptr = 0;
bpf_probe_read(&conn_conn_ptr, sizeof(conn_conn_ptr), conn_ptr + 8);
bpf_dbg_printk("conn_conn_ptr %llx", conn_conn_ptr);
if (conn_conn_ptr) {
found_conn = get_conn_info(conn_conn_ptr, &trace->conn);
}
}
grpc_transports_t *t = bpf_map_lookup_elem(&ongoing_grpc_transports, &st_ptr);

bpf_dbg_printk("found t %llx", t);
if (t) {
bpf_dbg_printk("setting up connection info from grpc handler");
__builtin_memcpy(&trace->conn, &t->conn, sizeof(connection_info_t));
found_conn = 1;
}
}

if (!found_conn) {
bpf_dbg_printk("can't find connection info for st_ptr %llx", st_ptr);
__builtin_memset(&trace->conn, 0, sizeof(connection_info_t));
}

Expand Down Expand Up @@ -422,11 +508,29 @@ int uprobe_transport_http2Client_NewStream(struct pt_regs *ctx) {
bpf_dbg_printk("goroutine_addr %lx, t_ptr %llx, t.conn_pos %x", goroutine_addr, t_ptr, grpc_t_conn_pos);

if (t_ptr) {
void *conn_ptr = t_ptr + grpc_t_conn_pos;
bpf_dbg_printk("conn_ptr %llx", conn_ptr);
void *conn_ptr = t_ptr + grpc_t_conn_pos + 8;
u8 buf[16];
u64 is_secure = 0;

if (!read_go_str("transport scheme", t_ptr, grpc_t_scheme_pos, &buf, sizeof(buf))) {
bpf_dbg_printk("can't read grpc transport.Stream.Method");
}

bpf_dbg_printk("scheme %s", buf);

if (buf[0] == 'h' && buf[1] == 't' && buf[2] == 't' && buf[3] == 'p' && buf[4] == 's') {
is_secure = 1;
}

if (is_secure) {
// double wrapped in grpc
conn_ptr = unwrap_tls_conn_info(conn_ptr, (void *)is_secure);
conn_ptr = unwrap_tls_conn_info(conn_ptr, (void *)is_secure);
}
bpf_dbg_printk("conn_ptr %llx is_secure %lld", conn_ptr, is_secure);
if (conn_ptr) {
void *conn_conn_ptr = 0;
bpf_probe_read(&conn_conn_ptr, sizeof(conn_conn_ptr), conn_ptr + 8);
bpf_probe_read(&conn_conn_ptr, sizeof(conn_conn_ptr), conn_ptr);
bpf_dbg_printk("conn_conn_ptr %llx", conn_conn_ptr);
if (conn_conn_ptr) {
connection_info_t conn = {0};
Expand Down Expand Up @@ -480,6 +584,11 @@ SEC("uprobe/grpcFramerWriteHeaders")
int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc Framer writeHeaders === ");

if (framer_w_pos == 0) {
bpf_dbg_printk("framer w not found");
return 0;
}

void *framer = GO_PARAM1(ctx);
u64 stream_id = (u64)GO_PARAM2(ctx);

Expand Down
Loading

0 comments on commit 805db03

Please sign in to comment.