Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for SQL detection and Go events handled by kprobes #901

Merged
merged 5 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/internal/discover/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ func monitorPIDs(tracer *ebpf.ProcessTracer, ie *Instrumentable) {
}

// allowing the tracer to forward traces from the discovered PID and its children processes
tracer.AllowPID(uint32(ie.FileInfo.Pid), ie.FileInfo.Service)
tracer.AllowPID(uint32(ie.FileInfo.Pid), ie.FileInfo.Ns, ie.FileInfo.Service)
for _, pid := range ie.ChildPids {
tracer.AllowPID(pid, ie.FileInfo.Service)
tracer.AllowPID(pid, ie.FileInfo.Ns, ie.FileInfo.Service)
}
}

Expand All @@ -205,7 +205,7 @@ func (ta *TraceAttacher) notifyProcessDeletion(ie *Instrumentable) {
// notifying the tracer to block any trace from that PID
// to avoid that a new process reusing this PID could send traces
// unless explicitly allowed
tracer.BlockPID(uint32(ie.FileInfo.Pid))
tracer.BlockPID(uint32(ie.FileInfo.Pid), ie.FileInfo.Ns)

// if there are no more trace instances for a Go program, we need to notify that
// the tracer needs to be stopped and deleted.
Expand Down
8 changes: 4 additions & 4 deletions pkg/internal/ebpf/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ var MisclassifiedEvents = make(chan MisclassifiedEvent)

func ptlog() *slog.Logger { return slog.With("component", "ebpf.ProcessTracer") }

func ReadHTTPRequestTraceAsSpan(record *ringbuf.Record) (request.Span, bool, error) {
func ReadHTTPRequestTraceAsSpan(record *ringbuf.Record, filter ServiceFilter) (request.Span, bool, error) {
var eventType uint8

// we read the type first, depending on the type we decide what kind of record we have
Expand All @@ -114,11 +114,11 @@ func ReadHTTPRequestTraceAsSpan(record *ringbuf.Record) (request.Span, bool, err
case EventTypeSQL:
return ReadSQLRequestTraceAsSpan(record)
case EventTypeKHTTP:
return ReadHTTPInfoIntoSpan(record)
return ReadHTTPInfoIntoSpan(record, filter)
case EventTypeKHTTP2:
return ReadHTTP2InfoIntoSpan(record)
return ReadHTTP2InfoIntoSpan(record, filter)
case EventTypeTCP:
return ReadTCPRequestIntoSpan(record)
return ReadTCPRequestIntoSpan(record, filter)
case EventTypeGoKafka:
return ReadGoKafkaRequestIntoSpan(record)
case EventTypeGoRedis:
Expand Down
25 changes: 24 additions & 1 deletion pkg/internal/ebpf/common/http2grpc_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,18 @@ func (event *BPFHTTP2Info) hostInfo() (source, target string) {
}

// nolint:cyclop
func ReadHTTP2InfoIntoSpan(record *ringbuf.Record) (request.Span, bool, error) {
func ReadHTTP2InfoIntoSpan(record *ringbuf.Record, filter ServiceFilter) (request.Span, bool, error) {
var event BPFHTTP2Info

err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event)
if err != nil {
return request.Span{}, true, err
}

if !filter.ValidPID(event.Pid.UserPid, event.Pid.Ns, PIDTypeKProbes) {
return request.Span{}, true, nil
}

framer := byteFramer(event.Data[:])
retFramer := byteFramer(event.RetData[:])
// We don't set the framer.ReadMetaHeaders function to hpack.NewDecoder because
Expand Down Expand Up @@ -297,3 +301,22 @@ func ReadHTTP2InfoIntoSpan(record *ringbuf.Record) (request.Span, bool, error) {

return request.Span{}, true, nil // ignore if we couldn't parse it
}

func isHTTP2(data []uint8, event *TCPRequestInfo) bool {
framer := byteFramer(data)

for {
f, err := framer.ReadFrame()

if err != nil {
break
}

if ff, ok := f.(*http2.HeadersFrame); ok {
method, path, _ := readMetaFrame((*BPFConnInfo)(&event.ConnInfo), framer, ff)
return method != "" || path != ""
}
}

return false
}
12 changes: 9 additions & 3 deletions pkg/internal/ebpf/common/httpfltr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func TestCstr(t *testing.T) {
}

func TestToRequestTrace(t *testing.T) {
fltr := TestPidsFilter{services: map[uint32]svc.ID{}}

var record BPFHTTPInfo
record.Type = 1
record.StartMonotimeNs = 123456
Expand All @@ -100,7 +102,7 @@ func TestToRequestTrace(t *testing.T) {
err := binary.Write(buf, binary.LittleEndian, &record)
assert.NoError(t, err)

result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()})
result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}, &fltr)
assert.NoError(t, err)

expected := request.Span{
Expand All @@ -120,6 +122,8 @@ func TestToRequestTrace(t *testing.T) {
}

func TestToRequestTraceNoConnection(t *testing.T) {
fltr := TestPidsFilter{services: map[uint32]svc.ID{}}

var record BPFHTTPInfo
record.Type = 1
record.StartMonotimeNs = 123456
Expand All @@ -133,7 +137,7 @@ func TestToRequestTraceNoConnection(t *testing.T) {
err := binary.Write(buf, binary.LittleEndian, &record)
assert.NoError(t, err)

result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()})
result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}, &fltr)
assert.NoError(t, err)

// change the expected port just before testing
Expand All @@ -154,6 +158,8 @@ func TestToRequestTraceNoConnection(t *testing.T) {
}

func TestToRequestTrace_BadHost(t *testing.T) {
fltr := TestPidsFilter{services: map[uint32]svc.ID{}}

var record BPFHTTPInfo
record.Type = 1
record.StartMonotimeNs = 123456
Expand All @@ -169,7 +175,7 @@ func TestToRequestTrace_BadHost(t *testing.T) {
err := binary.Write(buf, binary.LittleEndian, &record)
assert.NoError(t, err)

result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()})
result, _, err := ReadHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}, &fltr)
assert.NoError(t, err)

