Skip to content

Commit

Permalink
Fix concurrent access crash in PIDsFilter (#479)
Browse files Browse the repository at this point in the history
* Fix concurrent access crash in PIDsFilter

* Fix currentPID usage
  • Loading branch information
mariomac authored Nov 30, 2023
1 parent be81a91 commit 26e3210
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 17 deletions.
57 changes: 48 additions & 9 deletions pkg/internal/ebpf/common/pids.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ebpfcommon

import (
"log/slog"
"sync"

"github.com/grafana/beyla/pkg/internal/request"
)
Expand All @@ -19,13 +20,25 @@ var readNamespacePIDs = func(pid int32) ([]uint32, error) {
return FindNamespacedPids(pid)
}

// NamespacedPID is a pair of coordinates to identify a Process ID.
// PIDsFilter.CurrentPIDs produces one value per (namespace, PID) entry of its
// internal map of allowed PIDs.
type NamespacedPID struct {
// PIDNamespace is the identifier of the PID namespace the process belongs to.
PIDNamespace uint32
// PID is the process ID registered under PIDNamespace.
PID uint32
}

// PIDsFilter keeps a thread-safe copy of the PIDs whose traces are allowed to
// be forwarded. Its Filter method filters the request.Span instances whose
// PIDs are not in the allowed list.
//
// PID changes are not applied immediately: they are pushed as events into
// queue and later drained by updatePIDs, which mutates current while holding
// currentLock.
type PIDsFilter struct {
log *slog.Logger
// log receives error messages, e.g. when updatePIDs finds an unsupported
// PID operation in the queue.
log *slog.Logger
// current namespaces and their PIDs: the outer key is the PID namespace and
// the inner set holds the PIDs allowed in that namespace.
current map[uint32]map[uint32]struct{}
queue chan pidEvent
// currentLock provides concurrent R/W access to current
currentLock *sync.RWMutex
// currentSnapshot keeps an updated copy of the PID coordinates of current map, used to
// concurrently share the information outside this PIDsFilter with thread safety.
// updatePIDs resets it to nil whenever a PID is added or removed, and
// CurrentPIDs rebuilds it from current.
currentSnapshot []NamespacedPID
// queue buffers the pending PID events (for example, BlockPID enqueues a DEL
// event) until updatePIDs drains them.
queue chan pidEvent
}

// PIDEventOp identifies the kind of PID update to apply; presumably it is the
// type of pidEvent's op field (TODO confirm against the pidEvent definition).
// updatePIDs handles the ADD and DEL operations and logs any other value as
// unsupported.
type PIDEventOp uint8
Expand All @@ -42,9 +55,10 @@ type pidEvent struct {

// NewPIDsFilter returns a PIDsFilter that stores the provided logger. The
// filter starts with an empty map of allowed PIDs, a fresh RWMutex to guard
// it, and an events queue buffered to updatesBufLen entries. The snapshot of
// current PIDs is left unset, so it is built on the first CurrentPIDs call.
func NewPIDsFilter(log *slog.Logger) *PIDsFilter {
return &PIDsFilter{
log: log,
current: map[uint32]map[uint32]struct{}{},
queue: make(chan pidEvent, updatesBufLen),
log: log,
currentLock: &sync.RWMutex{},
current: map[uint32]map[uint32]struct{}{},
queue: make(chan pidEvent, updatesBufLen),
}
}

Expand All @@ -56,16 +70,33 @@ func (pf *PIDsFilter) BlockPID(pid uint32) {
pf.queue <- pidEvent{pid: pid, op: DEL}
}

func (pf *PIDsFilter) CurrentPIDs() map[uint32]map[uint32]struct{} {
func (pf *PIDsFilter) CurrentPIDs() []NamespacedPID {
pf.updatePIDs()
return pf.current
// first look if there is an updated snapshot of the
// current PIDs, and build it if it does not exist
// or it has been invalidated
snapshot := pf.currentSnapshot
if len(snapshot) == 0 {
pf.currentLock.RLock()
defer pf.currentLock.RUnlock()
snapshot = make([]NamespacedPID, 0, len(pf.current))
for ns, pids := range pf.current {
for pid := range pids {
snapshot = append(snapshot, NamespacedPID{PIDNamespace: ns, PID: pid})
}
}
pf.currentSnapshot = snapshot
}
return snapshot
}

func (pf *PIDsFilter) Filter(inputSpans []request.Span) []request.Span {
// todo: adaptive presizing as a function of the historical percentage
// of filtered spans
outputSpans := make([]request.Span, 0, len(inputSpans))
pf.updatePIDs()
pf.currentLock.RLock()
defer pf.currentLock.RUnlock()
for i := range inputSpans {
span := &inputSpans[i]

Expand Down Expand Up @@ -95,16 +126,22 @@ func (pf *PIDsFilter) Filter(inputSpans []request.Span) []request.Span {

// update added/deleted PIDs, if any
func (pf *PIDsFilter) updatePIDs() {
pf.currentLock.Lock()
defer pf.currentLock.Unlock()
for {
select {
case e := <-pf.queue:
switch e.op {
case ADD:
// invalidate current PIDs snapshot
pf.currentSnapshot = nil
pf.addPID(e.pid)
case DEL:
// invalidate current PIDs snapshot
pf.currentSnapshot = nil
pf.removePID(e.pid)
default:
pf.log.Error("Unsupported PID operation")
pf.log.Error("Unsupported PID operation", "op", e)
}
default:
// no more updates
Expand All @@ -113,6 +150,7 @@ func (pf *PIDsFilter) updatePIDs() {
}
}

// addPID is not thread-safe. Its invoker must ensure the synchronization.
func (pf *PIDsFilter) addPID(pid uint32) {
nsid, err := readNamespace(int32(pid))

Expand All @@ -139,6 +177,7 @@ func (pf *PIDsFilter) addPID(pid uint32) {
}
}

// removePID is not thread-safe. Its invoker must ensure the synchronization.
func (pf *PIDsFilter) removePID(pid uint32) {
nsid, err := readNamespace(int32(pid))

Expand Down Expand Up @@ -166,7 +205,7 @@ func (pf *IdentityPidsFilter) AllowPID(_ uint32) {}

// BlockPID ignores the provided PID and does nothing.
func (pf *IdentityPidsFilter) BlockPID(_ uint32) {
	// intentionally empty
}

// CurrentPIDs always returns nil: this implementation reports no PIDs.
func (pf *IdentityPidsFilter) CurrentPIDs() map[uint32]map[uint32]struct{} {
func (pf *IdentityPidsFilter) CurrentPIDs() []NamespacedPID {
return nil
}

Expand Down
14 changes: 6 additions & 8 deletions pkg/internal/ebpf/httpfltr/httpfltr.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type HTTPInfo struct {
// pidsFilter groups the PID-related operations used through the tracer's
// pidsFilter field: the embedded ebpf2.PIDsAccounter methods, span filtering
// and retrieval of the current PIDs.
type pidsFilter interface {
ebpf2.PIDsAccounter
// Filter receives a slice of spans and returns a slice of spans.
// NOTE(review): presumably only the spans of allowed PIDs are returned —
// confirm against the implementations.
Filter(inputSpans []request.Span) []request.Span
CurrentPIDs() map[uint32]map[uint32]struct{}
// CurrentPIDs returns the namespace/PID pairs known to the filter; Run
// iterates over them to put each pair into the ValidPids BPF map.
CurrentPIDs() []ebpfcommon.NamespacedPID
}

type Tracer struct {
Expand Down Expand Up @@ -290,14 +290,12 @@ func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []request.Span, serv
// pids that are allowed into the bpf map
if p.bpfObjects.ValidPids != nil {
p.log.Debug("Reallowing pids")
for nsid, pids := range p.pidsFilter.CurrentPIDs() {
for pid := range pids {
p.log.Debug("Reallowing pid", "pid", pid, "namespace", nsid)
err := p.bpfObjects.ValidPids.Put(pid, nsid)
for _, np := range p.pidsFilter.CurrentPIDs() {
p.log.Debug("Reallowing pid", "pid", np.PID, "namespace", np.PIDNamespace)
err := p.bpfObjects.ValidPids.Put(np.PID, np.PIDNamespace)
if err != nil {
if err != nil {
if err != nil {
p.log.Error("Error setting up pid in BPF space", "pid", pid, "namespace", nsid, "error", err)
}
p.log.Error("Error setting up pid in BPF space", "pid", np.PID, "namespace", np.PIDNamespace, "error", err)
}
}
}
Expand Down

0 comments on commit 26e3210

Please sign in to comment.