Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More fixes for Go connection fetching #1036

Merged
merged 20 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion bpf/go_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: pointer to the request goroutine
__type(value, connection_info_t);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
__uint(max_entries, MAX_CONCURRENT_SHARED_REQUESTS);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} ongoing_server_connections SEC(".maps");

struct {
Expand Down Expand Up @@ -273,4 +274,19 @@ static __always_inline u8 get_conn_info(void *conn_ptr, connection_info_t *info)
return 0;
}

static __always_inline void* unwrap_tls_conn_info(void *conn_ptr, void *tls_state) {
if (conn_ptr && tls_state) {
void *c_ptr = 0;
bpf_probe_read(&c_ptr, sizeof(c_ptr), (void *)(conn_ptr)); // unwrap conn

bpf_dbg_printk("unwrapped conn ptr %llx", c_ptr);

if (c_ptr) {
return c_ptr + 8;
}
}

return conn_ptr;
}

#endif // GO_COMMON_H
145 changes: 127 additions & 18 deletions bpf/go_grpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ typedef struct grpc_client_func_invocation {
u64 flags;
} grpc_client_func_invocation_t;

typedef struct grpc_transports {
u8 type;
connection_info_t conn;
} grpc_transports_t;

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: pointer to the transport pointer
__type(value, grpc_transports_t);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_grpc_transports SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: goroutine
__type(value, void *); // the transport *
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_grpc_operate_headers SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, void *); // key: pointer to the request goroutine
Expand Down Expand Up @@ -72,6 +91,9 @@ struct {
} ongoing_grpc_header_writes SEC(".maps");


#define TRANSPORT_HTTP2 1
#define TRANSPORT_HANDLER 2

// To be Injected from the user space during the eBPF program load & initialization

volatile const u64 grpc_stream_st_ptr_pos;
Expand All @@ -84,6 +106,7 @@ volatile const u64 grpc_stream_ctx_ptr_pos;
volatile const u64 value_context_val_ptr_pos;
volatile const u64 grpc_st_conn_pos;
volatile const u64 grpc_t_conn_pos;
volatile const u64 grpc_t_scheme_pos;