expected := request.Span{
Expand Down
7 changes: 6 additions & 1 deletion pkg/internal/ebpf/common/httpfltr_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,18 @@ type HTTPInfo struct {
Service svc.ID
}

func ReadHTTPInfoIntoSpan(record *ringbuf.Record) (request.Span, bool, error) {
func ReadHTTPInfoIntoSpan(record *ringbuf.Record, filter ServiceFilter) (request.Span, bool, error) {
var event BPFHTTPInfo
err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event)
if err != nil {
return request.Span{}, true, err
}

// Generated by Go instrumentation
if !filter.ValidPID(event.Pid.UserPid, event.Pid.Ns, PIDTypeKProbes) {
return request.Span{}, true, nil
}

return HTTPInfoEventToSpan(event)
}

Expand Down
36 changes: 36 additions & 0 deletions pkg/internal/ebpf/common/kafka_detect_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/binary"
"errors"

trace2 "go.opentelemetry.io/otel/trace"

"github.com/grafana/beyla/pkg/internal/request"
)

Expand Down Expand Up @@ -223,3 +225,37 @@ func getTopicOffsetFromFetchOperation(header *Header) int {

return offset
}

func TCPToKafkaToSpan(trace *TCPRequestInfo, data *KafkaInfo) request.Span {
peer := ""
hostname := ""
hostPort := 0

if trace.ConnInfo.S_port != 0 || trace.ConnInfo.D_port != 0 {
peer, hostname = trace.reqHostInfo()
hostPort = int(trace.ConnInfo.D_port)
}
return request.Span{
Type: request.EventTypeKafkaClient,
Method: data.Operation.String(),
OtherNamespace: data.ClientID,
Path: data.Topic,
Peer: peer,
Host: hostname,
HostPort: hostPort,
ContentLength: 0,
RequestStart: int64(trace.StartMonotimeNs),
Start: int64(trace.StartMonotimeNs),
End: int64(trace.EndMonotimeNs),
Status: 0,
TraceID: trace2.TraceID(trace.Tp.TraceId),
SpanID: trace2.SpanID(trace.Tp.SpanId),
ParentSpanID: trace2.SpanID(trace.Tp.ParentId),
Flags: trace.Tp.Flags,
Pid: request.PidInfo{
HostPID: trace.Pid.HostPid,
UserPID: trace.Pid.UserPid,
Namespace: trace.Pid.Ns,
},
}
}
61 changes: 30 additions & 31 deletions pkg/internal/ebpf/common/pids.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ var activePids, _ = lru.New[uint32, svc.ID](1024)
// injectable functions (can be replaced in tests). It reads the
// current process namespace from the /proc filesystem. It is required to
// choose to filter traces using whether the User-space or Host-space PIDs
var readNamespace = FindNamespace

