From 053fb92580eebce18c7067e3484cd67c2e7011bf Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Mon, 3 Jun 2024 18:26:16 -0400 Subject: [PATCH 1/5] better sql detection --- .../ebpf/common/tcp_detect_transform.go | 29 ++++++++------ .../ebpf/common/tcp_detect_transform_test.go | 40 +++++++++++++------ 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/pkg/internal/ebpf/common/tcp_detect_transform.go b/pkg/internal/ebpf/common/tcp_detect_transform.go index c3e66e9e2..e418521f1 100644 --- a/pkg/internal/ebpf/common/tcp_detect_transform.go +++ b/pkg/internal/ebpf/common/tcp_detect_transform.go @@ -32,10 +32,10 @@ func ReadTCPRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, error) buf := string(event.Buf[:l]) // Check if we have a SQL statement - sqlIndex := isSQL(buf) + op, table, sql := detectSQL(buf) switch { - case sqlIndex >= 0: - return TCPToSQLToSpan(&event, buf[sqlIndex:]), false, nil + case validSQL(op, table): + return TCPToSQLToSpan(&event, op, table, sql), false, nil case isHTTP2(b, &event): MisclassifiedEvents <- MisclassifiedEvent{EventType: EventTypeKHTTP2, TCPInfo: &event} case isRedis(event.Buf[:l]) && isRedis(event.Rbuf[:]): @@ -59,16 +59,23 @@ func ReadTCPRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, error) return request.Span{}, true, nil // ignore if we couldn't parse it } -func isSQL(buf string) int { +func validSQL(op, table string) bool { + return op != "" && table != "" +} + +func detectSQL(buf string) (string, string, string) { b := asciiToUpper(buf) for _, q := range []string{"SELECT", "UPDATE", "DELETE", "INSERT", "ALTER", "CREATE", "DROP"} { i := strings.Index(b, q) if i >= 0 { - return i + sql := cstr([]uint8(b[i:])) + + op, table := sqlprune.SQLParseOperationAndTable(sql) + return op, table, sql } } - return -1 + return "", "", "" } // when the input string is invalid unicode (might happen with the ringbuffer @@ -95,11 +102,7 @@ func (trace *TCPRequestInfo) reqHostInfo() (source, target string) { return src.String(), dst.String() } -func TCPToSQLToSpan(trace *TCPRequestInfo, s string) request.Span { - sql := cstr([]uint8(s)) - - method, path := sqlprune.SQLParseOperationAndTable(sql) - +func TCPToSQLToSpan(trace *TCPRequestInfo, op, table, sql string) request.Span { peer := "" hostname := "" hostPort := 0 @@ -111,8 +114,8 @@ func TCPToSQLToSpan(trace *TCPRequestInfo, s string) request.Span { return request.Span{ Type: request.EventTypeSQLClient, - Method: method, - Path: path, + Method: op, + Path: table, Peer: peer, Host: hostname, HostPort: hostPort, diff --git a/pkg/internal/ebpf/common/tcp_detect_transform_test.go b/pkg/internal/ebpf/common/tcp_detect_transform_test.go index a47ebc0cd..5a45eb0a6 100644 --- a/pkg/internal/ebpf/common/tcp_detect_transform_test.go +++ b/pkg/internal/ebpf/common/tcp_detect_transform_test.go @@ -23,33 +23,49 @@ const ( func TestTCPReqSQLParsing(t *testing.T) { sql := randomStringWithSub("SELECT * FROM accounts ") r := makeTCPReq(sql, tcpSend, 343534, 8080, 2000) - sqlIndex := isSQL(sql) - assert.GreaterOrEqual(t, sqlIndex, 0) - s := TCPToSQLToSpan(&r, sql[sqlIndex:]) + op, table, sql := detectSQL(sql) + assert.Equal(t, op, "SELECT") + assert.Equal(t, table, "ACCOUNTS") + s := TCPToSQLToSpan(&r, op, table, sql) assert.NotNil(t, s) assert.NotEmpty(t, s.Host) assert.NotEmpty(t, s.Peer) assert.Equal(t, s.HostPort, 8080) assert.Greater(t, s.End, s.Start) - assert.True(t, strings.Contains(s.Statement, "SELECT * FROM accounts ")) + assert.True(t, strings.Contains(s.Statement, "SELECT * FROM ACCOUNTS ")) assert.Equal(t, "SELECT", s.Method) - assert.Equal(t, "accounts", s.Path) + assert.Equal(t, "ACCOUNTS", s.Path) assert.Equal(t, request.EventTypeSQLClient, s.Type) } func TestTCPReqParsing(t *testing.T) { sql := "Not a sql or any known protocol" r := makeTCPReq(sql, tcpSend, 343534, 8080, 2000) - sqlIndex := isSQL(sql) - assert.LessOrEqual(t, sqlIndex, 0) + op, table, _ := detectSQL(sql) + assert.Empty(t, op) + assert.Empty(t, table) assert.NotNil(t, r) } func TestSQLDetection(t *testing.T) { - for _, s := range []string{"SELECT", "UPDATE", "DELETE", "INSERT", "CREATE", "DROP", "ALTER"} { + for _, s := range []string{"SELECT * from accounts", "SELECT/*My comment*/ * from accounts", "--UPDATE accounts SET", "DELETE++ from accounts ", "INSERT into accounts ", "CREATE table accounts ", "DROP table accounts ", "ALTER table accounts"} { surrounded := randomStringWithSub(s) - assert.GreaterOrEqual(t, isSQL(surrounded), 0) - assert.GreaterOrEqual(t, isSQL(s), 0) + op, table, _ := detectSQL(s) + assert.NotEmpty(t, op) + assert.NotEmpty(t, table) + op, table, _ = detectSQL(surrounded) + assert.NotEmpty(t, op) + assert.NotEmpty(t, table) + } +} + +func TestSQLDetectionFails(t *testing.T) { + for _, s := range []string{"SELECT", "UPDATES{}", "DELETE {} ", "INSERT// into accounts "} { + op, table, _ := detectSQL(s) + assert.False(t, validSQL(op, table)) + surrounded := randomStringWithSub(s) + op, table, _ = detectSQL(surrounded) + assert.False(t, validSQL(op, table)) } } @@ -60,7 +76,7 @@ func TestReadTCPRequestIntoSpan_Overflow(t *testing.T) { // this byte array contains select * from foo // rest of the array is invalid UTF-8 and would cause that strings.ToUpper // returns a string longer than 256. That's why we are providing - // our own asciiToUpper implementation in isSQL function + // our own asciiToUpper implementation in detectSQL function Buf: [256]byte{ 74, 39, 133, 207, 240, 83, 124, 225, 227, 163, 3, 23, 253, 254, 18, 12, 77, 143, 198, 122, 123, 67, 221, 225, 10, 233, 220, 36, 65, 35, 25, 251, 88, 197, 107, 99, 25, 247, 195, 216, @@ -86,7 +102,7 @@ func TestReadTCPRequestIntoSpan_Overflow(t *testing.T) { assert.Equal(t, request.EventTypeSQLClient, span.Type) assert.Equal(t, "SELECT", span.Method) - assert.Equal(t, "foo", span.Path) + assert.Equal(t, "FOO", span.Path) } func TestRedisDetection(t *testing.T) { From 4d593c1a680c576a74c564db939d667256e21789 Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Mon, 3 Jun 2024 18:26:25 -0400 Subject: [PATCH 2/5] ensure we can cleanup pids --- pkg/internal/discover/attacher.go | 6 +-- pkg/internal/ebpf/common/pids.go | 42 +++++------------ pkg/internal/ebpf/common/pids_test.go | 45 +++++++------------ pkg/internal/ebpf/common/ringbuf_test.go | 8 ++-- pkg/internal/ebpf/gokafka/gokafka.go | 8 ++-- pkg/internal/ebpf/goredis/goredis.go | 8 ++-- pkg/internal/ebpf/goruntime/goruntime.go | 8 ++-- pkg/internal/ebpf/grpc/grpc.go | 8 ++-- pkg/internal/ebpf/httpfltr/httpfltr.go | 8 ++-- pkg/internal/ebpf/httpssl/httpssl.go | 8 ++-- pkg/internal/ebpf/nethttp/nethttp.go | 8 ++-- pkg/internal/ebpf/tracer.go | 12 ++--- pkg/internal/exec/file.go | 7 ++- .../{ebpf/common => exec}/pids_darwin.go | 2 +- .../{ebpf/common => exec}/pids_linux.go | 2 +- pkg/internal/helpers/container/container.go | 4 +- 16 files changed, 77 insertions(+), 107 deletions(-) rename pkg/internal/{ebpf/common => exec}/pids_darwin.go (91%) rename pkg/internal/{ebpf/common => exec}/pids_linux.go (98%) diff --git a/pkg/internal/discover/attacher.go b/pkg/internal/discover/attacher.go index d2235f0b0..8ed29959d 100644 --- a/pkg/internal/discover/attacher.go +++ b/pkg/internal/discover/attacher.go @@ -184,9 +184,9 @@ func monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) { } // allowing the tracer to forward traces from the discovered PID and its children processes - tracer.AllowPID(uint32(ie.FileInfo.Pid), ie.FileInfo.Service) + tracer.AllowPID(uint32(ie.FileInfo.Pid), ie.FileInfo.Ns, ie.FileInfo.Service) for _, pid := range ie.ChildPids { - tracer.AllowPID(pid, ie.FileInfo.Service) + tracer.AllowPID(pid, ie.FileInfo.Ns, ie.FileInfo.Service) } } @@ -205,7 +205,7 @@ func (ta *TraceAttacher) notifyProcessDeletion(ie *Instrumentable) { // notifying the tracer to block any trace from that PID // to avoid that a new process reusing this PID could send traces // unless explicitly allowed - tracer.BlockPID(uint32(ie.FileInfo.Pid)) + tracer.BlockPID(uint32(ie.FileInfo.Pid), ie.FileInfo.Ns) // if there are no more trace instances for a Go program, we need to notify that // the tracer needs to be stopped and deleted. diff --git a/pkg/internal/ebpf/common/pids.go b/pkg/internal/ebpf/common/pids.go index 2bc5cba8a..d82162c9d 100644 --- a/pkg/internal/ebpf/common/pids.go +++ b/pkg/internal/ebpf/common/pids.go @@ -23,9 +23,7 @@ var activePids, _ = lru.New[uint32, svc.ID](1024) // injectable functions (can be replaced in tests). It reads the // current process namespace from the /proc filesystem. It is required to // choose to filter traces using whether the User-space or Host-space PIDs -var readNamespace = FindNamespace - -var readNamespacePIDs = FindNamespacedPids +var readNamespacePIDs = exec.FindNamespacedPids type PIDInfo struct { service svc.ID @@ -33,8 +31,8 @@ type PIDInfo struct { } type ServiceFilter interface { - AllowPID(uint32, svc.ID, PIDType) - BlockPID(uint32) + AllowPID(uint32, uint32, svc.ID, PIDType) + BlockPID(uint32, uint32) Filter(inputSpans []request.Span) []request.Span CurrentPIDs(PIDType) map[uint32]map[uint32]svc.ID } @@ -74,16 +72,16 @@ func CommonPIDsFilter(systemWide bool) ServiceFilter { return commonPIDsFilter } -func (pf *PIDsFilter) AllowPID(pid uint32, svc svc.ID, pidType PIDType) { +func (pf *PIDsFilter) AllowPID(pid, ns uint32, svc svc.ID, pidType PIDType) { pf.mux.Lock() defer pf.mux.Unlock() - pf.addPID(pid, svc, pidType) + pf.addPID(pid, ns, svc, pidType) } -func (pf *PIDsFilter) BlockPID(pid uint32) { +func (pf *PIDsFilter) BlockPID(pid, ns uint32) { pf.mux.Lock() defer pf.mux.Unlock() - pf.removePID(pid) + pf.removePID(pid, ns) } func (pf *PIDsFilter) CurrentPIDs(t PIDType) map[uint32]map[uint32]svc.ID { @@ -138,14 +136,7 @@ func (pf *PIDsFilter) Filter(inputSpans []request.Span) []request.Span { return outputSpans } -func (pf *PIDsFilter) addPID(pid uint32, s svc.ID, t PIDType) { - nsid, err := readNamespace(int32(pid)) - - if err != nil { - pf.log.Error("Error looking up namespace for tracking PID", "pid", pid, "error", err) - return - } - +func (pf *PIDsFilter) addPID(pid, nsid uint32, s svc.ID, t PIDType) { ns, nsExists := pf.current[nsid] if !nsExists { ns = make(map[uint32]PIDInfo) @@ -164,18 +155,7 @@ func (pf *PIDsFilter) addPID(pid uint32, s svc.ID, t PIDType) { } } -func (pf *PIDsFilter) removePID(pid uint32) { - nsid, err := readNamespace(int32(pid)) - - if err != nil { - // this will always happen on process removal, as /proc//ns/pid won't be found - // the code is kept here as a placeholder for a future fix (e.g. using eBPF notifications - // to get both the PID and the nsid) - // TODO: fix - pf.log.Debug("Error looking up namespace for removing PID", "pid", pid, "error", err) - return - } - +func (pf *PIDsFilter) removePID(pid, nsid uint32) { ns, nsExists := pf.current[nsid] if !nsExists { return @@ -191,9 +171,9 @@ func (pf *PIDsFilter) removePID(pid uint32) { // for system-wide instrumenation type IdentityPidsFilter struct{} -func (pf *IdentityPidsFilter) AllowPID(_ uint32, _ svc.ID, _ PIDType) {} +func (pf *IdentityPidsFilter) AllowPID(_ uint32, _ uint32, _ svc.ID, _ PIDType) {} -func (pf *IdentityPidsFilter) BlockPID(_ uint32) {} +func (pf *IdentityPidsFilter) BlockPID(_ uint32, _ uint32) {} func (pf *IdentityPidsFilter) CurrentPIDs(_ PIDType) map[uint32]map[uint32]svc.ID { return nil diff --git a/pkg/internal/ebpf/common/pids_test.go b/pkg/internal/ebpf/common/pids_test.go index 79a9b1deb..350338f10 100644 --- a/pkg/internal/ebpf/common/pids_test.go +++ b/pkg/internal/ebpf/common/pids_test.go @@ -21,16 +21,13 @@ var spanSet = []request.Span{ } func TestFilter_SameNS(t *testing.T) { - readNamespace = func(_ int32) (uint32, error) { - return 33, nil - } readNamespacePIDs = func(pid int32) ([]uint32, error) { return []uint32{uint32(pid)}, nil } pf := NewPIDsFilter(slog.With("env", "testing")) - pf.AllowPID(123, svc.ID{}, PIDTypeGo) - pf.AllowPID(456, svc.ID{}, PIDTypeGo) - pf.AllowPID(789, svc.ID{}, PIDTypeGo) + pf.AllowPID(123, 33, svc.ID{}, PIDTypeGo) + pf.AllowPID(456, 33, svc.ID{}, PIDTypeGo) + pf.AllowPID(789, 33, svc.ID{}, PIDTypeGo) // with the same namespace, it filters by user PID, as it is the PID // that is seen by Beyla's process discovery @@ -42,16 +39,13 @@ func TestFilter_SameNS(t *testing.T) { } func TestFilter_DifferentNS(t *testing.T) { - readNamespace = func(_ int32) (uint32, error) { - return 22, nil - } readNamespacePIDs = func(pid int32) ([]uint32, error) { return []uint32{uint32(pid)}, nil } pf := NewPIDsFilter(slog.With("env", "testing")) - pf.AllowPID(123, svc.ID{}, PIDTypeGo) - pf.AllowPID(456, svc.ID{}, PIDTypeGo) - pf.AllowPID(666, svc.ID{}, PIDTypeGo) + pf.AllowPID(123, 22, svc.ID{}, PIDTypeGo) + pf.AllowPID(456, 22, svc.ID{}, PIDTypeGo) + pf.AllowPID(666, 22, svc.ID{}, PIDTypeGo) // with the same namespace, it filters by user PID, as it is the PID // that is seen by Beyla's process discovery @@ -59,16 +53,13 @@ func TestFilter_DifferentNS(t *testing.T) { } func TestFilter_Block(t *testing.T) { - readNamespace = func(_ int32) (uint32, error) { - return 33, nil - } readNamespacePIDs = func(pid int32) ([]uint32, error) { return []uint32{uint32(pid)}, nil } pf := NewPIDsFilter(slog.With("env", "testing")) - pf.AllowPID(123, svc.ID{}, PIDTypeGo) - pf.AllowPID(456, svc.ID{}, PIDTypeGo) - pf.BlockPID(123) + pf.AllowPID(123, 33, svc.ID{}, PIDTypeGo) + pf.AllowPID(456, 33, svc.ID{}, PIDTypeGo) + pf.BlockPID(123, 33) // with the same namespace, it filters by user PID, as it is the PID // that is seen by Beyla's process discovery @@ -80,19 +71,13 @@ func TestFilter_Block(t *testing.T) { } func TestFilter_NewNSLater(t *testing.T) { - readNamespace = func(pid int32) (uint32, error) { - if pid == 1000 { - return 44, nil - } - return 33, nil - } readNamespacePIDs = func(pid int32) ([]uint32, error) { return []uint32{uint32(pid)}, nil } pf := NewPIDsFilter(slog.With("env", "testing")) - pf.AllowPID(123, svc.ID{}, PIDTypeGo) - pf.AllowPID(456, svc.ID{}, PIDTypeGo) - pf.AllowPID(789, svc.ID{}, PIDTypeGo) + pf.AllowPID(123, 33, svc.ID{}, PIDTypeGo) + pf.AllowPID(456, 33, svc.ID{}, PIDTypeGo) + pf.AllowPID(789, 33, svc.ID{}, PIDTypeGo) // with the same namespace, it filters by user PID, as it is the PID // that is seen by Beyla's process discovery @@ -102,7 +87,7 @@ func TestFilter_NewNSLater(t *testing.T) { {Pid: request.PidInfo{UserPID: 789, HostPID: 234, Namespace: 33}}, }, pf.Filter(spanSet)) - pf.AllowPID(1000, svc.ID{}, PIDTypeGo) + pf.AllowPID(1000, 44, svc.ID{}, PIDTypeGo) assert.Equal(t, []request.Span{ {Pid: request.PidInfo{UserPID: 123, HostPID: 333, Namespace: 33}}, @@ -111,7 +96,7 @@ func TestFilter_NewNSLater(t *testing.T) { {Pid: request.PidInfo{UserPID: 1000, HostPID: 1234, Namespace: 44}}, }, pf.Filter(spanSet)) - pf.BlockPID(456) + pf.BlockPID(456, 33) assert.Equal(t, []request.Span{ {Pid: request.PidInfo{UserPID: 123, HostPID: 333, Namespace: 33}}, @@ -119,7 +104,7 @@ func TestFilter_NewNSLater(t *testing.T) { {Pid: request.PidInfo{UserPID: 1000, HostPID: 1234, Namespace: 44}}, }, pf.Filter(spanSet)) - pf.BlockPID(1000) + pf.BlockPID(1000, 44) assert.Equal(t, []request.Span{ {Pid: request.PidInfo{UserPID: 123, HostPID: 333, Namespace: 33}}, diff --git a/pkg/internal/ebpf/common/ringbuf_test.go b/pkg/internal/ebpf/common/ringbuf_test.go index 01a3f63a0..f9a1cad8e 100644 --- a/pkg/internal/ebpf/common/ringbuf_test.go +++ b/pkg/internal/ebpf/common/ringbuf_test.go @@ -31,7 +31,7 @@ func TestForwardRingbuf_CapacityFull(t *testing.T) { metrics := &metricsReporter{} forwardedMessages := make(chan []request.Span, 100) fltr := TestPidsFilter{services: map[uint32]svc.ID{}} - fltr.AllowPID(1, svc.ID{Name: "myService"}, PIDTypeGo) + fltr.AllowPID(1, 1, svc.ID{Name: "myService"}, PIDTypeGo) go ForwardRingbuf( &TracerConfig{BatchLength: 10}, nil, // the source ring buffer can be null @@ -83,7 +83,7 @@ func TestForwardRingbuf_Deadline(t *testing.T) { metrics := &metricsReporter{} forwardedMessages := make(chan []request.Span, 100) fltr := TestPidsFilter{services: map[uint32]svc.ID{}} - fltr.AllowPID(1, svc.ID{Name: "myService"}, PIDTypeGo) + fltr.AllowPID(1, 1, svc.ID{Name: "myService"}, PIDTypeGo) go ForwardRingbuf( &TracerConfig{BatchLength: 10, BatchTimeout: 20 * time.Millisecond}, nil, // the source ring buffer can be null @@ -219,11 +219,11 @@ type TestPidsFilter struct { services map[uint32]svc.ID } -func (pf *TestPidsFilter) AllowPID(p uint32, s svc.ID, _ PIDType) { +func (pf *TestPidsFilter) AllowPID(p uint32, _ uint32, s svc.ID, _ PIDType) { pf.services[p] = s } -func (pf *TestPidsFilter) BlockPID(p uint32) { +func (pf *TestPidsFilter) BlockPID(p uint32, _ uint32) { delete(pf.services, p) } diff --git a/pkg/internal/ebpf/gokafka/gokafka.go b/pkg/internal/ebpf/gokafka/gokafka.go index 05909ddec..52a1f3a62 100644 --- a/pkg/internal/ebpf/gokafka/gokafka.go +++ b/pkg/internal/ebpf/gokafka/gokafka.go @@ -51,12 +51,12 @@ func New(cfg *beyla.Config, metrics imetrics.Reporter) *Tracer { } } -func (p *Tracer) AllowPID(pid uint32, svc svc.ID) { - p.pidsFilter.AllowPID(pid, svc, ebpfcommon.PIDTypeGo) +func (p *Tracer) AllowPID(pid, ns uint32, svc svc.ID) { + p.pidsFilter.AllowPID(pid, ns, svc, ebpfcommon.PIDTypeGo) } -func (p *Tracer) BlockPID(pid uint32) { - p.pidsFilter.BlockPID(pid) +func (p *Tracer) BlockPID(pid, ns uint32) { + p.pidsFilter.BlockPID(pid, ns) } func (p *Tracer) Load() (*ebpf.CollectionSpec, error) { diff --git a/pkg/internal/ebpf/goredis/goredis.go b/pkg/internal/ebpf/goredis/goredis.go index ac3065485..29eb8d653 100644 --- a/pkg/internal/ebpf/goredis/goredis.go +++ b/pkg/internal/ebpf/goredis/goredis.go @@ -51,12 +51,12 @@ func New(cfg *beyla.Config, metrics imetrics.Reporter) *Tracer { } } -func (p *Tracer) AllowPID(pid uint32, svc svc.ID) { - p.pidsFilter.AllowPID(pid, svc, ebpfcommon.PIDTypeGo) +func (p *Tracer) AllowPID(pid, ns uint32, svc svc.ID) { + p.pidsFilter.AllowPID(pid, ns, svc, ebpfcommon.PIDTypeGo) } -func (p *Tracer) BlockPID(pid uint32) { - p.pidsFilter.BlockPID(pid) +func (p *Tracer) BlockPID(pid, ns uint32) { + p.pidsFilter.BlockPID(pid, ns) } func (p *Tracer) Load() (*ebpf.CollectionSpec, error) { diff --git a/pkg/internal/ebpf/goruntime/goruntime.go b/pkg/internal/ebpf/goruntime/goruntime.go index 77a29ab7a..549ff43fc 100644 --- a/pkg/internal/ebpf/goruntime/goruntime.go +++ b/pkg/internal/ebpf/goruntime/goruntime.go @@ -50,12 +50,12 @@ func New(cfg *beyla.Config, metrics imetrics.Reporter) *Tracer { } } -func (p *Tracer) AllowPID(pid uint32, svc svc.ID) { - p.pidsFilter.AllowPID(pid, svc, ebpfcommon.PIDTypeGo) +func (p *Tracer) AllowPID(pid, ns uint32, svc svc.ID) { + p.pidsFilter.AllowPID(pid, ns, svc, ebpfcommon.PIDTypeGo) } -func (p *Tracer) BlockPID(pid uint32) { - p.pidsFilter.BlockPID(pid) +func (p *Tracer) BlockPID(pid, ns uint32) { + p.pidsFilter.BlockPID(pid, ns) } func (p *Tracer) Load() (*ebpf.CollectionSpec, error) { diff --git a/pkg/internal/ebpf/grpc/grpc.go b/pkg/internal/ebpf/grpc/grpc.go index 13b57be43..39a26dc1f 100644 --- a/pkg/internal/ebpf/grpc/grpc.go +++ b/pkg/internal/ebpf/grpc/grpc.go @@ -56,12 +56,12 @@ func New(cfg *beyla.Config, metrics imetrics.Reporter) *Tracer { } } -func (p *Tracer) AllowPID(pid uint32, svc svc.ID) { - p.pidsFilter.AllowPID(pid, svc, ebpfcommon.PIDTypeGo) +func (p *Tracer) AllowPID(pid, ns uint32, svc svc.ID) { + p.pidsFilter.AllowPID(pid, ns, svc, ebpfcommon.PIDTypeGo) } -func (p *Tracer) BlockPID(pid uint32) { - p.pidsFilter.BlockPID(pid) +func (p *Tracer) BlockPID(pid, ns uint32) { + p.pidsFilter.BlockPID(pid, ns) } func (p *Tracer) supportsContextPropagation() bool { diff --git a/pkg/internal/ebpf/httpfltr/httpfltr.go b/pkg/internal/ebpf/httpfltr/httpfltr.go index 1ba7766fc..18b400752 100644 --- a/pkg/internal/ebpf/httpfltr/httpfltr.go +++ b/pkg/internal/ebpf/httpfltr/httpfltr.go @@ -95,13 +95,13 @@ func (p *Tracer) rebuildValidPids() { } } -func (p *Tracer) AllowPID(pid uint32, svc svc.ID) { - p.pidsFilter.AllowPID(pid, svc, ebpfcommon.PIDTypeKProbes) +func (p *Tracer) AllowPID(pid, ns uint32, svc svc.ID) { + p.pidsFilter.AllowPID(pid, ns, svc, ebpfcommon.PIDTypeKProbes) p.rebuildValidPids() } -func (p *Tracer) BlockPID(pid uint32) { - p.pidsFilter.BlockPID(pid) +func (p *Tracer) BlockPID(pid, ns uint32) { + p.pidsFilter.BlockPID(pid, ns) p.rebuildValidPids() } diff --git a/pkg/internal/ebpf/httpssl/httpssl.go b/pkg/internal/ebpf/httpssl/httpssl.go index 13fb5c777..aa872ff5b 100644 --- a/pkg/internal/ebpf/httpssl/httpssl.go +++ b/pkg/internal/ebpf/httpssl/httpssl.go @@ -46,12 +46,12 @@ func New(cfg *beyla.Config, metrics imetrics.Reporter) *Tracer { } } -func (p *Tracer) AllowPID(pid uint32, svc svc.ID) { - p.pidsFilter.AllowPID(pid, svc, ebpfcommon.PIDTypeKProbes) +func (p *Tracer) AllowPID(pid, ns uint32, svc svc.ID) { + p.pidsFilter.AllowPID(pid, ns, svc, ebpfcommon.PIDTypeKProbes) } -func (p *Tracer) BlockPID(pid uint32) { - p.pidsFilter.BlockPID(pid) +func (p *Tracer) BlockPID(pid, ns uint32) { + p.pidsFilter.BlockPID(pid, ns) } func (p *Tracer) Load() (*ebpf.CollectionSpec, error) { diff --git a/pkg/internal/ebpf/nethttp/nethttp.go b/pkg/internal/ebpf/nethttp/nethttp.go index 9bd8a82a8..8be84151e 100644 --- a/pkg/internal/ebpf/nethttp/nethttp.go +++ b/pkg/internal/ebpf/nethttp/nethttp.go @@ -55,12 +55,12 @@ func New(cfg *beyla.Config, metrics imetrics.Reporter) *Tracer { } } -func (p *Tracer) AllowPID(pid uint32, svc svc.ID) { - p.pidsFilter.AllowPID(pid, svc, ebpfcommon.PIDTypeGo) +func (p *Tracer) AllowPID(pid, ns uint32, svc svc.ID) { + p.pidsFilter.AllowPID(pid, ns, svc, ebpfcommon.PIDTypeGo) } -func (p *Tracer) BlockPID(pid uint32) { - p.pidsFilter.BlockPID(pid) +func (p *Tracer) BlockPID(pid, ns uint32) { + p.pidsFilter.BlockPID(pid, ns) } func (p *Tracer) supportsContextPropagation() bool { diff --git a/pkg/internal/ebpf/tracer.go b/pkg/internal/ebpf/tracer.go index 5aaf5212a..6696c08d9 100644 --- a/pkg/internal/ebpf/tracer.go +++ b/pkg/internal/ebpf/tracer.go @@ -19,11 +19,11 @@ type PIDsAccounter interface { // AllowPID notifies the tracer to accept traces from the process with the // provided PID. Unless system-wide instrumentation, the Tracer should discard // traces from processes whose PID has not been allowed before - AllowPID(uint32, svc.ID) + AllowPID(uint32, uint32, svc.ID) // BlockPID notifies the tracer to stop accepting traces from the process // with the provided PID. After receiving them via ringbuffer, it should // discard them. - BlockPID(uint32) + BlockPID(uint32, uint32) } type CommonTracer interface { @@ -99,14 +99,14 @@ type ProcessTracer struct { Type ProcessTracerType } -func (pt *ProcessTracer) AllowPID(pid uint32, svc svc.ID) { +func (pt *ProcessTracer) AllowPID(pid, ns uint32, svc svc.ID) { for i := range pt.Programs { - pt.Programs[i].AllowPID(pid, svc) + pt.Programs[i].AllowPID(pid, ns, svc) } } -func (pt *ProcessTracer) BlockPID(pid uint32) { +func (pt *ProcessTracer) BlockPID(pid, ns uint32) { for i := range pt.Programs { - pt.Programs[i].BlockPID(pid) + pt.Programs[i].BlockPID(pid, ns) } } diff --git a/pkg/internal/exec/file.go b/pkg/internal/exec/file.go index 5c4b573ea..91f42277f 100644 --- a/pkg/internal/exec/file.go +++ b/pkg/internal/exec/file.go @@ -21,6 +21,7 @@ type FileInfo struct { Pid int32 Ppid int32 Ino uint64 + Ns uint32 } func (fi *FileInfo) ExecutableName() string { @@ -31,6 +32,10 @@ func (fi *FileInfo) ExecutableName() string { func FindExecELF(p *services.ProcessInfo, svcID svc.ID) (*FileInfo, error) { // In container environments or K8s, we can't just open the executable exe path, because it might // be in the volume of another pod/container. We need to access it through the /proc//exe symbolic link + ns, err := FindNamespace(p.Pid) + if err != nil { + return nil, fmt.Errorf("can't find namespace for PID=%d: %w", p.Pid, err) + } file := FileInfo{ Service: svcID, CmdExePath: p.ExePath, @@ -38,8 +43,8 @@ func FindExecELF(p *services.ProcessInfo, svcID svc.ID) (*FileInfo, error) { ProExeLinkPath: fmt.Sprintf("/proc/%d/exe", p.Pid), Pid: p.Pid, Ppid: p.PPid, + Ns: ns, } - var err error if file.ELF, err = elf.Open(file.ProExeLinkPath); err != nil { return nil, fmt.Errorf("can't open ELF file in %s: %w", file.ProExeLinkPath, err) } diff --git a/pkg/internal/ebpf/common/pids_darwin.go b/pkg/internal/exec/pids_darwin.go similarity index 91% rename from pkg/internal/ebpf/common/pids_darwin.go rename to pkg/internal/exec/pids_darwin.go index 3393630b8..c681ed100 100644 --- a/pkg/internal/ebpf/common/pids_darwin.go +++ b/pkg/internal/exec/pids_darwin.go @@ -1,4 +1,4 @@ -package ebpfcommon +package exec func FindNamespace(_ int32) (uint32, error) { // convenience method to allow unit tests compiling in Darwin diff --git a/pkg/internal/ebpf/common/pids_linux.go b/pkg/internal/exec/pids_linux.go similarity index 98% rename from pkg/internal/ebpf/common/pids_linux.go rename to pkg/internal/exec/pids_linux.go index 480e971df..dd4027b60 100644 --- a/pkg/internal/ebpf/common/pids_linux.go +++ b/pkg/internal/exec/pids_linux.go @@ -1,4 +1,4 @@ -package ebpfcommon +package exec import ( "bufio" diff --git a/pkg/internal/helpers/container/container.go b/pkg/internal/helpers/container/container.go index a06a2116e..c4cce0348 100644 --- a/pkg/internal/helpers/container/container.go +++ b/pkg/internal/helpers/container/container.go @@ -8,12 +8,12 @@ import ( "regexp" "strconv" - ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" + "github.com/grafana/beyla/pkg/internal/exec" ) // injectable values for testing var procRoot = "/proc/" -var namespaceFinder = ebpfcommon.FindNamespace +var namespaceFinder = exec.FindNamespace // Info that we need to keep from a container: its ContainerID in Kubernetes and // the PIDNamespace of its processes. From f4ee743d5ebb3060016ad94d07f9bbabeb160c0a Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Mon, 3 Jun 2024 19:21:17 -0400 Subject: [PATCH 3/5] fixup stuff to use the new pid filter approach --- pkg/internal/ebpf/common/common.go | 8 ++++---- .../ebpf/common/http2grpc_transform.go | 6 +++++- pkg/internal/ebpf/common/httpfltr_test.go | 12 +++++++++--- .../ebpf/common/httpfltr_transform.go | 7 ++++++- pkg/internal/ebpf/common/pids.go | 19 +++++++++++++++++++ pkg/internal/ebpf/common/ringbuf.go | 14 +++++++------- pkg/internal/ebpf/common/ringbuf_test.go | 4 ++++ .../ebpf/common/tcp_detect_transform.go | 6 +++++- .../ebpf/common/tcp_detect_transform_test.go | 5 ++++- pkg/internal/ebpf/watcher/watcher.go | 2 +- 10 files changed, 64 insertions(+), 19 deletions(-) diff --git a/pkg/internal/ebpf/common/common.go b/pkg/internal/ebpf/common/common.go index d161d02e2..0b75a85f0 100644 --- a/pkg/internal/ebpf/common/common.go +++ b/pkg/internal/ebpf/common/common.go @@ -101,7 +101,7 @@ var MisclassifiedEvents = make(chan MisclassifiedEvent) func ptlog() *slog.Logger { return slog.With("component", "ebpf.ProcessTracer") } -func ReadHTTPRequestTraceAsSpan(record *ringbuf.Record) (request.Span, bool, error) { +func ReadHTTPRequestTraceAsSpan(record *ringbuf.Record, filter ServiceFilter) (request.Span, bool, error) { var eventType uint8 // we read the type first, depending on the type we decide what kind of record we have @@ -114,11 +114,11 @@ func ReadHTTPRequestTraceAsSpan(record *ringbuf.Record) (request.Span, bool, err case EventTypeSQL: return ReadSQLRequestTraceAsSpan(record) case EventTypeKHTTP: - return ReadHTTPInfoIntoSpan(record) + return ReadHTTPInfoIntoSpan(record, filter) case EventTypeKHTTP2: - return ReadHTTP2InfoIntoSpan(record) + return ReadHTTP2InfoIntoSpan(record, filter) case EventTypeTCP: - return ReadTCPRequestIntoSpan(record) + return ReadTCPRequestIntoSpan(record, filter) case EventTypeGoKafka: return ReadGoKafkaRequestIntoSpan(record) case EventTypeGoRedis: diff --git a/pkg/internal/ebpf/common/http2grpc_transform.go b/pkg/internal/ebpf/common/http2grpc_transform.go index 3cceb1ebb..a567131b9 100644 --- a/pkg/internal/ebpf/common/http2grpc_transform.go +++ b/pkg/internal/ebpf/common/http2grpc_transform.go @@ -232,7 +232,7 @@ func (event *BPFHTTP2Info) hostInfo() (source, target string) { } // nolint:cyclop -func ReadHTTP2InfoIntoSpan(record *ringbuf.Record) (request.Span, bool, error) { +func ReadHTTP2InfoIntoSpan(record *ringbuf.Record, filter ServiceFilter) (request.Span, bool, error) { var event BPFHTTP2Info err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event) @@ -240,6 +240,10 @@ func ReadHTTP2InfoIntoSpan(record *ringbuf.Record) (request.Span, bool, error) { return request.Span{}, true, err } + if !filter.ValidPID(event.Pid.UserPid, event.Pid.Ns, PIDTypeKProbes) { + return request.Span{}, true, nil + } + framer := byteFramer(event.Data[:]) retFramer := byteFramer(event.RetData[:]) // We don't set the framer.ReadMetaHeaders function to hpack.NewDecoder because diff --git a/pkg/internal/ebpf/common/httpfltr_test.go b/pkg/internal/ebpf/common/httpfltr_test.go index 24601cfb0..1b1c3e324 100644 --- a/pkg/internal/ebpf/common/httpfltr_test.go +++ b/pkg/internal/ebpf/common/httpfltr_test.go @@ -86,6 +86,8 @@ func TestCstr(t *testing.T) { } func TestToRequestTrace(t *testing.T) { + fltr := TestPidsFilter{services: map[uint32]svc.ID{}} + var record BPFHTTPInfo record.Type = 1 record.StartMonotimeNs = 123456 @@ -100,7 +102,7 @@ func TestToRequestTrace(t *testing.T) { err := binary.Write(buf, binary.LittleEndian, &record) assert.NoError(t, err) - result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}) + result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}, &fltr) assert.NoError(t, err) expected := request.Span{ @@ -120,6 +122,8 @@ func TestToRequestTrace(t *testing.T) { } func TestToRequestTraceNoConnection(t *testing.T) { + fltr := TestPidsFilter{services: map[uint32]svc.ID{}} + var record BPFHTTPInfo record.Type = 1 record.StartMonotimeNs = 123456 @@ -133,7 +137,7 @@ func TestToRequestTraceNoConnection(t *testing.T) { err := binary.Write(buf, binary.LittleEndian, &record) assert.NoError(t, err) - result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}) + result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}, &fltr) assert.NoError(t, err) // change the expected port just before testing @@ -154,6 +158,8 @@ func TestToRequestTraceNoConnection(t *testing.T) { } func TestToRequestTrace_BadHost(t *testing.T) { + fltr := TestPidsFilter{services: map[uint32]svc.ID{}} + var record BPFHTTPInfo record.Type = 1 record.StartMonotimeNs = 123456 @@ -169,7 +175,7 @@ func TestToRequestTrace_BadHost(t *testing.T) { err := binary.Write(buf, binary.LittleEndian, &record) assert.NoError(t, err) - result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}) + result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}, &fltr) assert.NoError(t, err) expected := request.Span{ diff --git a/pkg/internal/ebpf/common/httpfltr_transform.go b/pkg/internal/ebpf/common/httpfltr_transform.go index 78cd0288d..5e0453ce4 100644 --- a/pkg/internal/ebpf/common/httpfltr_transform.go +++ b/pkg/internal/ebpf/common/httpfltr_transform.go @@ -60,13 +60,18 @@ type HTTPInfo struct { Service svc.ID } -func ReadHTTPInfoIntoSpan(record *ringbuf.Record) (request.Span, bool, error) { +func ReadHTTPInfoIntoSpan(record *ringbuf.Record, filter ServiceFilter) (request.Span, bool, error) { var event BPFHTTPInfo err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event) if err != nil { return request.Span{}, true, err } + // Generated by Go instrumentation + if !filter.ValidPID(event.Pid.UserPid, event.Pid.Ns, PIDTypeKProbes) { + return request.Span{}, true, nil + } + return HTTPInfoEventToSpan(event) } diff --git a/pkg/internal/ebpf/common/pids.go b/pkg/internal/ebpf/common/pids.go index d82162c9d..913c8c6fd 100644 --- a/pkg/internal/ebpf/common/pids.go +++ b/pkg/internal/ebpf/common/pids.go @@ -33,6 +33,7 @@ type PIDInfo struct { type ServiceFilter interface { AllowPID(uint32, uint32, svc.ID, PIDType) BlockPID(uint32, uint32) + ValidPID(uint32, uint32, PIDType) bool Filter(inputSpans []request.Span) []request.Span CurrentPIDs(PIDType) map[uint32]map[uint32]svc.ID } @@ -84,6 +85,20 @@ func (pf *PIDsFilter) BlockPID(pid, ns uint32) { pf.removePID(pid, ns) } +func (pf *PIDsFilter) ValidPID(userPID, ns uint32, pidType PIDType) bool { + pf.mux.RLock() + defer pf.mux.RUnlock() + + if ns, nsExists := pf.current[ns]; nsExists { + if info, pidExists := ns[userPID]; pidExists { + return info.pidType == pidType + } + } + + return false + +} + func (pf *PIDsFilter) CurrentPIDs(t PIDType) map[uint32]map[uint32]svc.ID { pf.mux.RLock() defer pf.mux.RUnlock() @@ -175,6 +190,10 @@ func (pf *IdentityPidsFilter) AllowPID(_ uint32, _ uint32, _ svc.ID, _ PIDType) func (pf *IdentityPidsFilter) BlockPID(_ uint32, _ uint32) {} +func (pf *IdentityPidsFilter) ValidPID(_ uint32, _ uint32, _ PIDType) bool { + return false +} + func (pf *IdentityPidsFilter) CurrentPIDs(_ PIDType) map[uint32]map[uint32]svc.ID { return nil } diff --git a/pkg/internal/ebpf/common/ringbuf.go b/pkg/internal/ebpf/common/ringbuf.go index 8e1d0ddc3..64c1d5863 100644 --- a/pkg/internal/ebpf/common/ringbuf.go +++ b/pkg/internal/ebpf/common/ringbuf.go @@ -37,10 +37,10 @@ type ringBufForwarder struct { spansLen int access sync.Mutex ticker *time.Ticker - reader func(*ringbuf.Record) (request.Span, bool, error) + reader func(*ringbuf.Record, ServiceFilter) (request.Span, bool, error) // filter the input spans, eliminating these from processes whose PID // belong to a process that does not match the discovery policies - filter func([]request.Span) []request.Span + filter ServiceFilter metrics imetrics.Reporter } @@ -67,7 +67,7 @@ func SharedRingbuf( rbf := ringBufForwarder{ cfg: cfg, logger: log, ringbuffer: ringbuffer, closers: nil, reader: ReadHTTPRequestTraceAsSpan, - filter: filter.Filter, metrics: metrics, + filter: filter, metrics: metrics, } singleRbf = &rbf return singleRbf.sharedReadAndForward @@ -77,7 +77,7 @@ func ForwardRingbuf( cfg *TracerConfig, ringbuffer *ebpf.Map, filter ServiceFilter, - reader func(*ringbuf.Record) (request.Span, bool, error), + reader func(*ringbuf.Record, ServiceFilter) (request.Span, bool, error), logger *slog.Logger, metrics imetrics.Reporter, closers ...io.Closer, @@ -85,7 +85,7 @@ func ForwardRingbuf( rbf := ringBufForwarder{ cfg: cfg, logger: logger, ringbuffer: ringbuffer, closers: closers, reader: reader, - filter: filter.Filter, metrics: metrics, + filter: filter, metrics: metrics, } return rbf.readAndForward } @@ -170,7 +170,7 @@ func (rbf *ringBufForwarder) alreadyForwarded(ctx context.Context, _ []io.Closer func (rbf *ringBufForwarder) processAndForward(record ringbuf.Record, spansChan chan<- []request.Span) { rbf.access.Lock() defer rbf.access.Unlock() - s, ignore, err := rbf.reader(&record) + s, ignore, err := rbf.reader(&record, rbf.filter) if err != nil { rbf.logger.Error("error parsing perf event", err) return @@ -197,7 +197,7 @@ func (rbf *ringBufForwarder) processAndForward(record ringbuf.Record, spansChan func (rbf *ringBufForwarder) flushEvents(spansChan chan<- []request.Span) { rbf.metrics.TracerFlush(rbf.spansLen) - spansChan <- rbf.filter(rbf.spans[:rbf.spansLen]) + spansChan <- rbf.filter.Filter(rbf.spans[:rbf.spansLen]) rbf.spans = make([]request.Span, rbf.cfg.BatchLength) rbf.spansLen = 0 } diff --git a/pkg/internal/ebpf/common/ringbuf_test.go b/pkg/internal/ebpf/common/ringbuf_test.go index f9a1cad8e..c65a80442 100644 --- a/pkg/internal/ebpf/common/ringbuf_test.go +++ b/pkg/internal/ebpf/common/ringbuf_test.go @@ -227,6 +227,10 @@ func (pf *TestPidsFilter) BlockPID(p uint32, _ uint32) { delete(pf.services, p) } +func (pf *TestPidsFilter) ValidPID(_ uint32, _ uint32, _ PIDType) bool { + return true +} + func (pf *TestPidsFilter) CurrentPIDs(_ PIDType) map[uint32]map[uint32]svc.ID { return nil } diff --git a/pkg/internal/ebpf/common/tcp_detect_transform.go b/pkg/internal/ebpf/common/tcp_detect_transform.go index e418521f1..d330fa4b8 100644 --- a/pkg/internal/ebpf/common/tcp_detect_transform.go +++ b/pkg/internal/ebpf/common/tcp_detect_transform.go @@ -14,7 +14,7 @@ import ( "github.com/grafana/beyla/pkg/internal/sqlprune" ) -func ReadTCPRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, error) { +func ReadTCPRequestIntoSpan(record *ringbuf.Record, filter ServiceFilter) (request.Span, bool, error) { var event TCPRequestInfo err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event) @@ -22,6 +22,10 @@ func ReadTCPRequestIntoSpan(record *ringbuf.Record) (request.Span, bool, error) return request.Span{}, true, err } + if !filter.ValidPID(event.Pid.UserPid, event.Pid.Ns, PIDTypeKProbes) { + return request.Span{}, true, nil + } + b := event.Buf[:] l := int(event.Len) diff --git a/pkg/internal/ebpf/common/tcp_detect_transform_test.go b/pkg/internal/ebpf/common/tcp_detect_transform_test.go index 5a45eb0a6..4f3408ce6 100644 --- a/pkg/internal/ebpf/common/tcp_detect_transform_test.go +++ b/pkg/internal/ebpf/common/tcp_detect_transform_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/beyla/pkg/internal/request" + "github.com/grafana/beyla/pkg/internal/svc" ) const ( @@ -71,6 +72,8 @@ func TestSQLDetectionFails(t *testing.T) { // Test making sure that issue https://github.com/grafana/beyla/issues/854 is fixed func TestReadTCPRequestIntoSpan_Overflow(t *testing.T) { + fltr := TestPidsFilter{services: map[uint32]svc.ID{}} + tri := TCPRequestInfo{ Len: 340, // this byte array contains select * from foo @@ -96,7 +99,7 @@ func TestReadTCPRequestIntoSpan_Overflow(t *testing.T) { } binaryRecord := bytes.Buffer{} require.NoError(t, binary.Write(&binaryRecord, binary.LittleEndian, tri)) - span, ignore, err := ReadTCPRequestIntoSpan(&ringbuf.Record{RawSample: binaryRecord.Bytes()}) + span, ignore, err := ReadTCPRequestIntoSpan(&ringbuf.Record{RawSample: binaryRecord.Bytes()}, &fltr) require.NoError(t, err) require.False(t, ignore) diff --git a/pkg/internal/ebpf/watcher/watcher.go b/pkg/internal/ebpf/watcher/watcher.go index ee7d1fdfd..111bd7313 100644 --- a/pkg/internal/ebpf/watcher/watcher.go +++ b/pkg/internal/ebpf/watcher/watcher.go @@ -94,7 +94,7 @@ func (p *Watcher) Run(ctx context.Context) { )(ctx, nil) } -func (p *Watcher) processWatchEvent(record *ringbuf.Record) (request.Span, bool, error) { +func (p *Watcher) processWatchEvent(record *ringbuf.Record, _ ebpfcommon.ServiceFilter) (request.Span, bool, error) { var flags uint64 var event BPFWatchInfo From 1f1f2c2d226c7c8e1abc56200cbd6683186fe8d7 Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Mon, 3 Jun 2024 19:30:13 -0400 Subject: [PATCH 4/5] refactor --- .../ebpf/common/http2grpc_transform.go | 19 +++ .../ebpf/common/kafka_detect_transform.go | 36 +++++ .../ebpf/common/sql_detect_transform.go | 79 +++++++++++ .../ebpf/common/tcp_detect_transform.go | 127 +----------------- 4 files changed, 135 insertions(+), 126 deletions(-) create mode 100644 pkg/internal/ebpf/common/sql_detect_transform.go diff --git a/pkg/internal/ebpf/common/http2grpc_transform.go b/pkg/internal/ebpf/common/http2grpc_transform.go index a567131b9..237614ee1 100644 --- a/pkg/internal/ebpf/common/http2grpc_transform.go +++ b/pkg/internal/ebpf/common/http2grpc_transform.go @@ -301,3 +301,22 @@ func ReadHTTP2InfoIntoSpan(record *ringbuf.Record, filter ServiceFilter) (reques return request.Span{}, true, nil // ignore if we couldn't parse it } + +func isHTTP2(data []uint8, event *TCPRequestInfo) bool { + framer := byteFramer(data) + + for { + f, err := framer.ReadFrame() + + if err != nil { + break + } + + if ff, ok := f.(*http2.HeadersFrame); ok { + method, path, _ := readMetaFrame((*BPFConnInfo)(&event.ConnInfo), framer, ff) + return method != "" || path != "" + } + } + + return false +} diff --git a/pkg/internal/ebpf/common/kafka_detect_transform.go b/pkg/internal/ebpf/common/kafka_detect_transform.go index 956398acb..154bc880a 100644 --- a/pkg/internal/ebpf/common/kafka_detect_transform.go +++ b/pkg/internal/ebpf/common/kafka_detect_transform.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "errors" + trace2 "go.opentelemetry.io/otel/trace" + "github.com/grafana/beyla/pkg/internal/request" ) @@ -223,3 +225,37 @@ func getTopicOffsetFromFetchOperation(header *Header) int { return offset } + +func TCPToKafkaToSpan(trace *TCPRequestInfo, data *KafkaInfo) request.Span { + peer := "" + hostname := "" + hostPort := 0 + + if trace.ConnInfo.S_port != 0 || trace.ConnInfo.D_port != 0 { + peer, hostname = trace.reqHostInfo() + hostPort = int(trace.ConnInfo.D_port) + } + return request.Span{ + Type: request.EventTypeKafkaClient, + Method: data.Operation.String(), + OtherNamespace: data.ClientID, + Path: data.Topic, + Peer: peer, + Host: hostname, + HostPort: hostPort, + ContentLength: 0, + RequestStart: int64(trace.StartMonotimeNs), + Start: int64(trace.StartMonotimeNs), + End: int64(trace.EndMonotimeNs), + Status: 0, + TraceID: trace2.TraceID(trace.Tp.TraceId), + SpanID: trace2.SpanID(trace.Tp.SpanId), + ParentSpanID: trace2.SpanID(trace.Tp.ParentId), + Flags: trace.Tp.Flags, + Pid: request.PidInfo{ + HostPID: trace.Pid.HostPid, + UserPID: trace.Pid.UserPid, + Namespace: trace.Pid.Ns, + }, + } +} diff --git a/pkg/internal/ebpf/common/sql_detect_transform.go b/pkg/internal/ebpf/common/sql_detect_transform.go new file mode 100644 index 000000000..b6b418df6 --- /dev/null +++ b/pkg/internal/ebpf/common/sql_detect_transform.go @@ -0,0 +1,79 @@ +package ebpfcommon + +import ( + "strings" + + trace2 "go.opentelemetry.io/otel/trace" + + "github.com/grafana/beyla/pkg/internal/request" + "github.com/grafana/beyla/pkg/internal/sqlprune" +) + +func validSQL(op, table string) bool { + return op != "" && table != "" +} + +// when the input string is invalid unicode (might happen with the ringbuffer +// data), strings.ToUpper might return a string larger than the input string, +// and might cause some later out of bound errors. +func asciiToUpper(input string) string { + out := make([]byte, len(input)) + for i := range input { + if input[i] >= 'a' && input[i] <= 'z' { + out[i] = input[i] - byte('a') + byte('A') + } else { + out[i] = input[i] + } + } + return string(out) +} + +func detectSQL(buf string) (string, string, string) { + b := asciiToUpper(buf) + for _, q := range []string{"SELECT", "UPDATE", "DELETE", "INSERT", "ALTER", "CREATE", "DROP"} { + i := strings.Index(b, q) + if i >= 0 { + sql := cstr([]uint8(b[i:])) + + op, table := sqlprune.SQLParseOperationAndTable(sql) + return op, table, sql + } + } + + return "", "", "" +} + +func TCPToSQLToSpan(trace *TCPRequestInfo, op, table, sql string) request.Span { + peer := "" + hostname := "" + hostPort := 0 + + if trace.ConnInfo.S_port != 0 || trace.ConnInfo.D_port != 0 { + peer, hostname = trace.reqHostInfo() + hostPort = int(trace.ConnInfo.D_port) + } + + return request.Span{ + Type: request.EventTypeSQLClient, + Method: op, + Path: table, + Peer: peer, + Host: hostname, + HostPort: hostPort, + ContentLength: 0, + RequestStart: int64(trace.StartMonotimeNs), + Start: int64(trace.StartMonotimeNs), + End: int64(trace.EndMonotimeNs), + Status: 0, + TraceID: trace2.TraceID(trace.Tp.TraceId), + SpanID: trace2.SpanID(trace.Tp.SpanId), + ParentSpanID: trace2.SpanID(trace.Tp.ParentId), + Flags: trace.Tp.Flags, + Pid: request.PidInfo{ + HostPID: trace.Pid.HostPid, + UserPID: trace.Pid.UserPid, + Namespace: trace.Pid.Ns, + }, + Statement: sql, + } +} diff --git a/pkg/internal/ebpf/common/tcp_detect_transform.go b/pkg/internal/ebpf/common/tcp_detect_transform.go index d330fa4b8..e4fdfd73a 100644 --- a/pkg/internal/ebpf/common/tcp_detect_transform.go +++ b/pkg/internal/ebpf/common/tcp_detect_transform.go @@ -4,16 +4,13 @@ import ( "bytes" "encoding/binary" "net" - "strings" "github.com/cilium/ebpf/ringbuf" - trace2 "go.opentelemetry.io/otel/trace" - "golang.org/x/net/http2" "github.com/grafana/beyla/pkg/internal/request" - "github.com/grafana/beyla/pkg/internal/sqlprune" ) +// nolint:cyclop func ReadTCPRequestIntoSpan(record *ringbuf.Record, filter ServiceFilter) (request.Span, bool, error) { var event TCPRequestInfo @@ -63,40 +60,6 @@ func ReadTCPRequestIntoSpan(record *ringbuf.Record, filter ServiceFilter) (reque return request.Span{}, true, nil // ignore if we couldn't parse it } -func validSQL(op, table string) bool { - return op != "" && table != "" -} - -func detectSQL(buf string) (string, string, string) { - b := asciiToUpper(buf) - for _, q := range []string{"SELECT", "UPDATE", "DELETE", "INSERT", "ALTER", "CREATE", "DROP"} { - i := strings.Index(b, q) - if i >= 0 { - sql := cstr([]uint8(b[i:])) - - op, table := sqlprune.SQLParseOperationAndTable(sql) - return op, table, sql - } - } - - return "", "", "" -} - -// when the input string is invalid unicode (might happen with the ringbuffer -// data), strings.ToUpper might return a string larger than the input string, -// and might cause some later out of bound errors. -func asciiToUpper(input string) string { - out := make([]byte, len(input)) - for i := range input { - if input[i] >= 'a' && input[i] <= 'z' { - out[i] = input[i] - byte('a') + byte('A') - } else { - out[i] = input[i] - } - } - return string(out) -} - func (trace *TCPRequestInfo) reqHostInfo() (source, target string) { src := make(net.IP, net.IPv6len) dst := make(net.IP, net.IPv6len) @@ -105,91 +68,3 @@ func (trace *TCPRequestInfo) reqHostInfo() (source, target string) { return src.String(), dst.String() } - -func TCPToSQLToSpan(trace *TCPRequestInfo, op, table, sql string) request.Span { - peer := "" - hostname := "" - hostPort := 0 - - if trace.ConnInfo.S_port != 0 || trace.ConnInfo.D_port != 0 { - peer, hostname = trace.reqHostInfo() - hostPort = int(trace.ConnInfo.D_port) - } - - return request.Span{ - Type: request.EventTypeSQLClient, - Method: op, - Path: table, - Peer: peer, - Host: hostname, - HostPort: hostPort, - ContentLength: 0, - RequestStart: int64(trace.StartMonotimeNs), - Start: int64(trace.StartMonotimeNs), - End: int64(trace.EndMonotimeNs), - Status: 0, - TraceID: trace2.TraceID(trace.Tp.TraceId), - SpanID: trace2.SpanID(trace.Tp.SpanId), - ParentSpanID: trace2.SpanID(trace.Tp.ParentId), - Flags: trace.Tp.Flags, - Pid: request.PidInfo{ - HostPID: trace.Pid.HostPid, - UserPID: trace.Pid.UserPid, - Namespace: trace.Pid.Ns, - }, - Statement: sql, - } -} - -func isHTTP2(data []uint8, event *TCPRequestInfo) bool { - framer := byteFramer(data) - - for { - f, err := framer.ReadFrame() - - if err != nil { - break - } - - if ff, ok := f.(*http2.HeadersFrame); ok { - method, path, _ := readMetaFrame((*BPFConnInfo)(&event.ConnInfo), framer, ff) - return method != "" || path != "" - } - } - - return false -} - -func TCPToKafkaToSpan(trace *TCPRequestInfo, data *KafkaInfo) request.Span { - peer := "" - hostname := "" - hostPort := 0 - - if trace.ConnInfo.S_port != 0 || trace.ConnInfo.D_port != 0 { - peer, hostname = trace.reqHostInfo() - hostPort = int(trace.ConnInfo.D_port) - } - return request.Span{ - Type: request.EventTypeKafkaClient, - Method: data.Operation.String(), - OtherNamespace: data.ClientID, - Path: data.Topic, - Peer: peer, - Host: hostname, - HostPort: hostPort, - ContentLength: 0, - RequestStart: int64(trace.StartMonotimeNs), - Start: int64(trace.StartMonotimeNs), - End: int64(trace.EndMonotimeNs), - Status: 0, - TraceID: trace2.TraceID(trace.Tp.TraceId), - SpanID: trace2.SpanID(trace.Tp.SpanId), - ParentSpanID: trace2.SpanID(trace.Tp.ParentId), - Flags: trace.Tp.Flags, - Pid: request.PidInfo{ - HostPID: trace.Pid.HostPid, - UserPID: trace.Pid.UserPid, - Namespace: trace.Pid.Ns, - }, - } -} From 0099d44daad4dcb1d192807a7add42335aa6cb49 Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Tue, 4 Jun 2024 15:15:47 -0400 Subject: [PATCH 5/5] Fix issues found during testing --- pkg/internal/ebpf/common/pids.go | 2 +- pkg/internal/ebpf/common/sql_detect_transform.go | 2 +- pkg/internal/ebpf/common/tcp_detect_transform_test.go | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/internal/ebpf/common/pids.go b/pkg/internal/ebpf/common/pids.go index 913c8c6fd..03aebe52a 100644 --- a/pkg/internal/ebpf/common/pids.go +++ b/pkg/internal/ebpf/common/pids.go @@ -191,7 +191,7 @@ func (pf *IdentityPidsFilter) AllowPID(_ uint32, _ uint32, _ svc.ID, _ PIDType) func (pf *IdentityPidsFilter) BlockPID(_ uint32, _ uint32) {} func (pf *IdentityPidsFilter) ValidPID(_ uint32, _ uint32, _ PIDType) bool { - return false + return true } func (pf *IdentityPidsFilter) CurrentPIDs(_ PIDType) map[uint32]map[uint32]svc.ID { diff --git a/pkg/internal/ebpf/common/sql_detect_transform.go b/pkg/internal/ebpf/common/sql_detect_transform.go index b6b418df6..0b466dca9 100644 --- a/pkg/internal/ebpf/common/sql_detect_transform.go +++ b/pkg/internal/ebpf/common/sql_detect_transform.go @@ -33,7 +33,7 @@ func detectSQL(buf string) (string, string, string) { for _, q := range []string{"SELECT", "UPDATE", "DELETE", "INSERT", "ALTER", "CREATE", "DROP"} { i := strings.Index(b, q) if i >= 0 { - sql := cstr([]uint8(b[i:])) + sql := cstr([]uint8(buf[i:])) op, table := sqlprune.SQLParseOperationAndTable(sql) return op, table, sql diff --git a/pkg/internal/ebpf/common/tcp_detect_transform_test.go b/pkg/internal/ebpf/common/tcp_detect_transform_test.go index 4f3408ce6..fb3513dce 100644 --- a/pkg/internal/ebpf/common/tcp_detect_transform_test.go +++ b/pkg/internal/ebpf/common/tcp_detect_transform_test.go @@ -26,16 +26,16 @@ func TestTCPReqSQLParsing(t *testing.T) { r := makeTCPReq(sql, tcpSend, 343534, 8080, 2000) op, table, sql := detectSQL(sql) assert.Equal(t, op, "SELECT") - assert.Equal(t, table, "ACCOUNTS") + assert.Equal(t, table, "accounts") s := TCPToSQLToSpan(&r, op, table, sql) assert.NotNil(t, s) assert.NotEmpty(t, s.Host) assert.NotEmpty(t, s.Peer) assert.Equal(t, s.HostPort, 8080) assert.Greater(t, s.End, s.Start) - assert.True(t, strings.Contains(s.Statement, "SELECT * FROM ACCOUNTS ")) + assert.True(t, strings.Contains(s.Statement, "SELECT * FROM accounts ")) assert.Equal(t, "SELECT", s.Method) - assert.Equal(t, "ACCOUNTS", s.Path) + assert.Equal(t, "accounts", s.Path) assert.Equal(t, request.EventTypeSQLClient, s.Type) } @@ -105,7 +105,7 @@ func TestReadTCPRequestIntoSpan_Overflow(t *testing.T) { assert.Equal(t, request.EventTypeSQLClient, span.Type) assert.Equal(t, "SELECT", span.Method) - assert.Equal(t, "FOO", span.Path) + assert.Equal(t, "foo", span.Path) } func TestRedisDetection(t *testing.T) {