Skip to content

Commit

Permalink
Add ability to read trace id from goroutine labels of Go processes (#…
Browse files Browse the repository at this point in the history
…2574)

### Why?

It's already possible to attach trace IDs to profiling data with
instrumented profilers, but it would be great if we could also create
the connection between distributed tracing and profiling data when the
data is collected via eBPF. This change does exactly that.

With the trace ID added to profiled stacks it's possible to view all
profiling data of a request in one flamegraph/iciclegraph, which makes
it much easier to identify bottlenecks of a request than having to view
it on a per process basis.

### What?

Attempt to read the `otel.traceid` goroutine label from Go processes.
This also adds a flag to enable it; it is disabled by default because it
significantly increases the amount of data produced.

### How?

Go processes store the current goroutine in thread-local storage. From
there this reads the `g` (aka goroutine) struct, then the `m` (the
actual operating system thread) of that goroutine, and finally `curg`
(the current user goroutine).

This chain is necessary because `getg().m.curg` points to the current
user g assigned to the thread (`curg == getg()` when not on the system
stack). `curg` may be nil if there is no user `g`, such as when running
in the scheduler.

### Test Plan

Tested with a test binary.

<img width="1891" alt="Screenshot 2024-03-01 at 10 11 34"
src="https://github.com/parca-dev/parca-agent/assets/4546722/c4d72e44-9bc5-4734-af0c-56d071f0abfa">
  • Loading branch information
kakkoyun authored Mar 4, 2024
2 parents 6d8b0aa + 2c47462 commit 6ce64ff
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 22 deletions.
2 changes: 1 addition & 1 deletion bpf/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ BPF_BUNDLE := $(OUT_DIR)/parca-agent.bpf.tar.gz
LIBBPF_HEADERS := $(OUT_DIR)/libbpf/$(ARCH)/usr/include

VMLINUX_INCLUDE_PATH := $(SHORT_ARCH)
BPF_SRC := unwinders/native.bpf.c
BPF_SRC := unwinders/native.bpf.c unwinders/go_traceid.h
RBPERF_SRC := unwinders/rbperf.bpf.c
PYPERF_SRC := unwinders/pyperf.bpf.c
OUT_PID_NAMESPACE_DETECTOR_SRC := pid_namespace.bpf.c
Expand Down
199 changes: 199 additions & 0 deletions bpf/unwinders/go_traceid.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// +build ignore
// ^^ this is a golang build tag meant to exclude this C file from compilation
// by the CGO compiler
//
// SPDX-License-Identifier: GPL-2.0-only
// Copyright 2024 The Parca Authors

#include "vmlinux.h"
#include "basic_types.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_core_read.h>
#include "tls.h"

// Two-word string header: a pointer to the bytes followed by a length.
// It is the slot type for both keys and values in struct map_bucket; `str`
// holds an address inside the profiled process, so it must be dereferenced
// with a probe-read helper rather than directly.
// NOTE(review): presumably mirrors the Go runtime's in-memory string
// representation -- confirm against the Go versions being profiled.
struct go_string
{
char *str;
s64 len;
};

// Three-word slice header: data pointer, length and capacity.
// Not referenced by any other definition in this header.
// NOTE(review): presumably mirrors the Go runtime's in-memory slice
// representation -- confirm before relying on it.
struct go_slice
{
void *array;
s64 len;
s64 cap;
};

// One bucket of the goroutine labels map, in the shape get_trace_id copies it
// out of the profiled process: 8 tophash bytes, 8 key strings, 8 value
// strings, and a trailing overflow pointer.
// get_trace_id treats a slot whose tophash byte is 0 as empty, and it does
// not follow the overflow pointer.
// NOTE(review): presumably matches the Go runtime's bucket layout for a
// map[string]string -- confirm against the Go versions being profiled.
struct map_bucket {
char tophash[8];
struct go_string keys[8];
struct go_string values[8];
void *overflow;
};