var readNamespacePIDs = FindNamespacedPids
var readNamespacePIDs = exec.FindNamespacedPids

type PIDInfo struct {
service svc.ID
pidType PIDType
}

type ServiceFilter interface {
AllowPID(uint32, svc.ID, PIDType)
BlockPID(uint32)
AllowPID(uint32, uint32, svc.ID, PIDType)
BlockPID(uint32, uint32)
ValidPID(uint32, uint32, PIDType) bool
Filter(inputSpans []request.Span) []request.Span
CurrentPIDs(PIDType) map[uint32]map[uint32]svc.ID
}
Expand Down Expand Up @@ -74,16 +73,30 @@ func CommonPIDsFilter(systemWide bool) ServiceFilter {
return commonPIDsFilter
}

func (pf *PIDsFilter) AllowPID(pid uint32, svc svc.ID, pidType PIDType) {
func (pf *PIDsFilter) AllowPID(pid, ns uint32, svc svc.ID, pidType PIDType) {
pf.mux.Lock()
defer pf.mux.Unlock()
pf.addPID(pid, svc, pidType)
pf.addPID(pid, ns, svc, pidType)
}

func (pf *PIDsFilter) BlockPID(pid uint32) {
func (pf *PIDsFilter) BlockPID(pid, ns uint32) {
pf.mux.Lock()
defer pf.mux.Unlock()
pf.removePID(pid)
pf.removePID(pid, ns)
}

func (pf *PIDsFilter) ValidPID(userPID, ns uint32, pidType PIDType) bool {
pf.mux.RLock()
defer pf.mux.RUnlock()

if ns, nsExists := pf.current[ns]; nsExists {
if info, pidExists := ns[userPID]; pidExists {
return info.pidType == pidType
}
}

return false

}

func (pf *PIDsFilter) CurrentPIDs(t PIDType) map[uint32]map[uint32]svc.ID {
Expand Down Expand Up @@ -138,14 +151,7 @@ func (pf *PIDsFilter) Filter(inputSpans []request.Span) []request.Span {
return outputSpans
}

func (pf *PIDsFilter) addPID(pid uint32, s svc.ID, t PIDType) {
nsid, err := readNamespace(int32(pid))

if err != nil {
pf.log.Error("Error looking up namespace for tracking PID", "pid", pid, "error", err)
return
}

func (pf *PIDsFilter) addPID(pid, nsid uint32, s svc.ID, t PIDType) {
ns, nsExists := pf.current[nsid]
if !nsExists {
ns = make(map[uint32]PIDInfo)
Expand All @@ -164,18 +170,7 @@ func (pf *PIDsFilter) addPID(pid uint32, s svc.ID, t PIDType) {
}
}

func (pf *PIDsFilter) removePID(pid uint32) {
nsid, err := readNamespace(int32(pid))

if err != nil {
// this will always happen on process removal, as /proc/<pid>/ns/pid won't be found
// the code is kept here as a placeholder for a future fix (e.g. using eBPF notifications
// to get both the PID and the nsid)
// TODO: fix
pf.log.Debug("Error looking up namespace for removing PID", "pid", pid, "error", err)
return
}

func (pf *PIDsFilter) removePID(pid, nsid uint32) {
ns, nsExists := pf.current[nsid]
if !nsExists {
return
Expand All @@ -191,9 +186,13 @@ func (pf *PIDsFilter) removePID(pid uint32) {
// for system-wide instrumenation
type IdentityPidsFilter struct{}

func (pf *IdentityPidsFilter) AllowPID(_ uint32, _ svc.ID, _ PIDType) {}
func (pf *IdentityPidsFilter) AllowPID(_ uint32, _ uint32, _ svc.ID, _ PIDType) {}

func (pf *IdentityPidsFilter) BlockPID(_ uint32) {}
func (pf *IdentityPidsFilter) BlockPID(_ uint32, _ uint32) {}

func (pf *IdentityPidsFilter) ValidPID(_ uint32, _ uint32, _ PIDType) bool {
return false
}

func (pf *IdentityPidsFilter) CurrentPIDs(_ PIDType) map[uint32]map[uint32]svc.ID {
return nil
Expand Down
Loading
Loading