Skip to content

Commit

Permalink
[EBPF-560] attacher: Fix TestSingleFile flaky test (DataDog#31751)
Browse files Browse the repository at this point in the history
  • Loading branch information
gjulianm authored Dec 5, 2024
1 parent fef1eb7 commit 81c75a5
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 12 deletions.
48 changes: 37 additions & 11 deletions pkg/ebpf/uprobes/attacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ func createTempTestFile(t *testing.T, name string) (string, utils.PathIdentifier

type SharedLibrarySuite struct {
suite.Suite
procMonitor ProcessMonitor
procMonitor *processMonitorProxy
}

func TestAttacherSharedLibrary(t *testing.T) {
Expand All @@ -839,16 +839,27 @@ func TestAttacherSharedLibrary(t *testing.T) {

tt.Run("netlink", func(ttt *testing.T) {
processMonitor := launchProcessMonitor(ttt, false)
suite.Run(ttt, &SharedLibrarySuite{procMonitor: processMonitor})

// Use a proxy so we can manually trigger events in case of misses
procmonObserver := newProcessMonitorProxy(processMonitor)
suite.Run(ttt, &SharedLibrarySuite{procMonitor: procmonObserver})
})

tt.Run("event stream", func(ttt *testing.T) {
processMonitor := launchProcessMonitor(ttt, true)
suite.Run(ttt, &SharedLibrarySuite{procMonitor: processMonitor})

// Use a proxy so we can manually trigger events in case of misses
procmonObserver := newProcessMonitorProxy(processMonitor)
suite.Run(ttt, &SharedLibrarySuite{procMonitor: procmonObserver})
})
})
}

func (s *SharedLibrarySuite) SetupTest() {
// Reset callbacks
s.procMonitor.Reset()
}

func (s *SharedLibrarySuite) TestSingleFile() {
t := s.T()
ebpfCfg := ddebpf.NewConfig()
Expand Down Expand Up @@ -900,16 +911,31 @@ func (s *SharedLibrarySuite) TestSingleFile() {
3, 10*time.Millisecond, 500*time.Millisecond, "did not catch process running, received calls %v", mockRegistry.Calls)

mockRegistry.AssertCalled(t, "Register", fooPath1, uint32(cmd.Process.Pid), mock.Anything, mock.Anything, mock.Anything)

mockRegistry.Calls = nil
require.NoError(t, cmd.Process.Kill())

require.Eventually(t, func() bool {
// Other processes might have finished and forced the Unregister call to the registry
return methodHasBeenCalledWithPredicate(mockRegistry, "Unregister", func(call mock.Call) bool {
return call.Arguments[0].(uint32) == uint32(cmd.Process.Pid)
})
}, time.Second*10, 200*time.Millisecond, "received calls %v", mockRegistry.Calls)
// The ideal path would be that the process monitor sends an exit event for
// the process as it's killed. However, sometimes these events are missed
// and the callbacks aren't called. Unlike the "Process launch" event, we
// cannot recreate the process exit, which would be the ideal solution to
// ensure we're testing the correct behavior (including any
// filters/callbacks on the process monitor). Instead, we manually trigger
// the exit event for the process using the processMonitorProxy, which
// should replicate the same codepath.
waitAndRetryIfFail(t,
func() {
require.NoError(t, cmd.Process.Kill())
},
func() bool {
return methodHasBeenCalledWithPredicate(mockRegistry, "Unregister", func(call mock.Call) bool {
return call.Arguments[0].(uint32) == uint32(cmd.Process.Pid)
})
},
func(testSuccess bool) {
if !testSuccess {
// If the test failed once, manually trigger the exit event
s.procMonitor.triggerExit(uint32(cmd.Process.Pid))
}
}, 2, 10*time.Millisecond, 500*time.Millisecond, "attacher did not correctly handle exit events received calls %v", mockRegistry.Calls)

mockRegistry.AssertCalled(t, "Unregister", uint32(cmd.Process.Pid))
}
Expand Down
58 changes: 57 additions & 1 deletion pkg/ebpf/uprobes/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux_bpf
//go:build linux_bpf && test

package uprobes

Expand All @@ -13,6 +13,7 @@ import (
"path/filepath"
"runtime"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -236,3 +237,58 @@ func waitAndRetryIfFail(t *testing.T, setupFunc func(), testFunc func() bool, re

require.Fail(t, "condition not met after %d retries", maxRetries, msgAndArgs)
}

// processMonitorProxy is a wrapper around a ProcessMonitor that stores the
// callbacks subscribed to it, and triggers them which allows manually
// triggering the callbacks for testing purposes.
type processMonitorProxy struct {
target ProcessMonitor
mutex sync.Mutex // performance is not a worry for this, so use a single mutex for simplicity
execCallbacks map[*func(uint32)]struct{}
exitCallbacks map[*func(uint32)]struct{}
}

// ensure it implements the ProcessMonitor interface
var _ ProcessMonitor = &processMonitorProxy{}

func newProcessMonitorProxy(target ProcessMonitor) *processMonitorProxy {
return &processMonitorProxy{
target: target,
execCallbacks: make(map[*func(uint32)]struct{}),
exitCallbacks: make(map[*func(uint32)]struct{}),
}
}

func (o *processMonitorProxy) SubscribeExec(cb func(uint32)) func() {
o.mutex.Lock()
defer o.mutex.Unlock()
o.execCallbacks[&cb] = struct{}{}

return o.target.SubscribeExec(cb)
}

func (o *processMonitorProxy) SubscribeExit(cb func(uint32)) func() {
o.mutex.Lock()
defer o.mutex.Unlock()
o.exitCallbacks[&cb] = struct{}{}

return o.target.SubscribeExit(cb)
}

func (o *processMonitorProxy) triggerExit(pid uint32) {
o.mutex.Lock()
defer o.mutex.Unlock()

for cb := range o.exitCallbacks {
(*cb)(pid)
}
}

// Reset resets the state of the processMonitorProxy, removing all callbacks.
func (o *processMonitorProxy) Reset() {
o.mutex.Lock()
defer o.mutex.Unlock()

o.execCallbacks = make(map[*func(uint32)]struct{})
o.exitCallbacks = make(map[*func(uint32)]struct{})
}

0 comments on commit 81c75a5

Please sign in to comment.