// Single-entry per-CPU array used as scratch space for one struct map_bucket.
// get_trace_id looks it up with key 0 and copies each bucket into it, because
// the bucket is too large to be placed on the BPF stack.
struct
{
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(struct map_bucket));
__uint(max_entries, 1);
} golang_mapbucket_storage_map SEC(".maps");

// length of "otel.traceid" is 12
#define TRACEID_MAP_KEY_LENGTH 12
// Length of the label value in characters: 32 hex digits, which
// hex_string_to_bytes decodes into 16 bytes.
#define TRACEID_MAP_VAL_LENGTH 32
// Upper bound on the number of map buckets get_trace_id scans.
#define MAX_BUCKETS 8

// Compares the first `size` bytes of s1 and s2.
//
// Unlike libc memcmp, this returns a boolean: true when all `size` bytes are
// equal (including when `size` is zero or negative), false at the first
// mismatch.
static __always_inline bool bpf_memcmp(char *s1, char *s2, s32 size)
{
    int idx = 0;

    while (idx < size)
    {
        if (s1[idx] != s2[idx])
        {
            return false;
        }
        idx++;
    }

    return true;
}

// Decodes `size` ASCII hex characters from `str` into `size / 2` bytes in
// `out`, two characters per byte with the high nibble first.
//
// The input is not validated: characters other than 0-9, a-f and A-F produce
// meaningless nibbles. If `size` is odd, the trailing character is ignored.
static __always_inline void hex_string_to_bytes(char *str, u32 size, unsigned char *out)
{
    u32 byte_count = size / 2;

    for (int pos = 0; pos < byte_count; pos++)
    {
        char hi = str[2 * pos];
        char lo = str[2 * pos + 1];

        // Branch-free hex digit to value conversion:
        //  - digits '0'-'9': the low four bits already hold the value;
        //  - letters: the low four bits hold 1..6, bit 6 adds 1 (-> 2..7) and
        //    bit 6 shifted down to bit 3 ORs in 8 (-> 10..15).
        u8 hi_nibble = ((hi & 0xF) + (hi >> 6)) | ((hi >> 3) & 0x8);
        u8 lo_nibble = ((lo & 0xF) + (lo >> 6)) | ((lo >> 3) & 0x8);

        out[pos] = (hi_nibble << 4) | lo_nibble;
    }
}