// Context propagation
volatile const u64 http2_client_next_id_pos;
Expand All @@ -104,7 +127,7 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {
grpc_srv_func_invocation_t invocation = {
.start_monotime_ns = bpf_ktime_get_ns(),
.stream = (u64)stream_ptr,
.tp = {0}
.tp = {0},
};

if (stream_ptr) {
Expand All @@ -124,6 +147,69 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {
return 0;
}

// Sets up the connection info to be grabbed and mapped over the transport to operateHeaders
SEC("uprobe/netFdReadGRPC")
int uprobe_netFdReadGRPC(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/proc netFD read goroutine %lx === ", goroutine_addr);

void *tr = bpf_map_lookup_elem(&ongoing_grpc_operate_headers, &goroutine_addr);
bpf_dbg_printk("tr %llx", tr);
if (tr) {
grpc_transports_t *t = bpf_map_lookup_elem(&ongoing_grpc_transports, tr);
bpf_dbg_printk("t %llx", t);
if (t) {
void *fd_ptr = GO_PARAM1(ctx);
get_conn_info_from_fd(fd_ptr, &t->conn); // ok to not check the result, we leave it as 0
}
}

return 0;
}

// Handles finding the connection information for http2 servers in grpc
SEC("uprobe/http2Server_operateHeaders")
int uprobe_http2Server_operateHeaders(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
void *tr = GO_PARAM1(ctx);
bpf_dbg_printk("=== uprobe/http2Server_operateHeaders tr %llx goroutine %lx === ", tr, goroutine_addr);

grpc_transports_t t = {
.type = TRANSPORT_HTTP2,
.conn = {0},
};

bpf_map_update_elem(&ongoing_grpc_operate_headers, &goroutine_addr, &tr, BPF_ANY);
bpf_map_update_elem(&ongoing_grpc_transports, &tr, &t, BPF_ANY);

return 0;
}

// Handles finding the connection information for grpc ServeHTTP
SEC("uprobe/serverHandlerTransport_HandleStreams")
int uprobe_server_handler_transport_handle_streams(struct pt_regs *ctx) {
void *tr = GO_PARAM1(ctx);
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_printk("=== uprobe/serverHandlerTransport_HandleStreams tr %llx goroutine %lx === ", tr, goroutine_addr);

void *parent_go = (void *)find_parent_goroutine(goroutine_addr);
if (parent_go) {
bpf_dbg_printk("found parent goroutine for transport handler [%llx]", parent_go);
connection_info_t *conn = bpf_map_lookup_elem(&ongoing_server_connections, &parent_go);
bpf_dbg_printk("conn %llx", conn);
if (conn) {
grpc_transports_t t = {
.type = TRANSPORT_HANDLER,
};
__builtin_memcpy(&t.conn, conn, sizeof(connection_info_t));

bpf_map_update_elem(&ongoing_grpc_transports, &tr, &t, BPF_ANY);
}
}

return 0;
}

SEC("uprobe/server_handleStream")
int uprobe_server_handleStream_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/server_handleStream return === ");
Expand All @@ -138,10 +224,11 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
goto done;
}

u16 *status = bpf_map_lookup_elem(&ongoing_grpc_request_status, &goroutine_addr);
if (status == NULL) {
u16 *status_ptr = bpf_map_lookup_elem(&ongoing_grpc_request_status, &goroutine_addr);
u16 status = 0;
if (status_ptr != NULL) {
bpf_dbg_printk("can't read grpc invocation status");
goto done;
status = *status_ptr;
}

void *stream_ptr = (void *)invocation->stream;
Expand All @@ -155,7 +242,7 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
task_pid(&trace->pid);
trace->type = EVENT_GRPC_REQUEST;
trace->start_monotime_ns = invocation->start_monotime_ns;
trace->status = *status;
trace->status = status;
trace->content_length = 0;
trace->method[0] = 0;

Expand All @@ -181,19 +268,18 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {

bpf_dbg_printk("st_ptr %llx", st_ptr);
if (st_ptr) {
void *conn_ptr = st_ptr + grpc_st_conn_pos;
bpf_dbg_printk("conn_ptr %llx", conn_ptr);
if (conn_ptr) {
void *conn_conn_ptr = 0;
bpf_probe_read(&conn_conn_ptr, sizeof(conn_conn_ptr), conn_ptr + 8);
bpf_dbg_printk("conn_conn_ptr %llx", conn_conn_ptr);
if (conn_conn_ptr) {
found_conn = get_conn_info(conn_conn_ptr, &trace->conn);
}
}
grpc_transports_t *t = bpf_map_lookup_elem(&ongoing_grpc_transports, &st_ptr);

bpf_dbg_printk("found t %llx", t);
if (t) {
bpf_dbg_printk("setting up connection info from grpc handler");
__builtin_memcpy(&trace->conn, &t->conn, sizeof(connection_info_t));
found_conn = 1;
}
}

if (!found_conn) {
bpf_dbg_printk("can't find connection info for st_ptr %llx", st_ptr);
__builtin_memset(&trace->conn, 0, sizeof(connection_info_t));
}

Expand Down Expand Up @@ -422,11 +508,29 @@ int uprobe_transport_http2Client_NewStream(struct pt_regs *ctx) {
bpf_dbg_printk("goroutine_addr %lx, t_ptr %llx, t.conn_pos %x", goroutine_addr, t_ptr, grpc_t_conn_pos);

if (t_ptr) {
void *conn_ptr = t_ptr + grpc_t_conn_pos;
bpf_dbg_printk("conn_ptr %llx", conn_ptr);
void *conn_ptr = t_ptr + grpc_t_conn_pos + 8;
u8 buf[16];
u64 is_secure = 0;

if (!read_go_str("transport scheme", t_ptr, grpc_t_scheme_pos, &buf, sizeof(buf))) {
bpf_dbg_printk("can't read grpc transport.Stream.Method");
}

bpf_dbg_printk("scheme %s", buf);

if (buf[0] == 'h' && buf[1] == 't' && buf[2] == 't' && buf[3] == 'p' && buf[4] == 's') {
is_secure = 1;
}

if (is_secure) {
// double wrapped in grpc
conn_ptr = unwrap_tls_conn_info(conn_ptr, (void *)is_secure);
conn_ptr = unwrap_tls_conn_info(conn_ptr, (void *)is_secure);
}
bpf_dbg_printk("conn_ptr %llx is_secure %lld", conn_ptr, is_secure);
if (conn_ptr) {
void *conn_conn_ptr = 0;
bpf_probe_read(&conn_conn_ptr, sizeof(conn_conn_ptr), conn_ptr + 8);
bpf_probe_read(&conn_conn_ptr, sizeof(conn_conn_ptr), conn_ptr);
bpf_dbg_printk("conn_conn_ptr %llx", conn_conn_ptr);
if (conn_conn_ptr) {
connection_info_t conn = {0};
Expand Down Expand Up @@ -480,6 +584,11 @@ SEC("uprobe/grpcFramerWriteHeaders")
int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc Framer writeHeaders === ");

if (framer_w_pos == 0) {
bpf_dbg_printk("framer w not found");
return 0;
}

void *framer = GO_PARAM1(ctx);
u64 stream_id = (u64)GO_PARAM2(ctx);

Expand Down
Loading
Loading