Skip to content

Commit

Permalink
usm: process monitor: reuse consumers.NewProcessConsumer (DataDog#31864)
Browse files Browse the repository at this point in the history
  • Loading branch information
guyarb authored Dec 10, 2024
1 parent e70c007 commit 5ad43ca
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 142 deletions.
7 changes: 1 addition & 6 deletions cmd/system-probe/modules/eventmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,9 @@ func createEventMonitorModule(_ *sysconfigtypes.Config, deps module.FactoryDepen

netconfig := netconfig.New()
if netconfig.EnableUSMEventStream {
procmonconsumer, err := createProcessMonitorConsumer(evm, netconfig)
if err != nil {
if err := createProcessMonitorConsumer(evm, netconfig); err != nil {
return nil, err
}
if procmonconsumer != nil {
evm.RegisterEventConsumer(procmonconsumer)
log.Info("USM process monitoring consumer initialized")
}
}

gpucfg := gpuconfig.New()
Expand Down
28 changes: 24 additions & 4 deletions cmd/system-probe/modules/eventmonitor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"github.com/DataDog/datadog-agent/cmd/system-probe/api/module"
"github.com/DataDog/datadog-agent/cmd/system-probe/config"
"github.com/DataDog/datadog-agent/pkg/eventmonitor"
"github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers"
netconfig "github.com/DataDog/datadog-agent/pkg/network/config"
usmconfig "github.com/DataDog/datadog-agent/pkg/network/usm/config"
usmstate "github.com/DataDog/datadog-agent/pkg/network/usm/state"
procmon "github.com/DataDog/datadog-agent/pkg/process/monitor"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
secconfig "github.com/DataDog/datadog-agent/pkg/security/config"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// EventMonitor - Event monitor Factory
Expand All @@ -28,10 +30,28 @@ var EventMonitor = module.Factory{
},
}

func createProcessMonitorConsumer(evm *eventmonitor.EventMonitor, config *netconfig.Config) (eventmonitor.EventConsumer, error) {
const (
eventMonitorID = "PROCESS_MONITOR"
eventMonitorChannelSize = 500
)

var (
eventTypes = []consumers.ProcessConsumerEventTypes{
consumers.ExecEventType,
consumers.ExitEventType,
}
)

func createProcessMonitorConsumer(evm *eventmonitor.EventMonitor, config *netconfig.Config) error {
if !usmconfig.IsUSMSupportedAndEnabled(config) || !usmconfig.NeedProcessMonitor(config) || usmstate.Get() != usmstate.Running {
return nil, nil
return nil
}

return procmon.NewProcessMonitorEventConsumer(evm)
consumer, err := consumers.NewProcessConsumer(eventMonitorID, eventMonitorChannelSize, eventTypes, evm)
if err != nil {
return err
}
monitor.InitializeEventConsumer(consumer)
log.Info("USM process monitoring consumer initialized")
return nil
}
4 changes: 2 additions & 2 deletions cmd/system-probe/modules/eventmonitor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ var EventMonitor = module.Factory{
Fn: createEventMonitorModule,
}

func createProcessMonitorConsumer(_ *eventmonitor.EventMonitor, _ *netconfig.Config) (eventmonitor.EventConsumer, error) {
return nil, nil
func createProcessMonitorConsumer(_ *eventmonitor.EventMonitor, _ *netconfig.Config) error {
return nil
}

func createGPUProcessEventConsumer(_ *eventmonitor.EventMonitor) error {
Expand Down
7 changes: 2 additions & 5 deletions pkg/ebpf/uprobes/attacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ import (
"github.com/DataDog/datadog-agent/pkg/ebpf/bytecode"
"github.com/DataDog/datadog-agent/pkg/ebpf/ebpftest"
"github.com/DataDog/datadog-agent/pkg/ebpf/prebuilt"
eventmonitortestutil "github.com/DataDog/datadog-agent/pkg/eventmonitor/testutil"
"github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers/testutil"
"github.com/DataDog/datadog-agent/pkg/network/go/bininspect"
"github.com/DataDog/datadog-agent/pkg/network/usm/sharedlibraries"
fileopener "github.com/DataDog/datadog-agent/pkg/network/usm/sharedlibraries/testutil"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
procmontestutil "github.com/DataDog/datadog-agent/pkg/process/monitor/testutil"
secutils "github.com/DataDog/datadog-agent/pkg/security/utils"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
)

Expand Down Expand Up @@ -800,8 +798,7 @@ func launchProcessMonitor(t *testing.T, useEventStream bool) *monitor.ProcessMon
t.Cleanup(pm.Stop)
require.NoError(t, pm.Initialize(useEventStream))
if useEventStream {
secutils.SetCachedHostname("test-hostname")
eventmonitortestutil.StartEventMonitor(t, procmontestutil.RegisterProcessMonitorEventConsumer)
monitor.InitializeEventConsumer(testutil.NewTestProcessConsumer(t))
}

return pm
Expand Down
8 changes: 3 additions & 5 deletions pkg/network/usm/monitor_tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/ebpf/ebpftest"
"github.com/DataDog/datadog-agent/pkg/ebpf/prebuilt"
eventmonitortestutil "github.com/DataDog/datadog-agent/pkg/eventmonitor/testutil"
consumerstestutil "github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers/testutil"
"github.com/DataDog/datadog-agent/pkg/network"
"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/network/protocols"
Expand All @@ -44,8 +44,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/network/usm/consts"
usmtestutil "github.com/DataDog/datadog-agent/pkg/network/usm/testutil"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
procmontestutil "github.com/DataDog/datadog-agent/pkg/process/monitor/testutil"
secutils "github.com/DataDog/datadog-agent/pkg/security/utils"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
globalutils "github.com/DataDog/datadog-agent/pkg/util/testutil"
dockerutils "github.com/DataDog/datadog-agent/pkg/util/testutil/docker"
)
Expand Down Expand Up @@ -870,8 +869,7 @@ func setupUSMTLSMonitor(t *testing.T, cfg *config.Config) *Monitor {
require.NoError(t, err)
require.NoError(t, usmMonitor.Start())
if cfg.EnableUSMEventStream && usmconfig.NeedProcessMonitor(cfg) {
secutils.SetCachedHostname("test-hostname")
eventmonitortestutil.StartEventMonitor(t, procmontestutil.RegisterProcessMonitorEventConsumer)
monitor.InitializeEventConsumer(consumerstestutil.NewTestProcessConsumer(t))
}
t.Cleanup(usmMonitor.Stop)
t.Cleanup(utils.ResetDebugger)
Expand Down
7 changes: 2 additions & 5 deletions pkg/network/usm/sharedlibraries/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ import (

"github.com/DataDog/datadog-agent/pkg/ebpf/ebpftest"
"github.com/DataDog/datadog-agent/pkg/ebpf/prebuilt"
eventmonitortestutil "github.com/DataDog/datadog-agent/pkg/eventmonitor/testutil"
"github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers/testutil"
usmconfig "github.com/DataDog/datadog-agent/pkg/network/usm/config"
fileopener "github.com/DataDog/datadog-agent/pkg/network/usm/sharedlibraries/testutil"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
procmontestutil "github.com/DataDog/datadog-agent/pkg/process/monitor/testutil"
secutils "github.com/DataDog/datadog-agent/pkg/security/utils"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand All @@ -42,8 +40,7 @@ func launchProcessMonitor(t *testing.T, useEventStream bool) {
t.Cleanup(pm.Stop)
require.NoError(t, pm.Initialize(useEventStream))
if useEventStream {
secutils.SetCachedHostname("test-hostname")
eventmonitortestutil.StartEventMonitor(t, procmontestutil.RegisterProcessMonitorEventConsumer)
monitor.InitializeEventConsumer(testutil.NewTestProcessConsumer(t))
}
}

Expand Down
79 changes: 11 additions & 68 deletions pkg/process/monitor/process_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (
"github.com/vishvananda/netlink"
"go.uber.org/atomic"

"github.com/DataDog/datadog-agent/pkg/eventmonitor"
"github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers"
"github.com/DataDog/datadog-agent/pkg/network/protocols/telemetry"
"github.com/DataDog/datadog-agent/pkg/runtime"
"github.com/DataDog/datadog-agent/pkg/security/secl/model"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand Down Expand Up @@ -490,76 +489,20 @@ func (pm *ProcessMonitor) Stop() {
pm.processExitCallbacksMutex.Unlock()
}

// Event defines the event used by the process monitor
type Event struct {
Type model.EventType
Pid uint32
}

// EventConsumer defines an event consumer to handle event monitor events in the
// process monitor
type EventConsumer struct{}

// NewProcessMonitorEventConsumer returns a new process monitor event consumer
func NewProcessMonitorEventConsumer(em *eventmonitor.EventMonitor) (*EventConsumer, error) {
consumer := &EventConsumer{}
err := em.AddEventConsumerHandler(consumer)
return consumer, err
}

// ChanSize returns the channel size used by this consumer
func (ec *EventConsumer) ChanSize() int {
return 500
}

// ID returns the ID of this consumer
func (ec *EventConsumer) ID() string {
return "PROCESS_MONITOR"
}

// Start the consumer
func (ec *EventConsumer) Start() error {
return nil
}

// Stop the consumer
func (ec *EventConsumer) Stop() {
}

// EventTypes returns the event types handled by this consumer
func (ec *EventConsumer) EventTypes() []model.EventType {
return []model.EventType{
model.ExecEventType,
model.ExitEventType,
}
}

// HandleEvent handles events received from the event monitor
func (ec *EventConsumer) HandleEvent(event any) {
sevent, ok := event.(*Event)
if !ok {
return
}

processMonitor.tel.events.Add(1)
switch sevent.Type {
case model.ExecEventType:
// InitializeEventConsumer initializes the event consumer with the event handling.
func InitializeEventConsumer(consumer *consumers.ProcessConsumer) {
consumer.SubscribeExec(func(pid uint32) {
processMonitor.tel.events.Add(1)
processMonitor.tel.exec.Add(1)
if processMonitor.hasExecCallbacks.Load() {
processMonitor.handleProcessExec(sevent.Pid)
processMonitor.handleProcessExec(pid)
}
case model.ExitEventType:
})
consumer.SubscribeExit(func(pid uint32) {
processMonitor.tel.events.Add(1)
processMonitor.tel.exit.Add(1)
if processMonitor.hasExitCallbacks.Load() {
processMonitor.handleProcessExit(sevent.Pid)
processMonitor.handleProcessExit(pid)
}
}
}

// Copy should copy the given event or return nil to discard it
func (ec *EventConsumer) Copy(event *model.Event) any {
return &Event{
Type: event.GetEventType(),
Pid: event.GetProcessPid(),
}
})
}
30 changes: 10 additions & 20 deletions pkg/process/monitor/process_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ import (
"github.com/vishvananda/netns"
"go.uber.org/atomic"

"github.com/DataDog/datadog-agent/pkg/eventmonitor"
eventmonitortestutil "github.com/DataDog/datadog-agent/pkg/eventmonitor/testutil"
"github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers/testutil"
"github.com/DataDog/datadog-agent/pkg/network/protocols/telemetry"
"github.com/DataDog/datadog-agent/pkg/security/utils"
"github.com/DataDog/datadog-agent/pkg/util"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

func getProcessMonitor(t *testing.T) *ProcessMonitor {
Expand All @@ -40,12 +37,12 @@ func getProcessMonitor(t *testing.T) *ProcessMonitor {
func waitForProcessMonitor(t *testing.T, pm *ProcessMonitor) {
execCounter := atomic.NewInt32(0)
execCallback := func(_ uint32) { execCounter.Inc() }
registerCallback(t, pm, true, (*ProcessCallback)(&execCallback))
registerCallback(t, pm, true, &execCallback)

exitCounter := atomic.NewInt32(0)
// Sanity subscribing a callback.
exitCallback := func(_ uint32) { exitCounter.Inc() }
registerCallback(t, pm, false, (*ProcessCallback)(&exitCallback))
registerCallback(t, pm, false, &exitCallback)

require.Eventually(t, func() bool {
_ = exec.Command("/bin/echo").Run()
Expand All @@ -56,14 +53,7 @@ func waitForProcessMonitor(t *testing.T, pm *ProcessMonitor) {
func initializePM(t *testing.T, pm *ProcessMonitor, useEventStream bool) {
require.NoError(t, pm.Initialize(useEventStream))
if useEventStream {
utils.SetCachedHostname("test-hostname")
eventmonitortestutil.StartEventMonitor(t, func(t *testing.T, evm *eventmonitor.EventMonitor) {
// Can't use the implementation in procmontestutil due to import cycles
procmonconsumer, err := NewProcessMonitorEventConsumer(evm)
require.NoError(t, err)
evm.RegisterEventConsumer(procmonconsumer)
log.Info("process monitoring test consumer initialized")
})
InitializeEventConsumer(testutil.NewTestProcessConsumer(t))
}
waitForProcessMonitor(t, pm)
}
Expand Down Expand Up @@ -113,7 +103,7 @@ func (s *processMonitorSuite) TestProcessMonitorSanity() {
defer execsMutex.Unlock()
execs[pid] = struct{}{}
}
registerCallback(t, pm, true, (*ProcessCallback)(&callback))
registerCallback(t, pm, true, &callback)

exitMutex := sync.RWMutex{}
exits := make(map[uint32]struct{})
Expand All @@ -122,7 +112,7 @@ func (s *processMonitorSuite) TestProcessMonitorSanity() {
defer exitMutex.Unlock()
exits[pid] = struct{}{}
}
registerCallback(t, pm, false, (*ProcessCallback)(&exitCallback))
registerCallback(t, pm, false, &exitCallback)

initializePM(t, pm, s.useEventStream)
cmd := exec.Command(testBinaryPath, "test")
Expand Down Expand Up @@ -182,7 +172,7 @@ func (s *processMonitorSuite) TestProcessRegisterMultipleCallbacks() {
defer execCountersMutexes[i].Unlock()
c[pid] = struct{}{}
}
registerCallback(t, pm, true, (*ProcessCallback)(&callback))
registerCallback(t, pm, true, &callback)

exitCountersMutexes[i] = sync.RWMutex{}
exitCounters[i] = make(map[uint32]struct{})
Expand All @@ -193,7 +183,7 @@ func (s *processMonitorSuite) TestProcessRegisterMultipleCallbacks() {
defer exitCountersMutexes[i].Unlock()
exitc[pid] = struct{}{}
}
registerCallback(t, pm, false, (*ProcessCallback)(&exitCallback))
registerCallback(t, pm, false, &exitCallback)
}

initializePM(t, pm, s.useEventStream)
Expand Down Expand Up @@ -252,10 +242,10 @@ func (s *processMonitorSuite) TestProcessMonitorInNamespace() {
pm := getProcessMonitor(t)

callback := func(pid uint32) { execSet.Store(pid, struct{}{}) }
registerCallback(t, pm, true, (*ProcessCallback)(&callback))
registerCallback(t, pm, true, &callback)

exitCallback := func(pid uint32) { exitSet.Store(pid, struct{}{}) }
registerCallback(t, pm, false, (*ProcessCallback)(&exitCallback))
registerCallback(t, pm, false, &exitCallback)

monNs, err := netns.New()
require.NoError(t, err, "could not create network namespace for process monitor")
Expand Down
27 changes: 0 additions & 27 deletions pkg/process/monitor/testutil/testutil.go

This file was deleted.

0 comments on commit 5ad43ca

Please sign in to comment.