// Reads the `otel.traceid` goroutine label of the goroutine currently running
// on this thread and decodes it into res_trace_id.
//
// Go processes store the current goroutine in thread-local storage. From there
// this reads the g (aka goroutine) struct, then the m (the actual operating
// system thread) of that goroutine, and finally curg (current goroutine). This
// chain is necessary because getg().m.curg points to the current user g
// assigned to the thread (curg == getg() when not on the system stack). curg
// may be nil if there is no user g, such as when running in the scheduler. If
// curg is nil, then g is either a system stack (called g0) or a signal handler
// g (gsignal). Neither one will ever have labels.
//
// All struct offsets below are hard-coded from the DWARF information of the Go
// binaries that were inspected.
// NOTE(review): these offsets and the map layout are Go runtime internals and
// may differ between Go versions -- confirm for every version to be supported.
//
// res_trace_id must have room for TRACEID_MAP_VAL_LENGTH / 2 bytes. It is only
// written when the function returns true.
//
// Returns true when the label was found and decoded, false otherwise
// (including when any read from the profiled process fails).
static __always_inline bool get_trace_id(unsigned char *res_trace_id) {
    long res;
    struct task_struct *task = (struct task_struct *)bpf_get_current_task();
    if (task == NULL) {
        return false;
    }

    // It appears from all Go binaries we looked at 0xfffffffffffffff8 is the offset of `runtime.g`.
    u64 g_addr_offset = 0xfffffffffffffff8;

    size_t g_addr;
    res = bpf_probe_read_user(&g_addr, sizeof(void *), (void *)(read_tls_base(task) + g_addr_offset));
    if (res < 0 || g_addr == 0) {
        return false;
    }

    // DW_TAG_member
    //   DW_AT_name ("m")
    //   DW_AT_data_member_location (48)
    //   DW_AT_type (0x0000000000088e39 "runtime.m *")
    //   DW_AT_GO_embedded_field (0x00)
    size_t m_ptr_addr;
    res = bpf_probe_read_user(&m_ptr_addr, sizeof(void *), (void *)(g_addr + 48));
    if (res < 0 || m_ptr_addr == 0) {
        return false;
    }

    // DW_TAG_member
    //   DW_AT_name ("curg")
    //   DW_AT_data_member_location (192)
    //   DW_AT_type (0x00000000000892b1 "runtime.g *")
    //   DW_AT_GO_embedded_field (0x00)
    size_t curg_ptr_addr;
    res = bpf_probe_read_user(&curg_ptr_addr, sizeof(void *), (void *)(m_ptr_addr + 192));
    if (res < 0) {
        return false;
    }
    // No user goroutine on this thread (e.g. running in the scheduler): there
    // are no labels to read.
    if (curg_ptr_addr == 0) {
        return false;
    }

    // DW_TAG_member
    //   DW_AT_name ("labels")
    //   DW_AT_data_member_location (360)
    //   DW_AT_type (0x000000000005c242 "void *")
    //   DW_AT_GO_embedded_field (0x00)
    void *labels_map_ptr_ptr;
    res = bpf_probe_read_user(&labels_map_ptr_ptr, sizeof(void *), (void *)(curg_ptr_addr + 360));
    if (res < 0 || labels_map_ptr_ptr == NULL) {
        return false;
    }

    // Everything from here on lives in the profiled process' address space, so
    // it is read with bpf_probe_read_user like the pointers above.
    void *labels_map_ptr;
    res = bpf_probe_read_user(&labels_map_ptr, sizeof(labels_map_ptr), labels_map_ptr_ptr);
    if (res < 0 || labels_map_ptr == NULL) {
        return false;
    }

    // First word of the map header: number of entries.
    u64 labels_count = 0;
    res = bpf_probe_read_user(&labels_count, sizeof(labels_count), labels_map_ptr);
    if (res < 0) {
        return false;
    }
    if (labels_count == 0) {
        return false;
    }

    // Byte at offset 9 of the map header: log2 of the number of buckets.
    unsigned char log_2_bucket_count;
    res = bpf_probe_read_user(&log_2_bucket_count, sizeof(log_2_bucket_count), (void *)((char *)labels_map_ptr + 9));
    if (res < 0) {
        return false;
    }
    // A shift by 64 or more is undefined; such a value can only come from
    // reading something that is not a valid map header.
    if (log_2_bucket_count >= 64) {
        return false;
    }
    u64 bucket_count = 1ULL << log_2_bucket_count;

    // Pointer at offset 16 of the map header: the bucket array.
    void *label_buckets;
    res = bpf_probe_read_user(&label_buckets, sizeof(label_buckets), (void *)((char *)labels_map_ptr + 16));
    if (res < 0 || label_buckets == NULL) {
        return false;
    }

    u32 map_id = 0;
    // This needs to be allocated in a per-cpu map, because it's too large and
    // can't be allocated on the stack (which is limited to 512 bytes in bpf).
    struct map_bucket *map_value = bpf_map_lookup_elem(&golang_mapbucket_storage_map, &map_id);
    if (!map_value) {
        return false;
    }

    // Scan at most MAX_BUCKETS buckets so the loop stays bounded. Overflow
    // buckets are not followed.
    for (u64 j = 0; j < MAX_BUCKETS; j++) {
        if (j >= bucket_count) {
            break;
        }
        res = bpf_probe_read_user(map_value, sizeof(struct map_bucket), (void *)((char *)label_buckets + (j * sizeof(struct map_bucket))));
        if (res < 0) {
            continue;
        }
        for (u64 i = 0; i < 8; i++) {
            // A tophash of 0 marks an empty slot.
            if (map_value->tophash[i] == 0) {
                continue;
            }
            if (map_value->keys[i].len != TRACEID_MAP_KEY_LENGTH) {
                continue;
            }

            char current_label_key[TRACEID_MAP_KEY_LENGTH];
            res = bpf_probe_read_user(current_label_key, sizeof(current_label_key), map_value->keys[i].str);
            if (res < 0) {
                continue;
            }
            if (!bpf_memcmp(current_label_key, "otel.traceid", TRACEID_MAP_KEY_LENGTH)) {
                continue;
            }

            if (map_value->values[i].len != TRACEID_MAP_VAL_LENGTH) {
                continue;
            }

            char trace_id[TRACEID_MAP_VAL_LENGTH];
            res = bpf_probe_read_user(trace_id, TRACEID_MAP_VAL_LENGTH, map_value->values[i].str);
            if (res < 0) {
                // Do not report an all-zero trace id as a successful read.
                continue;
            }

            hex_string_to_bytes(trace_id, TRACEID_MAP_VAL_LENGTH, res_trace_id);
            return true;
        }
    }

    return false;
}
10 changes: 8 additions & 2 deletions bpf/unwinders/native.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include "shared.h"
#include "go_traceid.h"

/*================================ CONSTANTS =================================*/
// Programs.
Expand Down Expand Up @@ -119,10 +120,10 @@ struct unwinder_config_t {
bool mixed_stack_enabled;
bool python_enabled;
bool ruby_enabled;
/* 3 byte of padding */
bool collect_trace_id;
/* 2 byte of padding */
bool _padding1;
bool _padding2;
bool _padding3;
u32 rate_limit_unwind_info;
u32 rate_limit_process_mappings;
u32 rate_limit_refresh_process_info;
Expand Down Expand Up @@ -666,6 +667,10 @@ static __always_inline void add_stack(struct bpf_perf_event_data *ctx, u64 pid_t
stack_key->pid = per_process_id;
stack_key->tgid = per_thread_id;

if (unwinder_config.collect_trace_id) {
get_trace_id(stack_key->trace_id);
}

// Hash and add user stack.
u64 user_stack_id = hash_stack(&unwind_state->stack, 0);
stack_key->user_stack_id = user_stack_id;
Expand Down Expand Up @@ -1110,6 +1115,7 @@ static __always_inline bool set_initial_state(struct bpf_perf_event_data *ctx) {
unwind_state->stack_key.user_stack_id = 0;
unwind_state->stack_key.kernel_stack_id = 0;
unwind_state->stack_key.interpreter_stack_id = 0;
__builtin_memset(unwind_state->stack_key.trace_id, 0, 16);

u64 ip = 0;
u64 sp = 0;
Expand Down
15 changes: 1 addition & 14 deletions bpf/unwinders/pyperf.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "hash.h"
#include "shared.h"
#include "tls.h"

//
// ╔═════════════════════════════════════════════════════════════════════════╗
Expand Down Expand Up @@ -151,20 +152,6 @@ static inline __attribute__((__always_inline__)) int tls_read(void *tls_base, In
return 0;
}

// Returns the thread-local storage base address recorded in `task`'s thread
// state. Which field holds it depends on the target architecture (and kernel
// version): task->thread.fsbase on x86, task->thread.uw.tp_value on arm64.
// Any other target is rejected at compile time.
static inline __attribute__((__always_inline__)) long unsigned int read_tls_base(struct task_struct *task) {
#if __TARGET_ARCH_x86
    return BPF_CORE_READ(task, thread.fsbase);
#elif __TARGET_ARCH_arm64
    return BPF_CORE_READ(task, thread.uw.tp_value);
#else
#error "Unsupported platform"
#endif
}

//
// ╔═════════════════════════════════════════════════════════════════════════╗
// ║ BPF Programs ║
Expand Down
1 change: 1 addition & 0 deletions bpf/unwinders/shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ typedef struct {
u64 user_stack_id;
u64 kernel_stack_id;
u64 interpreter_stack_id;
unsigned char trace_id[16];
} stack_count_key_t;

typedef struct {
Expand Down
24 changes: 24 additions & 0 deletions bpf/unwinders/tls.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// +build ignore
// ^^ this is a golang build tag meant to exclude this C file from compilation
// by the CGO compiler
//
// SPDX-License-Identifier: GPL-2.0-only
// Copyright 2024 The Parca Authors

#include "vmlinux.h"

#include <bpf/bpf_core_read.h>

// Returns the thread-local storage base address recorded in `task`'s thread
// state. Which field holds it depends on the target architecture (and kernel
// version): task->thread.fsbase on x86, task->thread.uw.tp_value on arm64.
// Any other target is rejected at compile time.
static inline __attribute__((__always_inline__)) long unsigned int read_tls_base(struct task_struct *task) {
#if __TARGET_ARCH_x86
    return BPF_CORE_READ(task, thread.fsbase);
#elif __TARGET_ARCH_arm64
    return BPF_CORE_READ(task, thread.uw.tp_value);
#else
#error "Unsupported platform"
#endif
}
3 changes: 3 additions & 0 deletions cmd/parca-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ type flags struct {
PythonUnwindingDisable bool `default:"false" help:"Disable Python unwinder."`
RubyUnwindingDisable bool `default:"false" help:"Disable Ruby unwinder."`

CollectTraceID bool `default:"false" help:"Attempt to collect trace ID from the process."`

AnalyticsOptOut bool `default:"false" help:"Opt out of sending anonymous usage statistics."`

Telemetry FlagsTelemetry `embed:"" prefix:"telemetry-"`
Expand Down Expand Up @@ -962,6 +964,7 @@ func run(logger log.Logger, reg *prometheus.Registry, flags flags, numCPU int) e
RateLimitUnwindInfo: flags.Hidden.RateLimitUnwindInfo,
RateLimitProcessMappings: flags.Hidden.RateLimitProcessMappings,
RateLimitRefreshProcessInfo: flags.Hidden.RateLimitRefreshProcessInfo,
CollectTraceID: flags.CollectTraceID,
},
bpfProgramLoaded,
ofp,
Expand Down
13 changes: 13 additions & 0 deletions pkg/pprof/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pprof

import (
"context"
"encoding/hex"
"errors"
"io/fs"
"strconv"
Expand Down Expand Up @@ -174,6 +175,15 @@ const (
threadNameLabel = "thread_name"
)

// isNonEmptyTraceID reports whether traceID contains at least one non-zero
// byte. The all-zero value is treated as "no trace ID".
func isNonEmptyTraceID(traceID [16]byte) bool {
	return traceID != [16]byte{}
}

// Convert converts a profile to a pprof profile. It is intended to only be
// used once.
func (c *Converter) Convert(ctx context.Context, rawData []profile.RawSample) (*pprofprofile.Profile, []*profilestorepb.ExecutableInfo, error) {
Expand Down Expand Up @@ -254,6 +264,9 @@ func (c *Converter) Convert(ctx context.Context, rawData []profile.RawSample) (*
if threadName != "" {
pprofSample.Label[threadNameLabel] = append(pprofSample.Label[threadNameLabel], threadName)
}
if isNonEmptyTraceID(sample.TraceID) {
pprofSample.Label["trace_id"] = append(pprofSample.Label["trace_id"], hex.EncodeToString(sample.TraceID[:]))
}

c.result.Sample = append(c.result.Sample, pprofSample)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type RawSample struct {
// frame.
InterpreterStack []uint64
Value uint64
TraceID [16]byte
}

type RawData []ProcessRawData
Expand Down
Loading

0 comments on commit 6ce64ff

Please sign in to comment.