diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 24c15c4a3b6cd8..4daf9310c20042 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -456,10 +456,12 @@ /pkg/util/crio/ @DataDog/container-integrations /pkg/util/docker/ @DataDog/container-integrations /pkg/util/ecs/ @DataDog/container-integrations +/pkg/util/encoding/ @DataDog/ebpf-platform /pkg/util/funcs/ @DataDog/ebpf-platform /pkg/util/gpu/ @DataDog/container-platform /pkg/util/kernel/ @DataDog/ebpf-platform /pkg/util/safeelf/ @DataDog/ebpf-platform +/pkg/util/slices/ @DataDog/ebpf-platform /pkg/util/ktime @DataDog/agent-security /pkg/util/kubernetes/ @DataDog/container-integrations @DataDog/container-platform @DataDog/container-app /pkg/util/podman/ @DataDog/container-integrations diff --git a/cmd/system-probe/config/adjust_npm.go b/cmd/system-probe/config/adjust_npm.go index e1be10ae08d795..98a57348273573 100644 --- a/cmd/system-probe/config/adjust_npm.go +++ b/cmd/system-probe/config/adjust_npm.go @@ -26,6 +26,10 @@ const ( func adjustNetwork(cfg model.Config) { ebpflessEnabled := cfg.GetBool(netNS("enable_ebpfless")) + deprecateInt(cfg, spNS("closed_connection_flush_threshold"), netNS("closed_connection_flush_threshold")) + deprecateInt(cfg, spNS("closed_channel_size"), netNS("closed_channel_size")) + applyDefault(cfg, netNS("closed_channel_size"), 500) + limitMaxInt(cfg, spNS("max_conns_per_message"), maxConnsMessageBatchSize) if cfg.GetBool(spNS("disable_tcp")) { diff --git a/pkg/config/setup/system_probe.go b/pkg/config/setup/system_probe.go index 12a94b923c7b77..0b99070c96ccf3 100644 --- a/pkg/config/setup/system_probe.go +++ b/pkg/config/setup/system_probe.go @@ -195,8 +195,11 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) { cfg.BindEnvAndSetDefault(join(spNS, "max_tracked_connections"), 65536) cfg.BindEnv(join(spNS, "max_closed_connections_buffered")) cfg.BindEnv(join(netNS, "max_failed_connections_buffered")) - cfg.BindEnvAndSetDefault(join(spNS, "closed_connection_flush_threshold"), 0) - cfg.BindEnvAndSetDefault(join(spNS, "closed_channel_size"), 500) + cfg.BindEnv(join(spNS, "closed_connection_flush_threshold")) + cfg.BindEnv(join(netNS, "closed_connection_flush_threshold")) + cfg.BindEnv(join(spNS, "closed_channel_size")) + cfg.BindEnv(join(netNS, "closed_channel_size")) + cfg.BindEnvAndSetDefault(join(netNS, "closed_buffer_wakeup_count"), 4) cfg.BindEnvAndSetDefault(join(spNS, "max_connection_state_buffered"), 75000) cfg.BindEnvAndSetDefault(join(spNS, "disable_dns_inspection"), false, "DD_DISABLE_DNS_INSPECTION") @@ -212,6 +215,7 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) { cfg.BindEnvAndSetDefault(join(spNS, "enable_conntrack_all_namespaces"), true, "DD_SYSTEM_PROBE_ENABLE_CONNTRACK_ALL_NAMESPACES") cfg.BindEnvAndSetDefault(join(netNS, "enable_protocol_classification"), true, "DD_ENABLE_PROTOCOL_CLASSIFICATION") cfg.BindEnvAndSetDefault(join(netNS, "enable_ringbuffers"), true, "DD_SYSTEM_PROBE_NETWORK_ENABLE_RINGBUFFERS") + cfg.BindEnvAndSetDefault(join(netNS, "enable_custom_batching"), false, "DD_SYSTEM_PROBE_NETWORK_ENABLE_CUSTOM_BATCHING") cfg.BindEnvAndSetDefault(join(netNS, "enable_tcp_failed_connections"), true, "DD_SYSTEM_PROBE_NETWORK_ENABLE_FAILED_CONNS") cfg.BindEnvAndSetDefault(join(netNS, "ignore_conntrack_init_failure"), false, "DD_SYSTEM_PROBE_NETWORK_IGNORE_CONNTRACK_INIT_FAILURE") cfg.BindEnvAndSetDefault(join(netNS, "conntrack_init_timeout"), 10*time.Second) diff --git a/pkg/ebpf/c/bpf_helpers_custom.h b/pkg/ebpf/c/bpf_helpers_custom.h index 42c83c272ed111..d2b997032293cd 100644 --- a/pkg/ebpf/c/bpf_helpers_custom.h +++ b/pkg/ebpf/c/bpf_helpers_custom.h @@ -39,4 +39,17 @@ unsigned long long load_half(void *skb, unsigned long long load_word(void *skb, unsigned long long off) asm("llvm.bpf.load.word"); +// declare our own versions of these enums, because they don't exist on <5.8 +enum { + DD_BPF_RB_NO_WAKEUP = 1, + DD_BPF_RB_FORCE_WAKEUP = 2, +}; + +enum { + DD_BPF_RB_AVAIL_DATA = 0, + DD_BPF_RB_RING_SIZE = 1, + DD_BPF_RB_CONS_POS = 2, + DD_BPF_RB_PROD_POS = 3, +}; + #endif diff --git a/pkg/ebpf/manager.go b/pkg/ebpf/manager.go index 06e790609a9ecf..3c09088fa959db 100644 --- a/pkg/ebpf/manager.go +++ b/pkg/ebpf/manager.go @@ -100,3 +100,40 @@ func (m *Manager) InitWithOptions(bytecode io.ReaderAt, opts *manager.Options) e } return nil } + +type modifierPreStart interface { + PreStart() error +} + +// Start is a wrapper around ebpf-manager.Manager.Start +func (m *Manager) Start() error { + for _, mod := range m.EnabledModifiers { + if ps, ok := mod.(modifierPreStart); ok { + if err := ps.PreStart(); err != nil { + return fmt.Errorf("prestart %s manager modifier: %w", mod, err) + } + } + } + return m.Manager.Start() +} + +type modifierAfterStop interface { + AfterStop(manager.MapCleanupType) error +} + +// Stop is a wrapper around ebpf-manager.Manager.Stop +func (m *Manager) Stop(ct manager.MapCleanupType) error { + if err := m.Manager.Stop(ct); err != nil { + return err + } + + for _, mod := range m.EnabledModifiers { + if as, ok := mod.(modifierAfterStop); ok { + if err := as.AfterStop(ct); err != nil { + return fmt.Errorf("afterstop %s manager modifier: %w", mod, err) + } + } + } + + return nil +} diff --git a/pkg/ebpf/perf/event.go b/pkg/ebpf/perf/event.go new file mode 100644 index 00000000000000..ee1c287be698e3 --- /dev/null +++ b/pkg/ebpf/perf/event.go @@ -0,0 +1,464 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +//go:build linux_bpf + +// Package perf implements types related to eBPF and the perf subsystem, like perf buffers and ring buffers. +package perf + +import ( + "errors" + "fmt" + "slices" + "sync/atomic" + + manager "github.com/DataDog/ebpf-manager" + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/asm" + "github.com/cilium/ebpf/features" + "github.com/cilium/ebpf/perf" + "github.com/cilium/ebpf/ringbuf" + + ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" + "github.com/DataDog/datadog-agent/pkg/ebpf/names" + ebpfTelemetry "github.com/DataDog/datadog-agent/pkg/ebpf/telemetry" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" +) + +var perfPool = ddsync.NewDefaultTypedPool[perf.Record]() +var ringbufPool = ddsync.NewDefaultTypedPool[ringbuf.Record]() + +// Flusher is an interface for objects that support flushing +type Flusher interface { + Flush() +} + +// compile time check to ensure this satisfies the Modifier interface +var _ ddebpf.Modifier = (*EventHandler)(nil) + +// EventHandler abstracts consuming data from a perf buffer or ring buffer (depending on availability and options). +// It handles upgrading maps from a ring buffer if desired, and unmarshalling into the desired data type. +type EventHandler struct { + f Flusher + opts eventHandlerOptions + // mapName specifies the name of the map + mapName string + // handler is the callback for data received from the perf/ring buffer + handler func([]byte) + + readLoop func() + perfChan chan *perf.Record + ringChan chan *ringbuf.Record + + chLenTelemetry *atomic.Uint64 +} + +type mapMode uint8 + +const ( + perfBufferOnly mapMode = iota + upgradePerfBuffer + ringBufferOnly +) + +// EventHandlerMode controls the mode in which the event handler operates +type EventHandlerMode func(*EventHandler) + +// UsePerfBuffers will only use perf buffers and will not attempt any upgrades to ring buffers. +func UsePerfBuffers(bufferSize int, channelSize int, perfMode PerfBufferMode) EventHandlerMode { + return func(e *EventHandler) { + e.opts.mode = perfBufferOnly + e.opts.channelSize = channelSize + e.opts.perfBufferSize = bufferSize + perfMode(&e.opts.perfOptions) + } +} + +// UpgradePerfBuffers will upgrade to ring buffers if available, but will fall back to perf buffers if not. +func UpgradePerfBuffers(perfBufferSize int, channelSize int, perfMode PerfBufferMode, ringBufferSize int) EventHandlerMode { + return func(e *EventHandler) { + e.opts.mode = upgradePerfBuffer + e.opts.channelSize = channelSize + e.opts.perfBufferSize = perfBufferSize + e.opts.ringBufferSize = ringBufferSize + perfMode(&e.opts.perfOptions) + } +} + +// UseRingBuffers will only use ring buffers. +func UseRingBuffers(bufferSize int, channelSize int) EventHandlerMode { + return func(e *EventHandler) { + e.opts.mode = ringBufferOnly + e.opts.channelSize = channelSize + e.opts.ringBufferSize = bufferSize + } +} + +// EventHandlerOption is an option that applies to the event handler +type EventHandlerOption func(*EventHandler) + +// SendTelemetry specifies whether to collect usage telemetry from the perf/ring buffer +func SendTelemetry(enabled bool) EventHandlerOption { + return func(e *EventHandler) { + e.opts.telemetryEnabled = enabled + } +} + +// RingBufferEnabledConstantName provides a constant name that will be set whether ring buffers are in use +func RingBufferEnabledConstantName(name string) EventHandlerOption { + return func(e *EventHandler) { + e.opts.ringBufferEnabledConstantName = name + } +} + +// RingBufferWakeupSize sets a constant for eBPF to use, that determines when to wakeup userspace +func RingBufferWakeupSize(name string, size uint64) EventHandlerOption { + return func(e *EventHandler) { + e.opts.ringBufferWakeupConstantName = name + e.opts.ringBufferWakeupSize = size + } +} + +// eventHandlerOptions are the options controlling the EventHandler. +type eventHandlerOptions struct { + // telemetryEnabled specifies whether to collect usage telemetry from the perf/ring buffer. + telemetryEnabled bool + + mode mapMode + channelSize int + + perfBufferSize int + perfOptions perfBufferOptions + + ringBufferSize int + ringBufferEnabledConstantName string + + ringBufferWakeupConstantName string + ringBufferWakeupSize uint64 +} + +// PerfBufferMode is a mode for the perf buffer +// +//nolint:revive +type PerfBufferMode func(*perfBufferOptions) + +// Watermark - The reader will start processing samples once their sizes in the perf buffer +// exceed this value. Must be smaller than the perf buffer size. +func Watermark(byteCount int) PerfBufferMode { + return func(opts *perfBufferOptions) { + opts.watermark = byteCount + opts.wakeupEvents = 0 + } +} + +// WakeupEvents - The number of events required in any per CPU buffer before Read will process data. +func WakeupEvents(count int) PerfBufferMode { + return func(opts *perfBufferOptions) { + opts.wakeupEvents = count + opts.watermark = 0 + } +} + +// perfBufferOptions are options specifically for perf buffers +// +//nolint:revive +type perfBufferOptions struct { + watermark int + wakeupEvents int +} + +// NewEventHandler creates an event handler with the provided options +func NewEventHandler(mapName string, handler func([]byte), mode EventHandlerMode, opts ...EventHandlerOption) (*EventHandler, error) { + if mapName == "" { + return nil, errors.New("invalid options: MapName is required") + } + if handler == nil { + return nil, errors.New("invalid options: Handler is required") + } + e := &EventHandler{ + mapName: mapName, + handler: handler, + } + mode(e) + for _, opt := range opts { + opt(e) + } + if e.opts.telemetryEnabled { + e.chLenTelemetry = &atomic.Uint64{} + } + return e, nil +} + +// BeforeInit implements the Modifier interface +// This function will modify the shared buffers according to the user provided mode +func (e *EventHandler) BeforeInit(mgr *manager.Manager, moduleName names.ModuleName, mgrOpts *manager.Options) (err error) { + ms, _, _ := mgr.GetMapSpec(e.mapName) + if ms == nil { + return fmt.Errorf("unable to find map spec %q", e.mapName) + } + defer e.setupEnabledConstant(mgrOpts) + defer e.setupRingbufferWakeupConstant(mgrOpts) + + ringBufErr := features.HaveMapType(ebpf.RingBuf) + if e.opts.mode == ringBufferOnly { + if ringBufErr != nil { + return ringBufErr + } + if ms.Type != ebpf.RingBuf { + return fmt.Errorf("map %q is not a ring buffer, got %q instead", e.mapName, ms.Type.String()) + } + + // the size of the ring buffer is communicated to the kernel via the max entries field + // of the bpf map + if ms.MaxEntries != uint32(e.opts.ringBufferSize) { + ResizeRingBuffer(mgrOpts, e.mapName, e.opts.ringBufferSize) + } + e.initRingBuffer(mgr) + return nil + } + defer e.removeRingBufferHelperCalls(mgr, moduleName, mgrOpts) + + if e.opts.mode == perfBufferOnly { + if ms.Type != ebpf.PerfEventArray { + return fmt.Errorf("map %q is not a perf buffer, got %q instead", e.mapName, ms.Type.String()) + } + e.initPerfBuffer(mgr) + return nil + } + + if e.opts.mode == upgradePerfBuffer { + if ms.Type != ebpf.PerfEventArray { + return fmt.Errorf("map %q is not a perf buffer, got %q instead", e.mapName, ms.Type.String()) + } + + // the layout of the bpf map for perf buffers does not match that of ring buffers. + // When upgrading perf buffers to ring buffers, we must account for these differences. + // - Ring buffers do not use key/value sizes + // - Ring buffers specify their size via max entries + if ringBufErr == nil { + UpgradePerfBuffer(mgr, mgrOpts, e.mapName) + if ms.MaxEntries != uint32(e.opts.ringBufferSize) { + ResizeRingBuffer(mgrOpts, e.mapName, e.opts.ringBufferSize) + } + e.initRingBuffer(mgr) + return nil + } + + e.initPerfBuffer(mgr) + return nil + } + + return fmt.Errorf("unsupported EventHandlerMode %d", e.opts.mode) +} + +func (e *EventHandler) removeRingBufferHelperCalls(mgr *manager.Manager, moduleName names.ModuleName, mgrOpts *manager.Options) { + if features.HaveMapType(ebpf.RingBuf) == nil { + return + } + // add helper call remover because ring buffers are not available + _ = ddebpf.NewHelperCallRemover(asm.FnRingbufOutput, asm.FnRingbufQuery, asm.FnRingbufReserve, asm.FnRingbufSubmit, asm.FnRingbufDiscard).BeforeInit(mgr, moduleName, mgrOpts) +} + +func (e *EventHandler) setupEnabledConstant(mgrOpts *manager.Options) { + if e.opts.ringBufferEnabledConstantName == "" || e.f == nil { + return + } + + var val uint64 + switch e.f.(type) { + case *manager.RingBuffer: + val = uint64(1) + default: + val = uint64(0) + } + mgrOpts.ConstantEditors = append(mgrOpts.ConstantEditors, manager.ConstantEditor{ + Name: e.opts.ringBufferEnabledConstantName, + Value: val, + }) +} + +func (e *EventHandler) setupRingbufferWakeupConstant(mgrOpts *manager.Options) { + if e.opts.ringBufferWakeupSize == 0 || e.opts.ringBufferWakeupConstantName == "" || e.f == nil { + return + } + + switch e.f.(type) { + case *manager.RingBuffer: + mgrOpts.ConstantEditors = append(mgrOpts.ConstantEditors, manager.ConstantEditor{ + Name: e.opts.ringBufferWakeupConstantName, + Value: e.opts.ringBufferWakeupSize, + }) + default: + // do nothing + } +} + +// AfterInit implements the Modifier interface +func (e *EventHandler) AfterInit(_ *manager.Manager, _ names.ModuleName, _ *manager.Options) error { + return nil +} + +// PreStart implements the Modifier interface +func (e *EventHandler) PreStart() error { + go e.readLoop() + return nil +} + +// AfterStop implements the Modifier interface +func (e *EventHandler) AfterStop(_ manager.MapCleanupType) error { + if e.perfChan != nil { + close(e.perfChan) + } + if e.ringChan != nil { + close(e.ringChan) + } + return nil +} + +func (e *EventHandler) String() string { + return "EventHandler" +} + +// Flush flushes the pending data from the underlying perfbuf/ringbuf +func (e *EventHandler) Flush() { + e.f.Flush() +} + +// ResizeRingBuffer resizes the ring buffer by creating/updating a map spec editor +func ResizeRingBuffer(mgrOpts *manager.Options, mapName string, bufferSize int) { + if mgrOpts.MapSpecEditors == nil { + mgrOpts.MapSpecEditors = make(map[string]manager.MapSpecEditor) + } + specEditor := mgrOpts.MapSpecEditors[mapName] + specEditor.MaxEntries = uint32(bufferSize) + specEditor.EditorFlag |= manager.EditMaxEntries + mgrOpts.MapSpecEditors[mapName] = specEditor +} + +func (e *EventHandler) perfLoop() { + for record := range e.perfChan { + e.perfLoopHandler(record) + } +} + +func (e *EventHandler) initPerfBuffer(mgr *manager.Manager) { + e.perfChan = make(chan *perf.Record, e.opts.channelSize) + e.readLoop = e.perfLoop + + // remove any existing perf buffers from manager + mgr.PerfMaps = slices.DeleteFunc(mgr.PerfMaps, func(perfMap *manager.PerfMap) bool { + return perfMap.Name == e.mapName + }) + pm := &manager.PerfMap{ + Map: manager.Map{Name: e.mapName}, + PerfMapOptions: manager.PerfMapOptions{ + PerfRingBufferSize: e.opts.perfBufferSize, + Watermark: e.opts.perfOptions.watermark, + WakeupEvents: e.opts.perfOptions.wakeupEvents, + RecordHandler: e.perfRecordHandler, + LostHandler: nil, // TODO do we need support for Lost? + RecordGetter: perfPool.Get, + TelemetryEnabled: e.opts.telemetryEnabled, + }, + } + mgr.PerfMaps = append(mgr.PerfMaps, pm) + ebpfTelemetry.ReportPerfMapTelemetry(pm) + ebpfTelemetry.ReportPerfMapChannelLenTelemetry(pm, func() int { + return int(e.chLenTelemetry.Swap(0)) + }) + e.f = pm +} + +func (e *EventHandler) perfRecordHandler(record *perf.Record, _ *manager.PerfMap, _ *manager.Manager) { + e.perfChan <- record + if e.opts.telemetryEnabled { + updateMaxTelemetry(e.chLenTelemetry, uint64(len(e.perfChan))) + } +} + +func (e *EventHandler) perfLoopHandler(record *perf.Record) { + // record is only allowed to live for the duration of the callback. Put it back into the sync.Pool once done. + defer perfPool.Put(record) + e.handler(record.RawSample) +} + +func (e *EventHandler) initRingBuffer(mgr *manager.Manager) { + e.ringChan = make(chan *ringbuf.Record, e.opts.channelSize) + e.readLoop = e.ringLoop + + // remove any existing matching ring buffers from manager + mgr.RingBuffers = slices.DeleteFunc(mgr.RingBuffers, func(ringBuf *manager.RingBuffer) bool { + return ringBuf.Name == e.mapName + }) + rb := &manager.RingBuffer{ + Map: manager.Map{Name: e.mapName}, + RingBufferOptions: manager.RingBufferOptions{ + RecordHandler: e.ringRecordHandler, + RecordGetter: ringbufPool.Get, + TelemetryEnabled: e.opts.telemetryEnabled, + }, + } + mgr.RingBuffers = append(mgr.RingBuffers, rb) + ebpfTelemetry.ReportRingBufferTelemetry(rb) + ebpfTelemetry.ReportRingBufferChannelLenTelemetry(rb, func() int { + return int(e.chLenTelemetry.Swap(0)) + }) + e.f = rb +} + +func (e *EventHandler) ringLoop() { + for record := range e.ringChan { + e.ringLoopHandler(record) + } +} + +func (e *EventHandler) ringRecordHandler(record *ringbuf.Record, _ *manager.RingBuffer, _ *manager.Manager) { + e.ringChan <- record + if e.opts.telemetryEnabled { + updateMaxTelemetry(e.chLenTelemetry, uint64(len(e.ringChan))) + } +} + +func (e *EventHandler) ringLoopHandler(record *ringbuf.Record) { + // record is only allowed to live for the duration of the callback. Put it back into the sync.Pool once done. + defer ringbufPool.Put(record) + e.handler(record.RawSample) +} + +// UpgradePerfBuffer upgrades a perf buffer to a ring buffer by creating a map spec editor +func UpgradePerfBuffer(mgr *manager.Manager, mgrOpts *manager.Options, mapName string) { + if mgrOpts.MapSpecEditors == nil { + mgrOpts.MapSpecEditors = make(map[string]manager.MapSpecEditor) + } + specEditor := mgrOpts.MapSpecEditors[mapName] + specEditor.Type = ebpf.RingBuf + specEditor.KeySize = 0 + specEditor.ValueSize = 0 + specEditor.EditorFlag |= manager.EditType | manager.EditKeyValue + mgrOpts.MapSpecEditors[mapName] = specEditor + + // remove map from perf maps because it has been upgraded + mgr.PerfMaps = slices.DeleteFunc(mgr.PerfMaps, func(perfMap *manager.PerfMap) bool { + return perfMap.Name == mapName + }) +} + +// implement the CAS algorithm to atomically update a max value +func updateMaxTelemetry(a *atomic.Uint64, val uint64) { + for { + oldVal := a.Load() + if val <= oldVal { + return + } + // if the value at a is not `oldVal`, then `CompareAndSwap` returns + // false indicating that the value of the atomic has changed between + // the above check and this invocation. + // In this case we retry the above test, to see if the value still needs + // to be updated. + if a.CompareAndSwap(oldVal, val) { + return + } + } +} diff --git a/pkg/ebpf/telemetry/perf_metrics.go b/pkg/ebpf/telemetry/perf_metrics.go index 84324790e1d28f..a8f6ed2e6602ee 100644 --- a/pkg/ebpf/telemetry/perf_metrics.go +++ b/pkg/ebpf/telemetry/perf_metrics.go @@ -22,19 +22,25 @@ var ( ) type perfUsageCollector struct { - mtx sync.Mutex - usage *prometheus.GaugeVec - usagePct *prometheus.GaugeVec - size *prometheus.GaugeVec - lost *prometheus.CounterVec - - perfMaps []*manager.PerfMap - ringBuffers []*manager.RingBuffer + mtx sync.Mutex + usage *prometheus.GaugeVec + usagePct *prometheus.GaugeVec + size *prometheus.GaugeVec + lost *prometheus.CounterVec + channelLen *prometheus.GaugeVec + + perfMaps []*manager.PerfMap + perfChannelLenFuncs map[*manager.PerfMap]func() int + + ringBuffers []*manager.RingBuffer + ringChannelLenFuncs map[*manager.RingBuffer]func() int } // NewPerfUsageCollector creates a prometheus.Collector for perf buffer and ring buffer metrics func NewPerfUsageCollector() prometheus.Collector { perfCollector = &perfUsageCollector{ + perfChannelLenFuncs: make(map[*manager.PerfMap]func() int), + ringChannelLenFuncs: make(map[*manager.RingBuffer]func() int), usage: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Subsystem: "ebpf__perf", @@ -67,6 +73,14 @@ func NewPerfUsageCollector() prometheus.Collector { }, []string{"map_name", "map_type", "cpu_num"}, ), + channelLen: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: "ebpf__perf", + Name: "_channel_len", + Help: "gauge tracking number of elements in buffer channel", + }, + []string{"map_name", "map_type"}, + ), } return perfCollector } @@ -103,6 +117,11 @@ func (p *perfUsageCollector) Collect(metrics chan<- prometheus.Metric) { } } + for pm, chFunc := range p.perfChannelLenFuncs { + mapName, mapType := pm.Name, ebpf.PerfEventArray.String() + p.channelLen.WithLabelValues(mapName, mapType).Set(float64(chFunc())) + } + for _, rb := range p.ringBuffers { mapName, mapType := rb.Name, ebpf.RingBuf.String() size := float64(rb.BufferSize()) @@ -118,10 +137,16 @@ func (p *perfUsageCollector) Collect(metrics chan<- prometheus.Metric) { p.size.WithLabelValues(mapName, mapType, cpuString).Set(size) } + for rb, chFunc := range p.ringChannelLenFuncs { + mapName, mapType := rb.Name, ebpf.RingBuf.String() + p.channelLen.WithLabelValues(mapName, mapType).Set(float64(chFunc())) + } + p.usage.Collect(metrics) p.usagePct.Collect(metrics) p.size.Collect(metrics) p.lost.Collect(metrics) + p.channelLen.Collect(metrics) } // ReportPerfMapTelemetry starts reporting the telemetry for the provided PerfMap @@ -132,6 +157,14 @@ func ReportPerfMapTelemetry(pm *manager.PerfMap) { perfCollector.registerPerfMap(pm) } +// ReportPerfMapChannelLenTelemetry starts reporting the telemetry for the provided PerfMap's buffer channel +func ReportPerfMapChannelLenTelemetry(pm *manager.PerfMap, channelLenFunc func() int) { + if perfCollector == nil { + return + } + perfCollector.registerPerfMapChannel(pm, channelLenFunc) +} + // ReportRingBufferTelemetry starts reporting the telemetry for the provided RingBuffer func ReportRingBufferTelemetry(rb *manager.RingBuffer) { if perfCollector == nil { @@ -140,6 +173,14 @@ func ReportRingBufferTelemetry(rb *manager.RingBuffer) { perfCollector.registerRingBuffer(rb) } +// ReportRingBufferChannelLenTelemetry starts reporting the telemetry for the provided RingBuffer's buffer channel +func ReportRingBufferChannelLenTelemetry(rb *manager.RingBuffer, channelLenFunc func() int) { + if perfCollector == nil { + return + } + perfCollector.registerRingBufferChannel(rb, channelLenFunc) +} + func (p *perfUsageCollector) registerPerfMap(pm *manager.PerfMap) { if !pm.TelemetryEnabled { return @@ -149,6 +190,15 @@ func (p *perfUsageCollector) registerPerfMap(pm *manager.PerfMap) { p.perfMaps = append(p.perfMaps, pm) } +func (p *perfUsageCollector) registerPerfMapChannel(pm *manager.PerfMap, channelLenFunc func() int) { + if !pm.TelemetryEnabled { + return + } + p.mtx.Lock() + defer p.mtx.Unlock() + p.perfChannelLenFuncs[pm] = channelLenFunc +} + func (p *perfUsageCollector) registerRingBuffer(rb *manager.RingBuffer) { if !rb.TelemetryEnabled { return @@ -158,6 +208,15 @@ func (p *perfUsageCollector) registerRingBuffer(rb *manager.RingBuffer) { p.ringBuffers = append(p.ringBuffers, rb) } +func (p *perfUsageCollector) registerRingBufferChannel(rb *manager.RingBuffer, channelLenFunc func() int) { + if !rb.TelemetryEnabled { + return + } + p.mtx.Lock() + defer p.mtx.Unlock() + p.ringChannelLenFuncs[rb] = channelLenFunc +} + // UnregisterTelemetry unregisters the PerfMap and RingBuffers from telemetry func UnregisterTelemetry(m *manager.Manager) { if perfCollector == nil { @@ -172,7 +231,13 @@ func (p *perfUsageCollector) unregisterTelemetry(m *manager.Manager) { p.perfMaps = slices.DeleteFunc(p.perfMaps, func(perfMap *manager.PerfMap) bool { return slices.Contains(m.PerfMaps, perfMap) }) + for _, pm := range m.PerfMaps { + delete(p.perfChannelLenFuncs, pm) + } p.ringBuffers = slices.DeleteFunc(p.ringBuffers, func(ringBuf *manager.RingBuffer) bool { return slices.Contains(m.RingBuffers, ringBuf) }) + for _, rb := range m.RingBuffers { + delete(p.ringChannelLenFuncs, rb) + } } diff --git a/pkg/network/config/config.go b/pkg/network/config/config.go index bf0e4ad4987862..126745eeba0ce0 100644 --- a/pkg/network/config/config.go +++ b/pkg/network/config/config.go @@ -218,6 +218,9 @@ type Config struct { // ClosedChannelSize specifies the size for closed channel for the tracer ClosedChannelSize int + // ClosedBufferWakeupCount specifies the number of events that will buffer in a perf buffer before userspace is woken up. + ClosedBufferWakeupCount int + // ExcludedSourceConnections is a map of source connections to blacklist ExcludedSourceConnections map[string][]string @@ -288,6 +291,9 @@ type Config struct { // EnableUSMEventStream enables USM to use the event stream instead // of netlink for receiving process events. EnableUSMEventStream bool + + // CustomBatchingEnabled enables the use of custom batching for eBPF perf events with perf buffers + CustomBatchingEnabled bool } // New creates a config for the network tracer @@ -318,8 +324,9 @@ func New() *Config { MaxTrackedConnections: uint32(cfg.GetInt64(sysconfig.FullKeyPath(spNS, "max_tracked_connections"))), MaxClosedConnectionsBuffered: uint32(cfg.GetInt64(sysconfig.FullKeyPath(spNS, "max_closed_connections_buffered"))), MaxFailedConnectionsBuffered: uint32(cfg.GetInt64(sysconfig.FullKeyPath(netNS, "max_failed_connections_buffered"))), - ClosedConnectionFlushThreshold: cfg.GetInt(sysconfig.FullKeyPath(spNS, "closed_connection_flush_threshold")), - ClosedChannelSize: cfg.GetInt(sysconfig.FullKeyPath(spNS, "closed_channel_size")), + ClosedConnectionFlushThreshold: cfg.GetInt(sysconfig.FullKeyPath(netNS, "closed_connection_flush_threshold")), + ClosedChannelSize: cfg.GetInt(sysconfig.FullKeyPath(netNS, "closed_channel_size")), + ClosedBufferWakeupCount: cfg.GetInt(sysconfig.FullKeyPath(netNS, "closed_buffer_wakeup_count")), MaxConnectionsStateBuffered: cfg.GetInt(sysconfig.FullKeyPath(spNS, "max_connection_state_buffered")), ClientStateExpiry: 2 * time.Minute, @@ -334,6 +341,7 @@ func New() *Config { ProtocolClassificationEnabled: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_protocol_classification")), NPMRingbuffersEnabled: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_ringbuffers")), + CustomBatchingEnabled: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_custom_batching")), EnableHTTPMonitoring: cfg.GetBool(sysconfig.FullKeyPath(smNS, "enable_http_monitoring")), EnableHTTP2Monitoring: cfg.GetBool(sysconfig.FullKeyPath(smNS, "enable_http2_monitoring")), diff --git a/pkg/network/ebpf/c/tracer.c b/pkg/network/ebpf/c/tracer.c index 66cefbee1d08df..fd7f2ba9690648 100644 --- a/pkg/network/ebpf/c/tracer.c +++ b/pkg/network/ebpf/c/tracer.c @@ -304,7 +304,9 @@ int BPF_BYPASSABLE_KRETPROBE(kretprobe__tcp_close_clean_protocols) { bpf_map_delete_elem(&tcp_close_args, &pid_tgid); } - bpf_tail_call_compat(ctx, &tcp_close_progs, 0); + if (is_batching_enabled()) { + bpf_tail_call_compat(ctx, &tcp_close_progs, 0); + } return 0; } diff --git a/pkg/network/ebpf/c/tracer/events.h b/pkg/network/ebpf/c/tracer/events.h index 1de94d755b1f3c..4fa6b66936fda5 100644 --- a/pkg/network/ebpf/c/tracer/events.h +++ b/pkg/network/ebpf/c/tracer/events.h @@ -34,11 +34,32 @@ static __always_inline void clean_protocol_classification(conn_tuple_t *tup) { bpf_map_delete_elem(&conn_tuple_to_socket_skb_conn_tuple, &conn_tuple); } +static __always_inline bool is_batching_enabled() { + __u64 batching_enabled = 0; + LOAD_CONSTANT("batching_enabled", batching_enabled); + return batching_enabled != 0; +} + +__maybe_unused static __always_inline __u64 get_ringbuf_flags(size_t data_size) { + if (is_batching_enabled()) { + return 0; + } + + __u64 ringbuffer_wakeup_size = 0; + LOAD_CONSTANT("ringbuffer_wakeup_size", ringbuffer_wakeup_size); + if (ringbuffer_wakeup_size == 0) { + return 0; + } + + __u64 sz = bpf_ringbuf_query(&conn_close_event, DD_BPF_RB_AVAIL_DATA); + return (sz + data_size) >= ringbuffer_wakeup_size ? DD_BPF_RB_FORCE_WAKEUP : DD_BPF_RB_NO_WAKEUP; +} + __maybe_unused static __always_inline void submit_closed_conn_event(void *ctx, int cpu, void *event_data, size_t data_size) { __u64 ringbuffers_enabled = 0; LOAD_CONSTANT("ringbuffers_enabled", ringbuffers_enabled); if (ringbuffers_enabled > 0) { - bpf_ringbuf_output(&conn_close_event, event_data, data_size, 0); + bpf_ringbuf_output(&conn_close_event, event_data, data_size, get_ringbuf_flags(data_size)); } else { bpf_perf_event_output(ctx, &conn_close_event, cpu, event_data, data_size); } @@ -94,32 +115,34 @@ static __always_inline int cleanup_conn(void *ctx, conn_tuple_t *tup, struct soc // if we added another field conn.conn_stats.duration = bpf_ktime_get_ns() - conn.conn_stats.duration; - // Batch TCP closed connections before generating a perf event - batch_t *batch_ptr = bpf_map_lookup_elem(&conn_close_batch, &cpu); - if (batch_ptr == NULL) { - return -1; - } + if (is_batching_enabled()) { + // Batch TCP closed connections before generating a perf event + batch_t *batch_ptr = bpf_map_lookup_elem(&conn_close_batch, &cpu); + if (batch_ptr == NULL) { + return -1; + } - // TODO: Can we turn this into a macro based on TCP_CLOSED_BATCH_SIZE? - switch (batch_ptr->len) { - case 0: - batch_ptr->c0 = conn; - batch_ptr->len++; - return 0; - case 1: - batch_ptr->c1 = conn; - batch_ptr->len++; - return 0; - case 2: - batch_ptr->c2 = conn; - batch_ptr->len++; - return 0; - case 3: - batch_ptr->c3 = conn; - batch_ptr->len++; - // In this case the batch is ready to be flushed, which we defer to kretprobe/tcp_close - // in order to cope with the eBPF stack limitation of 512 bytes. - return 0; + // TODO: Can we turn this into a macro based on TCP_CLOSED_BATCH_SIZE? + switch (batch_ptr->len) { + case 0: + batch_ptr->c0 = conn; + batch_ptr->len++; + return 0; + case 1: + batch_ptr->c1 = conn; + batch_ptr->len++; + return 0; + case 2: + batch_ptr->c2 = conn; + batch_ptr->len++; + return 0; + case 3: + batch_ptr->c3 = conn; + batch_ptr->len++; + // In this case the batch is ready to be flushed, which we defer to kretprobe/tcp_close + // in order to cope with the eBPF stack limitation of 512 bytes. + return 0; + } } // If we hit this section it means we had one or more interleaved tcp_close calls. @@ -127,11 +150,13 @@ static __always_inline int cleanup_conn(void *ctx, conn_tuple_t *tup, struct soc // frequent of a case to cause performance issues and avoid cases where // we drop whole connections, which impacts things USM connection matching. submit_closed_conn_event(ctx, cpu, &conn, sizeof(conn_t)); - if (is_tcp) { - increment_telemetry_count(unbatched_tcp_close); - } - if (is_udp) { - increment_telemetry_count(unbatched_udp_close); + if (is_batching_enabled()) { + if (is_tcp) { + increment_telemetry_count(unbatched_tcp_close); + } + if (is_udp) { + increment_telemetry_count(unbatched_udp_close); + } } return 0; } diff --git a/pkg/network/ebpf/c/tracer/maps.h b/pkg/network/ebpf/c/tracer/maps.h index e6123782f8ea56..c97b643d561bb6 100644 --- a/pkg/network/ebpf/c/tracer/maps.h +++ b/pkg/network/ebpf/c/tracer/maps.h @@ -36,7 +36,7 @@ BPF_PERF_EVENT_ARRAY_MAP(conn_close_event, __u32) * or BPF_MAP_TYPE_PERCPU_ARRAY, but they are not available in * some of the Kernels we support (4.4 ~ 4.6) */ -BPF_HASH_MAP(conn_close_batch, __u32, batch_t, 1024) +BPF_HASH_MAP(conn_close_batch, __u32, batch_t, 1) /* * Map to hold struct sock parameter for tcp_sendmsg calls diff --git a/pkg/network/event_common_linux.go b/pkg/network/event_common_linux.go index 99058b017903a6..b0644f44b16111 100644 --- a/pkg/network/event_common_linux.go +++ b/pkg/network/event_common_linux.go @@ -3,10 +3,20 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016-present Datadog, Inc. -//go:build linux +//go:build linux_bpf package network +import ( + "fmt" + "math" + "time" + "unsafe" + + netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf" + "github.com/DataDog/datadog-agent/pkg/network/protocols" +) + // Sub returns s-other. // // This implementation is different from the implementation on @@ -50,3 +60,94 @@ func (s StatCounters) Sub(other StatCounters) (sc StatCounters, underflow bool) return sc, false } + +// UnmarshalBinary converts a raw byte slice to a ConnectionStats object +func (c *ConnectionStats) UnmarshalBinary(data []byte) error { + if len(data) < netebpf.SizeofConn { + return fmt.Errorf("'Conn' binary data too small, received %d but expected %d bytes", len(data), netebpf.SizeofConn) + } + + ct := (*netebpf.Conn)(unsafe.Pointer(&data[0])) + c.FromConn(ct) + return nil +} + +// FromConn populates relevant fields on ConnectionStats from the connection data +func (c *ConnectionStats) FromConn(ct *netebpf.Conn) { + c.FromTupleAndStats(&ct.Tup, &ct.Conn_stats) + c.FromTCPStats(&ct.Tcp_stats) +} + +// FromTupleAndStats populates relevant fields on ConnectionStats from the arguments +func (c *ConnectionStats) FromTupleAndStats(t *netebpf.ConnTuple, s *netebpf.ConnStats) { + *c = ConnectionStats{ConnectionTuple: ConnectionTuple{ + Pid: t.Pid, + NetNS: t.Netns, + Source: t.SourceAddress(), + Dest: t.DestAddress(), + SPort: t.Sport, + DPort: t.Dport, + }, + Monotonic: StatCounters{ + SentBytes: s.Sent_bytes, + RecvBytes: s.Recv_bytes, + SentPackets: uint64(s.Sent_packets), + RecvPackets: uint64(s.Recv_packets), + }, + LastUpdateEpoch: s.Timestamp, + IsAssured: s.IsAssured(), + Cookie: StatCookie(s.Cookie), + } + + if s.Duration <= uint64(math.MaxInt64) { + c.Duration = time.Duration(s.Duration) * time.Nanosecond + } + + c.ProtocolStack = protocols.Stack{ + API: protocols.API(s.Protocol_stack.Api), + Application: protocols.Application(s.Protocol_stack.Application), + Encryption: protocols.Encryption(s.Protocol_stack.Encryption), + } + + if t.Type() == netebpf.TCP { + c.Type = TCP + } else { + c.Type = UDP + } + + switch t.Family() { + case netebpf.IPv4: + c.Family = AFINET + case netebpf.IPv6: + c.Family = AFINET6 + } + + c.SPortIsEphemeral = IsPortInEphemeralRange(c.Family, c.Type, t.Sport) + + switch s.ConnectionDirection() { + case netebpf.Incoming: + c.Direction = INCOMING + case netebpf.Outgoing: + c.Direction = OUTGOING + default: + c.Direction = OUTGOING + } +} + +// FromTCPStats populates relevant fields on ConnectionStats from the arguments +func (c *ConnectionStats) FromTCPStats(tcpStats *netebpf.TCPStats) { + if c.Type != TCP || tcpStats == nil { + return + } + + c.Monotonic.Retransmits = tcpStats.Retransmits + c.Monotonic.TCPEstablished = tcpStats.State_transitions >> netebpf.Established & 1 + c.Monotonic.TCPClosed = tcpStats.State_transitions >> netebpf.Close & 1 + c.RTT = tcpStats.Rtt + c.RTTVar = tcpStats.Rtt_var + if tcpStats.Failure_reason > 0 { + c.TCPFailures = map[uint16]uint32{ + tcpStats.Failure_reason: 1, + } + } +} diff --git a/pkg/network/event_common_notlinux.go b/pkg/network/event_common_notlinux.go index 938acfb127f7a3..b366328c545d4c 100644 --- a/pkg/network/event_common_notlinux.go +++ b/pkg/network/event_common_notlinux.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016-present Datadog, Inc. -//go:build !linux +//go:build !linux_bpf package network diff --git a/pkg/network/protocols/events/configuration.go b/pkg/network/protocols/events/configuration.go index 0888c8d2fde7d6..2d85791bf820b0 100644 --- a/pkg/network/protocols/events/configuration.go +++ b/pkg/network/protocols/events/configuration.go @@ -159,7 +159,7 @@ func removeRingBufferHelperCalls(m *manager.Manager) { // TODO: this is not the intended API usage of a `ebpf.Modifier`. // Once we have access to the `ddebpf.Manager`, add this modifier to its list of // `EnabledModifiers` and let it control the execution of the callbacks - patcher := ddebpf.NewHelperCallRemover(asm.FnRingbufOutput) + patcher := ddebpf.NewHelperCallRemover(asm.FnRingbufOutput, asm.FnRingbufQuery, asm.FnRingbufReserve, asm.FnRingbufSubmit, asm.FnRingbufDiscard) err := patcher.BeforeInit(m, names.NewModuleName("usm"), nil) if err != nil { diff --git a/pkg/network/state_linux_test.go b/pkg/network/state_linux_test.go index 2a1f3a9b4df801..a7ec0737efd14d 100644 --- a/pkg/network/state_linux_test.go +++ b/pkg/network/state_linux_test.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016-present Datadog, Inc. -//go:build linux +//go:build linux_bpf package network diff --git a/pkg/network/tracer/connection/ebpf_tracer.go b/pkg/network/tracer/connection/ebpf_tracer.go index 9fc47be3a76d33..bb44b8b04e2eaa 100644 --- a/pkg/network/tracer/connection/ebpf_tracer.go +++ b/pkg/network/tracer/connection/ebpf_tracer.go @@ -24,23 +24,23 @@ import ( telemetryComponent "github.com/DataDog/datadog-agent/comp/core/telemetry" ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" "github.com/DataDog/datadog-agent/pkg/ebpf/maps" + "github.com/DataDog/datadog-agent/pkg/ebpf/perf" ebpftelemetry "github.com/DataDog/datadog-agent/pkg/ebpf/telemetry" "github.com/DataDog/datadog-agent/pkg/network" "github.com/DataDog/datadog-agent/pkg/network/config" netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf" "github.com/DataDog/datadog-agent/pkg/network/ebpf/probes" - "github.com/DataDog/datadog-agent/pkg/network/protocols" "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/fentry" "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/kprobe" "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/util" "github.com/DataDog/datadog-agent/pkg/telemetry" + "github.com/DataDog/datadog-agent/pkg/util/encoding" "github.com/DataDog/datadog-agent/pkg/util/log" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" ) const ( - defaultClosedChannelSize = 500 - defaultFailedChannelSize = 500 - connTracerModuleName = "network_tracer__ebpf" + connTracerModuleName = "network_tracer__ebpf" ) var tcpOngoingConnectMapTTL = 30 * time.Minute.Nanoseconds() @@ -137,7 +137,7 @@ var EbpfTracerTelemetry = struct { //nolint:revive // TODO } type ebpfTracer struct { - m *manager.Manager + m *ddebpf.Manager conns *maps.GenericMap[netebpf.ConnTuple, netebpf.ConnStats] tcpStats *maps.GenericMap[netebpf.ConnTuple, netebpf.TCPStats] @@ -157,8 +157,6 @@ type ebpfTracer struct { ebpfTracerType TracerType - exitTelemetry chan struct{} - ch *cookieHasher } @@ -194,24 +192,36 @@ func newEbpfTracer(config *config.Config, _ telemetryComponent.Component) (Trace manager.ConstantEditor{Name: "ephemeral_range_begin", Value: uint64(begin)}, manager.ConstantEditor{Name: "ephemeral_range_end", Value: uint64(end)}) - closedChannelSize := defaultClosedChannelSize - if config.ClosedChannelSize > 0 { - closedChannelSize = config.ClosedChannelSize + connPool := ddsync.NewDefaultTypedPool[network.ConnectionStats]() + var extractor *batchExtractor + + util.AddBoolConst(&mgrOptions, "batching_enabled", config.CustomBatchingEnabled) + if config.CustomBatchingEnabled { + numCPUs, err := ebpf.PossibleCPU() + if err != nil { + return nil, fmt.Errorf("could not determine number of CPUs: %w", err) + } + extractor = newBatchExtractor(numCPUs) + mgrOptions.MapSpecEditors[probes.ConnCloseBatchMap] = manager.MapSpecEditor{ + MaxEntries: uint32(numCPUs), + EditorFlag: manager.EditMaxEntries, + } } - var connCloseEventHandler ddebpf.EventHandler - var failedConnsHandler ddebpf.EventHandler - if config.RingBufferSupportedNPM() { - connCloseEventHandler = ddebpf.NewRingBufferHandler(closedChannelSize) - failedConnsHandler = ddebpf.NewRingBufferHandler(defaultFailedChannelSize) - } else { - connCloseEventHandler = ddebpf.NewPerfHandler(closedChannelSize) - failedConnsHandler = ddebpf.NewPerfHandler(defaultFailedChannelSize) + + tr := &ebpfTracer{ + removeTuple: &netebpf.ConnTuple{}, + ch: newCookieHasher(), } - var m *manager.Manager - var tracerType TracerType = TracerTypeFentry //nolint:revive // TODO + connCloseEventHandler, err := initClosedConnEventHandler(config, tr.closedPerfCallback, connPool, extractor) + if err != nil { + return nil, err + } + + var m *ddebpf.Manager + var tracerType = TracerTypeFentry var closeTracerFn func() - m, closeTracerFn, err := fentry.LoadTracer(config, mgrOptions, connCloseEventHandler) + m, closeTracerFn, err = fentry.LoadTracer(config, mgrOptions, connCloseEventHandler) if err != nil && !errors.Is(err, fentry.ErrorNotSupported) { // failed to load fentry tracer return nil, err @@ -221,26 +231,23 @@ func newEbpfTracer(config *config.Config, _ telemetryComponent.Component) (Trace // load the kprobe tracer log.Info("loading kprobe-based tracer") var kprobeTracerType kprobe.TracerType - m, closeTracerFn, kprobeTracerType, err = kprobe.LoadTracer(config, mgrOptions, connCloseEventHandler, failedConnsHandler) + m, closeTracerFn, kprobeTracerType, err = kprobe.LoadTracer(config, mgrOptions, connCloseEventHandler) if err != nil { return nil, err } tracerType = TracerType(kprobeTracerType) } m.DumpHandler = dumpMapsHandler - ddebpf.AddNameMappings(m, "npm_tracer") + ddebpf.AddNameMappings(m.Manager, "npm_tracer") - numCPUs, err := ebpf.PossibleCPU() - if err != nil { - return nil, fmt.Errorf("could not determine number of CPUs: %w", err) - } - extractor := newBatchExtractor(numCPUs) - batchMgr, err := newConnBatchManager(m, extractor) - if err != nil { - return nil, fmt.Errorf("could not create connection batch manager: %w", err) + var flusher perf.Flusher = connCloseEventHandler + if config.CustomBatchingEnabled { + flusher, err = newConnBatchManager(m.Manager, extractor, connPool, tr.closedPerfCallback) + if err != nil { + return nil, err + } } - - closeConsumer := newTCPCloseConsumer(connCloseEventHandler, batchMgr) + tr.closeConsumer = newTCPCloseConsumer(flusher, connPool) // Failed connections are not supported on prebuilt if tracerType == TracerTypeKProbePrebuilt { @@ -250,32 +257,26 @@ func newEbpfTracer(config *config.Config, _ telemetryComponent.Component) (Trace config.TCPFailedConnectionsEnabled = false } - tr := &ebpfTracer{ - m: m, - config: config, - closeConsumer: closeConsumer, - removeTuple: &netebpf.ConnTuple{}, - closeTracer: closeTracerFn, - ebpfTracerType: tracerType, - exitTelemetry: make(chan struct{}), - ch: newCookieHasher(), - } + tr.m = m + tr.config = config + tr.closeTracer = closeTracerFn + tr.ebpfTracerType = tracerType - tr.setupMapCleaner(m) + tr.setupMapCleaner(m.Manager) - tr.conns, err = maps.GetMap[netebpf.ConnTuple, netebpf.ConnStats](m, probes.ConnMap) + tr.conns, err = maps.GetMap[netebpf.ConnTuple, netebpf.ConnStats](m.Manager, probes.ConnMap) if err != nil { tr.Stop() return nil, fmt.Errorf("error retrieving the bpf %s map: %s", probes.ConnMap, err) } - tr.tcpStats, err = maps.GetMap[netebpf.ConnTuple, netebpf.TCPStats](m, probes.TCPStatsMap) + tr.tcpStats, err = maps.GetMap[netebpf.ConnTuple, netebpf.TCPStats](m.Manager, probes.TCPStatsMap) if err != nil { tr.Stop() return nil, fmt.Errorf("error retrieving the bpf %s map: %s", probes.TCPStatsMap, err) } - if tr.tcpRetransmits, err = maps.GetMap[netebpf.ConnTuple, uint32](m, probes.TCPRetransmitsMap); err != nil { + if tr.tcpRetransmits, err = maps.GetMap[netebpf.ConnTuple, uint32](m.Manager, probes.TCPRetransmitsMap); err != nil { tr.Stop() return nil, fmt.Errorf("error retrieving the bpf %s map: %s", probes.TCPRetransmitsMap, err) } @@ -283,6 +284,63 @@ func newEbpfTracer(config *config.Config, _ telemetryComponent.Component) (Trace return tr, nil } +func initClosedConnEventHandler(config *config.Config, closedCallback func(*network.ConnectionStats), pool ddsync.Pool[network.ConnectionStats], extractor *batchExtractor) (*perf.EventHandler, error) { + connHasher := newCookieHasher() + singleConnHandler := encoding.BinaryUnmarshalCallback(pool.Get, func(b *network.ConnectionStats, err error) { + if err != nil { + if b != nil { + pool.Put(b) + } + log.Debug(err.Error()) + return + } + if b != nil { + connHasher.Hash(b) + } + closedCallback(b) + }) + + handler := singleConnHandler + perfMode := perf.WakeupEvents(config.ClosedBufferWakeupCount) + // multiply by number of connections with in-buffer batching to have same effective size as with custom batching + chanSize := config.ClosedChannelSize * config.ClosedBufferWakeupCount + if config.CustomBatchingEnabled { + perfMode = perf.Watermark(1) + chanSize = config.ClosedChannelSize + handler = func(buf []byte) { + l := len(buf) + switch { + case l >= netebpf.SizeofBatch: + b := netebpf.ToBatch(buf) + for rc := extractor.NextConnection(b); rc != nil; rc = extractor.NextConnection(b) { + c := pool.Get() + c.FromConn(rc) + connHasher.Hash(c) + + closedCallback(c) + } + case l >= netebpf.SizeofConn: + singleConnHandler(buf) + case l == 0: + singleConnHandler(nil) + default: + log.Debugf("unexpected %q binary data of size %d bytes", probes.ConnCloseEventMap, l) + } + } + } + + perfBufferSize := util.ComputeDefaultClosedConnPerfBufferSize() + mode := perf.UsePerfBuffers(perfBufferSize, chanSize, perfMode) + if config.RingBufferSupportedNPM() { + mode = perf.UpgradePerfBuffers(perfBufferSize, chanSize, perfMode, util.ComputeDefaultClosedConnRingBufferSize()) + } + + return perf.NewEventHandler(probes.ConnCloseEventMap, handler, mode, + perf.SendTelemetry(config.InternalTelemetryEnabled), + perf.RingBufferEnabledConstantName("ringbuffers_enabled"), + perf.RingBufferWakeupSize("ringbuffer_wakeup_size", uint64(config.ClosedBufferWakeupCount*(netebpf.SizeofConn+unix.BPF_RINGBUF_HDR_SZ)))) +} + func boolConst(name string, value bool) manager.ConstantEditor { c := manager.ConstantEditor{ Name: name, @@ -295,6 +353,10 @@ func boolConst(name string, value bool) manager.ConstantEditor { return c } +func (t *ebpfTracer) closedPerfCallback(c *network.ConnectionStats) { + t.closeConsumer.Callback(c) +} + func (t *ebpfTracer) Start(callback func(*network.ConnectionStats)) (err error) { defer func() { if err != nil { @@ -307,11 +369,13 @@ func (t *ebpfTracer) Start(callback func(*network.ConnectionStats)) (err error) return fmt.Errorf("error initializing port binding maps: %s", err) } + t.closeConsumer.Start(callback) + if err := t.m.Start(); err != nil { + t.closeConsumer.Stop() return fmt.Errorf("could not start ebpf manager: %s", err) } - t.closeConsumer.Start(callback) return nil } @@ -334,9 +398,8 @@ func (t *ebpfTracer) FlushPending() { func (t *ebpfTracer) Stop() { t.stopOnce.Do(func() { - close(t.exitTelemetry) - ddebpf.RemoveNameMappings(t.m) - ebpftelemetry.UnregisterTelemetry(t.m) + ddebpf.RemoveNameMappings(t.m.Manager) + ebpftelemetry.UnregisterTelemetry(t.m.Manager) _ = t.m.Stop(manager.CleanAll) t.closeConsumer.Stop() t.ongoingConnectCleaner.Stop() @@ -379,7 +442,8 @@ func (t *ebpfTracer) GetConnections(buffer *network.ConnectionBuffer, filter fun continue } - populateConnStats(conn, key, stats, t.ch) + conn.FromTupleAndStats(key, stats) + t.ch.Hash(conn) connsByTuple[*key] = stats.Cookie isTCP := conn.Type == network.TCP @@ -403,7 +467,7 @@ func (t *ebpfTracer) GetConnections(buffer *network.ConnectionBuffer, filter fun } if t.getTCPStats(tcp, key) { - updateTCPStats(conn, tcp) + conn.FromTCPStats(tcp) } if retrans, ok := t.getTCPRetransmits(key, seen); ok && conn.Type == network.TCP { conn.Monotonic.Retransmits = retrans @@ -480,7 +544,7 @@ func (t *ebpfTracer) Remove(conn *network.ConnectionStats) error { func (t *ebpfTracer) getEBPFTelemetry() *netebpf.Telemetry { var zero uint32 - mp, err := maps.GetMap[uint32, netebpf.Telemetry](t.m, probes.TelemetryMap) + mp, err := maps.GetMap[uint32, netebpf.Telemetry](t.m.Manager, probes.TelemetryMap) if err != nil { log.Warnf("error retrieving telemetry map: %s", err) return nil @@ -610,7 +674,7 @@ func (t *ebpfTracer) initializePortBindingMaps() error { return fmt.Errorf("failed to read initial TCP pid->port mapping: %s", err) } - tcpPortMap, err := maps.GetMap[netebpf.PortBinding, uint32](t.m, probes.PortBindingsMap) + tcpPortMap, err := maps.GetMap[netebpf.PortBinding, uint32](t.m.Manager, probes.PortBindingsMap) if err != nil { return fmt.Errorf("failed to get TCP port binding map: %w", err) } @@ -628,7 +692,7 @@ func (t *ebpfTracer) initializePortBindingMaps() error { return fmt.Errorf("failed to read initial UDP pid->port mapping: %s", err) } - udpPortMap, err := maps.GetMap[netebpf.PortBinding, uint32](t.m, probes.UDPPortBindingsMap) + udpPortMap, err := maps.GetMap[netebpf.PortBinding, uint32](t.m.Manager, probes.UDPPortBindingsMap) if err != nil { return fmt.Errorf("failed to get UDP port binding map: %w", err) } @@ -707,81 +771,3 @@ func (t *ebpfTracer) setupMapCleaner(m *manager.Manager) { t.ongoingConnectCleaner = tcpOngoingConnectPidCleaner } - -func populateConnStats(stats *network.ConnectionStats, t *netebpf.ConnTuple, s *netebpf.ConnStats, ch *cookieHasher) { - *stats = network.ConnectionStats{ConnectionTuple: network.ConnectionTuple{ - Pid: t.Pid, - NetNS: t.Netns, - Source: t.SourceAddress(), - Dest: t.DestAddress(), - SPort: t.Sport, - DPort: t.Dport, - }, - Monotonic: network.StatCounters{ - SentBytes: s.Sent_bytes, - RecvBytes: s.Recv_bytes, - SentPackets: uint64(s.Sent_packets), - RecvPackets: uint64(s.Recv_packets), - }, - LastUpdateEpoch: s.Timestamp, - IsAssured: s.IsAssured(), - Cookie: network.StatCookie(s.Cookie), - } - - if s.Duration <= uint64(math.MaxInt64) { - stats.Duration = time.Duration(s.Duration) * time.Nanosecond - } - - stats.ProtocolStack = protocols.Stack{ - API: protocols.API(s.Protocol_stack.Api), - Application: protocols.Application(s.Protocol_stack.Application), - Encryption: protocols.Encryption(s.Protocol_stack.Encryption), - } - - if t.Type() == netebpf.TCP { - stats.Type = network.TCP - } else { - stats.Type = network.UDP - } - - switch t.Family() { - case netebpf.IPv4: - stats.Family = network.AFINET - case netebpf.IPv6: - stats.Family = network.AFINET6 - } - - stats.SPortIsEphemeral = network.IsPortInEphemeralRange(stats.Family, stats.Type, t.Sport) - - switch s.ConnectionDirection() { - case netebpf.Incoming: - stats.Direction = network.INCOMING - case netebpf.Outgoing: - stats.Direction = network.OUTGOING - default: - stats.Direction = network.OUTGOING - } - - if ch != nil { - ch.Hash(stats) - } -} - -func updateTCPStats(conn *network.ConnectionStats, tcpStats *netebpf.TCPStats) { - if conn.Type != network.TCP { - return - } - - if tcpStats != nil { - conn.Monotonic.Retransmits = tcpStats.Retransmits - conn.Monotonic.TCPEstablished = tcpStats.State_transitions >> netebpf.Established & 1 - conn.Monotonic.TCPClosed = tcpStats.State_transitions >> netebpf.Close & 1 - conn.RTT = tcpStats.Rtt - conn.RTTVar = tcpStats.Rtt_var - if tcpStats.Failure_reason > 0 { - conn.TCPFailures = map[uint16]uint32{ - tcpStats.Failure_reason: 1, - } - } - } -} diff --git a/pkg/network/tracer/connection/fentry/manager.go b/pkg/network/tracer/connection/fentry/manager.go index b41820b7ffedf3..deddd6708f5fd7 100644 --- a/pkg/network/tracer/connection/fentry/manager.go +++ b/pkg/network/tracer/connection/fentry/manager.go @@ -11,12 +11,10 @@ import ( manager "github.com/DataDog/ebpf-manager" ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" - "github.com/DataDog/datadog-agent/pkg/network/config" "github.com/DataDog/datadog-agent/pkg/network/ebpf/probes" - "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/util" ) -func initManager(mgr *ddebpf.Manager, connCloseEventHandler ddebpf.EventHandler, cfg *config.Config) { +func initManager(mgr *ddebpf.Manager) { mgr.Maps = []*manager.Map{ {Name: probes.ConnMap}, {Name: probes.TCPStatsMap}, @@ -29,7 +27,6 @@ func initManager(mgr *ddebpf.Manager, connCloseEventHandler ddebpf.EventHandler, {Name: "pending_bind"}, {Name: probes.TelemetryMap}, } - util.SetupClosedConnHandler(connCloseEventHandler, mgr, cfg) for funcName := range programs { p := &manager.Probe{ ProbeIdentificationPair: manager.ProbeIdentificationPair{ diff --git a/pkg/network/tracer/connection/fentry/probes.go b/pkg/network/tracer/connection/fentry/probes.go index 8d8ce7ae73df3d..02ad4d6937c7cf 100644 --- a/pkg/network/tracer/connection/fentry/probes.go +++ b/pkg/network/tracer/connection/fentry/probes.go @@ -138,7 +138,6 @@ func enabledPrograms(c *config.Config) (map[string]struct{}, error) { enableProgram(enabled, tcpSendPageReturn) enableProgram(enabled, selectVersionBasedProbe(kv, tcpRecvMsgReturn, tcpRecvMsgPre5190Return, kv5190)) enableProgram(enabled, tcpClose) - enableProgram(enabled, tcpCloseReturn) enableProgram(enabled, tcpConnect) enableProgram(enabled, tcpFinishConnect) enableProgram(enabled, inetCskAcceptReturn) @@ -153,30 +152,40 @@ func enabledPrograms(c *config.Config) (map[string]struct{}, error) { // if err == nil && len(missing) == 0 { // enableProgram(enabled, sockFDLookupRet) // } + + if c.CustomBatchingEnabled { + enableProgram(enabled, tcpCloseReturn) + } } if c.CollectUDPv4Conns { enableProgram(enabled, udpSendPageReturn) enableProgram(enabled, udpDestroySock) - enableProgram(enabled, udpDestroySockReturn) enableProgram(enabled, inetBind) enableProgram(enabled, inetBindRet) enableProgram(enabled, udpRecvMsg) enableProgram(enabled, selectVersionBasedProbe(kv, udpRecvMsgReturn, udpRecvMsgPre5190Return, kv5190)) enableProgram(enabled, udpSendMsgReturn) enableProgram(enabled, udpSendSkb) + + if c.CustomBatchingEnabled { + enableProgram(enabled, udpDestroySockReturn) + } } if c.CollectUDPv6Conns { enableProgram(enabled, udpSendPageReturn) enableProgram(enabled, udpv6DestroySock) - enableProgram(enabled, udpv6DestroySockReturn) enableProgram(enabled, inet6Bind) enableProgram(enabled, inet6BindRet) enableProgram(enabled, udpv6RecvMsg) enableProgram(enabled, selectVersionBasedProbe(kv, udpv6RecvMsgReturn, udpv6RecvMsgPre5190Return, kv5190)) enableProgram(enabled, udpv6SendMsgReturn) enableProgram(enabled, udpv6SendSkb) + + if c.CustomBatchingEnabled { + enableProgram(enabled, udpv6DestroySockReturn) + } } if c.CollectUDPv4Conns || c.CollectUDPv6Conns { diff --git a/pkg/network/tracer/connection/fentry/tracer.go b/pkg/network/tracer/connection/fentry/tracer.go index d44715e8c87e5e..0a88ad6710b749 100644 --- a/pkg/network/tracer/connection/fentry/tracer.go +++ b/pkg/network/tracer/connection/fentry/tracer.go @@ -17,10 +17,10 @@ import ( ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" "github.com/DataDog/datadog-agent/pkg/ebpf/bytecode" + "github.com/DataDog/datadog-agent/pkg/ebpf/perf" ebpftelemetry "github.com/DataDog/datadog-agent/pkg/ebpf/telemetry" "github.com/DataDog/datadog-agent/pkg/network/config" netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf" - "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/util" "github.com/DataDog/datadog-agent/pkg/util/fargate" ) @@ -29,71 +29,65 @@ const probeUID = "net" var ErrorNotSupported = errors.New("fentry tracer is only supported on Fargate") //nolint:revive // TODO // LoadTracer loads a new tracer -func LoadTracer(config *config.Config, mgrOpts manager.Options, connCloseEventHandler ddebpf.EventHandler) (*manager.Manager, func(), error) { +func LoadTracer(config *config.Config, mgrOpts manager.Options, connCloseEventHandler *perf.EventHandler) (*ddebpf.Manager, func(), error) { if !fargate.IsFargateInstance() { return nil, nil, ErrorNotSupported } - m := ddebpf.NewManagerWithDefault(&manager.Manager{}, "network", &ebpftelemetry.ErrorsTelemetryModifier{}) + m := ddebpf.NewManagerWithDefault(&manager.Manager{}, "network", &ebpftelemetry.ErrorsTelemetryModifier{}, connCloseEventHandler) err := ddebpf.LoadCOREAsset(netebpf.ModuleFileName("tracer-fentry", config.BPFDebug), func(ar bytecode.AssetReader, o manager.Options) error { o.RemoveRlimit = mgrOpts.RemoveRlimit o.MapSpecEditors = mgrOpts.MapSpecEditors o.ConstantEditors = mgrOpts.ConstantEditors + return initFentryTracer(ar, o, config, m) + }) - // Use the config to determine what kernel probes should be enabled - enabledProbes, err := enabledPrograms(config) - if err != nil { - return fmt.Errorf("invalid probe configuration: %v", err) - } - - initManager(m, connCloseEventHandler, config) - - file, err := os.Stat("/proc/self/ns/pid") + if err != nil { + return nil, nil, err + } - if err != nil { - return fmt.Errorf("could not load sysprobe pid: %w", err) - } + return m, nil, nil +} - device := file.Sys().(*syscall.Stat_t).Dev - inode := file.Sys().(*syscall.Stat_t).Ino - ringbufferEnabled := config.RingBufferSupportedNPM() - - o.ConstantEditors = append(o.ConstantEditors, manager.ConstantEditor{ - Name: "systemprobe_device", - Value: device, - }) - o.ConstantEditors = append(o.ConstantEditors, manager.ConstantEditor{ - Name: "systemprobe_ino", - Value: inode, - }) - util.AddBoolConst(&o, "ringbuffers_enabled", ringbufferEnabled) - if ringbufferEnabled { - util.EnableRingbuffersViaMapEditor(&mgrOpts) - } +// Use a function so someone doesn't accidentally use mgrOpts from the outer scope in LoadTracer +func initFentryTracer(ar bytecode.AssetReader, o manager.Options, config *config.Config, m *ddebpf.Manager) error { + // Use the config to determine what kernel probes should be enabled + enabledProbes, err := enabledPrograms(config) + if err != nil { + return fmt.Errorf("invalid probe configuration: %v", err) + } - // exclude all non-enabled probes to ensure we don't run into problems with unsupported probe types - for _, p := range m.Probes { - if _, enabled := enabledProbes[p.EBPFFuncName]; !enabled { - o.ExcludedFunctions = append(o.ExcludedFunctions, p.EBPFFuncName) - } - } - for funcName := range enabledProbes { - o.ActivatedProbes = append( - o.ActivatedProbes, - &manager.ProbeSelector{ - ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: funcName, - UID: probeUID, - }, - }) - } + initManager(m) - return m.InitWithOptions(ar, &o) + file, err := os.Stat("/proc/self/ns/pid") + if err != nil { + return fmt.Errorf("could not load sysprobe pid: %w", err) + } + pidStat := file.Sys().(*syscall.Stat_t) + o.ConstantEditors = append(o.ConstantEditors, manager.ConstantEditor{ + Name: "systemprobe_device", + Value: pidStat.Dev, + }, manager.ConstantEditor{ + Name: "systemprobe_ino", + Value: pidStat.Ino, }) - if err != nil { - return nil, nil, err + // exclude all non-enabled probes to ensure we don't run into problems with unsupported probe types + for _, p := range m.Probes { + if _, enabled := enabledProbes[p.EBPFFuncName]; !enabled { + o.ExcludedFunctions = append(o.ExcludedFunctions, p.EBPFFuncName) + } + } + for funcName := range enabledProbes { + o.ActivatedProbes = append( + o.ActivatedProbes, + &manager.ProbeSelector{ + ProbeIdentificationPair: manager.ProbeIdentificationPair{ + EBPFFuncName: funcName, + UID: probeUID, + }, + }) } - return m.Manager, nil, nil + return m.InitWithOptions(ar, &o) } diff --git a/pkg/network/tracer/connection/kprobe/config.go b/pkg/network/tracer/connection/kprobe/config.go index 880a2f0a5e8388..7567b28e5c4d44 100644 --- a/pkg/network/tracer/connection/kprobe/config.go +++ b/pkg/network/tracer/connection/kprobe/config.go @@ -8,10 +8,14 @@ package kprobe import ( + "errors" "fmt" + manager "github.com/DataDog/ebpf-manager" + "github.com/DataDog/datadog-agent/pkg/ebpf" "github.com/DataDog/datadog-agent/pkg/network/config" + netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf" "github.com/DataDog/datadog-agent/pkg/network/ebpf/probes" "github.com/DataDog/datadog-agent/pkg/util/kernel" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -76,10 +80,15 @@ func enabledProbes(c *config.Config, runtimeTracer, coreTracer bool) (map[probes enableProbe(enabled, probes.TCPReadSock) enableProbe(enabled, probes.TCPReadSockReturn) enableProbe(enabled, probes.TCPClose) - enableProbe(enabled, probes.TCPCloseFlushReturn) + if c.CustomBatchingEnabled { + enableProbe(enabled, probes.TCPCloseFlushReturn) + } + enableProbe(enabled, probes.TCPConnect) enableProbe(enabled, probes.TCPDone) - enableProbe(enabled, probes.TCPDoneFlushReturn) + if c.CustomBatchingEnabled { + enableProbe(enabled, probes.TCPDoneFlushReturn) + } enableProbe(enabled, probes.TCPFinishConnect) enableProbe(enabled, probes.InetCskAcceptReturn) enableProbe(enabled, probes.InetCskListenStop) @@ -93,7 +102,9 @@ func enabledProbes(c *config.Config, runtimeTracer, coreTracer bool) (map[probes if c.CollectUDPv4Conns { enableProbe(enabled, probes.UDPDestroySock) - enableProbe(enabled, probes.UDPDestroySockReturn) + if c.CustomBatchingEnabled { + enableProbe(enabled, probes.UDPDestroySockReturn) + } enableProbe(enabled, selectVersionBasedProbe(runtimeTracer, kv, probes.IPMakeSkb, probes.IPMakeSkbPre4180, kv4180)) enableProbe(enabled, probes.IPMakeSkbReturn) enableProbe(enabled, probes.InetBind) @@ -116,11 +127,13 @@ func enabledProbes(c *config.Config, runtimeTracer, coreTracer bool) (map[probes if c.CollectUDPv6Conns { enableProbe(enabled, probes.UDPv6DestroySock) - enableProbe(enabled, probes.UDPv6DestroySockReturn) + if c.CustomBatchingEnabled { + enableProbe(enabled, probes.UDPv6DestroySockReturn) + } if kv >= kv5180 || runtimeTracer { // prebuilt shouldn't arrive here with 5.18+ and UDPv6 enabled if !coreTracer && !runtimeTracer { - return nil, fmt.Errorf("UDPv6 does not function on prebuilt tracer with kernel versions 5.18+") + return nil, errors.New("UDPv6 does not function on prebuilt tracer with kernel versions 5.18+") } enableProbe(enabled, probes.IP6MakeSkb) } else if kv >= kv470 { @@ -156,6 +169,46 @@ func enabledProbes(c *config.Config, runtimeTracer, coreTracer bool) (map[probes return enabled, nil } +func protocolClassificationTailCalls(cfg *config.Config) []manager.TailCallRoute { + tcs := []manager.TailCallRoute{ + { + ProgArrayName: probes.ClassificationProgsMap, + Key: netebpf.ClassificationQueues, + ProbeIdentificationPair: manager.ProbeIdentificationPair{ + EBPFFuncName: probes.ProtocolClassifierQueuesSocketFilter, + UID: probeUID, + }, + }, + { + ProgArrayName: probes.ClassificationProgsMap, + Key: netebpf.ClassificationDBs, + ProbeIdentificationPair: manager.ProbeIdentificationPair{ + EBPFFuncName: probes.ProtocolClassifierDBsSocketFilter, + UID: probeUID, + }, + }, + { + ProgArrayName: probes.ClassificationProgsMap, + Key: netebpf.ClassificationGRPC, + ProbeIdentificationPair: manager.ProbeIdentificationPair{ + EBPFFuncName: probes.ProtocolClassifierGRPCSocketFilter, + UID: probeUID, + }, + }, + } + if cfg.CustomBatchingEnabled { + tcs = append(tcs, manager.TailCallRoute{ + ProgArrayName: probes.TCPCloseProgsMap, + Key: 0, + ProbeIdentificationPair: manager.ProbeIdentificationPair{ + EBPFFuncName: probes.TCPCloseFlushReturn, + UID: probeUID, + }, + }) + } + return tcs +} + func enableAdvancedUDP(enabled map[probes.ProbeFuncName]struct{}) error { missing, err := ebpf.VerifyKernelFuncs("skb_consume_udp", "__skb_free_datagram_locked", "skb_free_datagram_locked") if err != nil { @@ -169,7 +222,7 @@ func enableAdvancedUDP(enabled map[probes.ProbeFuncName]struct{}) error { } else if _, miss := missing["skb_free_datagram_locked"]; !miss { enableProbe(enabled, probes.SKBFreeDatagramLocked) } else { - return fmt.Errorf("missing desired UDP receive kernel functions") + return errors.New("missing desired UDP receive kernel functions") } return nil } diff --git a/pkg/network/tracer/connection/kprobe/manager.go b/pkg/network/tracer/connection/kprobe/manager.go index fb2ee4b7bd0656..63dde86a0e1073 100644 --- a/pkg/network/tracer/connection/kprobe/manager.go +++ b/pkg/network/tracer/connection/kprobe/manager.go @@ -11,9 +11,8 @@ import ( manager "github.com/DataDog/ebpf-manager" ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" - "github.com/DataDog/datadog-agent/pkg/network/config" "github.com/DataDog/datadog-agent/pkg/network/ebpf/probes" - "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/util" + "github.com/DataDog/datadog-agent/pkg/util/slices" ) var mainProbes = []probes.ProbeFuncName{ @@ -32,9 +31,7 @@ var mainProbes = []probes.ProbeFuncName{ probes.TCPReadSockReturn, probes.TCPClose, probes.TCPDone, - probes.TCPDoneFlushReturn, probes.TCPCloseCleanProtocolsReturn, - probes.TCPCloseFlushReturn, probes.TCPConnect, probes.TCPFinishConnect, probes.IPMakeSkb, @@ -50,9 +47,7 @@ var mainProbes = []probes.ProbeFuncName{ probes.InetCskAcceptReturn, probes.InetCskListenStop, probes.UDPDestroySock, - probes.UDPDestroySockReturn, probes.UDPv6DestroySock, - probes.UDPv6DestroySockReturn, probes.InetBind, probes.Inet6Bind, probes.InetBindRet, @@ -61,7 +56,14 @@ var mainProbes = []probes.ProbeFuncName{ probes.UDPSendPageReturn, } -func initManager(mgr *ddebpf.Manager, connCloseEventHandler ddebpf.EventHandler, runtimeTracer bool, cfg *config.Config) error { +var batchProbes = []probes.ProbeFuncName{ + probes.TCPDoneFlushReturn, + probes.TCPCloseFlushReturn, + probes.UDPDestroySockReturn, + probes.UDPv6DestroySockReturn, +} + +func initManager(mgr *ddebpf.Manager, runtimeTracer bool) error { mgr.Maps = []*manager.Map{ {Name: probes.ConnMap}, {Name: probes.TCPStatsMap}, @@ -82,45 +84,45 @@ func initManager(mgr *ddebpf.Manager, connCloseEventHandler ddebpf.EventHandler, {Name: probes.ClassificationProgsMap}, {Name: probes.TCPCloseProgsMap}, } - util.SetupClosedConnHandler(connCloseEventHandler, mgr, cfg) - for _, funcName := range mainProbes { - p := &manager.Probe{ + var funcNameToProbe = func(funcName probes.ProbeFuncName) *manager.Probe { + return &manager.Probe{ ProbeIdentificationPair: manager.ProbeIdentificationPair{ EBPFFuncName: funcName, UID: probeUID, }, } - mgr.Probes = append(mgr.Probes, p) } - mgr.Probes = append(mgr.Probes, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.SKBFreeDatagramLocked, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.UnderscoredSKBFreeDatagramLocked, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.SKBConsumeUDP, UID: probeUID}}, - ) + mgr.Probes = append(mgr.Probes, slices.Map(mainProbes, funcNameToProbe)...) + mgr.Probes = append(mgr.Probes, slices.Map(batchProbes, funcNameToProbe)...) + mgr.Probes = append(mgr.Probes, slices.Map([]probes.ProbeFuncName{ + probes.SKBFreeDatagramLocked, + probes.UnderscoredSKBFreeDatagramLocked, + probes.SKBConsumeUDP, + }, funcNameToProbe)...) if !runtimeTracer { // the runtime compiled tracer has no need for separate probes targeting specific kernel versions, since it can // do that with #ifdefs inline. Thus, the following probes should only be declared as existing in the prebuilt // tracer. - mgr.Probes = append(mgr.Probes, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.TCPRetransmitPre470, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.IPMakeSkbPre4180, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.IP6MakeSkbPre470, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.IP6MakeSkbPre5180, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.UDPRecvMsgPre5190, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.UDPv6RecvMsgPre5190, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.UDPRecvMsgPre470, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.UDPv6RecvMsgPre470, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.UDPRecvMsgPre410, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.UDPv6RecvMsgPre410, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.UDPRecvMsgReturnPre470, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.UDPv6RecvMsgReturnPre470, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.TCPSendMsgPre410, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.TCPRecvMsgPre410, UID: probeUID}}, - &manager.Probe{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: probes.TCPRecvMsgPre5190, UID: probeUID}}, - ) + mgr.Probes = append(mgr.Probes, slices.Map([]probes.ProbeFuncName{ + probes.TCPRetransmitPre470, + probes.IPMakeSkbPre4180, + probes.IP6MakeSkbPre470, + probes.IP6MakeSkbPre5180, + probes.UDPRecvMsgPre5190, + probes.UDPv6RecvMsgPre5190, + probes.UDPRecvMsgPre470, + probes.UDPv6RecvMsgPre470, + probes.UDPRecvMsgPre410, + probes.UDPv6RecvMsgPre410, + probes.UDPRecvMsgReturnPre470, + probes.UDPv6RecvMsgReturnPre470, + probes.TCPSendMsgPre410, + probes.TCPRecvMsgPre410, + probes.TCPRecvMsgPre5190, + }, funcNameToProbe)...) } return nil diff --git a/pkg/network/tracer/connection/kprobe/tracer.go b/pkg/network/tracer/connection/kprobe/tracer.go index 61ee6a88087718..60947618fae0dc 100644 --- a/pkg/network/tracer/connection/kprobe/tracer.go +++ b/pkg/network/tracer/connection/kprobe/tracer.go @@ -16,6 +16,7 @@ import ( ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" "github.com/DataDog/datadog-agent/pkg/ebpf/bytecode" + "github.com/DataDog/datadog-agent/pkg/ebpf/perf" "github.com/DataDog/datadog-agent/pkg/ebpf/prebuilt" ebpftelemetry "github.com/DataDog/datadog-agent/pkg/ebpf/telemetry" "github.com/DataDog/datadog-agent/pkg/network/config" @@ -44,41 +45,6 @@ var ( // - 2492d3b867043f6880708d095a7a5d65debcfc32 classificationMinimumKernel = kernel.VersionCode(4, 11, 0) - protocolClassificationTailCalls = []manager.TailCallRoute{ - { - ProgArrayName: probes.ClassificationProgsMap, - Key: netebpf.ClassificationQueues, - ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: probes.ProtocolClassifierQueuesSocketFilter, - UID: probeUID, - }, - }, - { - ProgArrayName: probes.ClassificationProgsMap, - Key: netebpf.ClassificationDBs, - ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: probes.ProtocolClassifierDBsSocketFilter, - UID: probeUID, - }, - }, - { - ProgArrayName: probes.ClassificationProgsMap, - Key: netebpf.ClassificationGRPC, - ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: probes.ProtocolClassifierGRPCSocketFilter, - UID: probeUID, - }, - }, - { - ProgArrayName: probes.TCPCloseProgsMap, - Key: 0, - ProbeIdentificationPair: manager.ProbeIdentificationPair{ - EBPFFuncName: probes.TCPCloseFlushReturn, - UID: probeUID, - }, - }, - } - // these primarily exist for mocking out in tests coreTracerLoader = loadCORETracer rcTracerLoader = loadRuntimeCompiledTracer @@ -109,7 +75,7 @@ func ClassificationSupported(config *config.Config) bool { } // LoadTracer loads the co-re/prebuilt/runtime compiled network tracer, depending on config -func LoadTracer(cfg *config.Config, mgrOpts manager.Options, connCloseEventHandler ddebpf.EventHandler, failedConnsHandler ddebpf.EventHandler) (*manager.Manager, func(), TracerType, error) { //nolint:revive // TODO +func LoadTracer(cfg *config.Config, mgrOpts manager.Options, connCloseEventHandler *perf.EventHandler) (*ddebpf.Manager, func(), TracerType, error) { kprobeAttachMethod := manager.AttachKprobeWithPerfEventOpen if cfg.AttachKprobesWithKprobeEventsABI { kprobeAttachMethod = manager.AttachKprobeWithKprobeEvents @@ -123,7 +89,7 @@ func LoadTracer(cfg *config.Config, mgrOpts manager.Options, connCloseEventHandl return nil, nil, TracerTypeCORE, fmt.Errorf("error determining if CO-RE tracer is supported: %w", err) } - var m *manager.Manager + var m *ddebpf.Manager var closeFn func() if err == nil { m, closeFn, err = coreTracerLoader(cfg, mgrOpts, connCloseEventHandler) @@ -174,18 +140,11 @@ func LoadTracer(cfg *config.Config, mgrOpts manager.Options, connCloseEventHandl return m, closeFn, TracerTypePrebuilt, err } -func loadTracerFromAsset(buf bytecode.AssetReader, runtimeTracer, coreTracer bool, config *config.Config, mgrOpts manager.Options, connCloseEventHandler ddebpf.EventHandler) (*manager.Manager, func(), error) { - m := ddebpf.NewManagerWithDefault(&manager.Manager{}, "network", &ebpftelemetry.ErrorsTelemetryModifier{}) - if err := initManager(m, connCloseEventHandler, runtimeTracer, config); err != nil { +func loadTracerFromAsset(buf bytecode.AssetReader, runtimeTracer, coreTracer bool, config *config.Config, mgrOpts manager.Options, connCloseEventHandler *perf.EventHandler) (*ddebpf.Manager, func(), error) { + m := ddebpf.NewManagerWithDefault(&manager.Manager{}, "network", &ebpftelemetry.ErrorsTelemetryModifier{}, connCloseEventHandler) + if err := initManager(m, runtimeTracer); err != nil { return nil, nil, fmt.Errorf("could not initialize manager: %w", err) } - switch connCloseEventHandler.(type) { - case *ddebpf.RingBufferHandler: - util.EnableRingbuffersViaMapEditor(&mgrOpts) - util.AddBoolConst(&mgrOpts, "ringbuffers_enabled", true) - } - - var undefinedProbes []manager.ProbeIdentificationPair var closeProtocolClassifierSocketFilterFn func() classificationSupported := ClassificationSupported(config) @@ -193,8 +152,9 @@ func loadTracerFromAsset(buf bytecode.AssetReader, runtimeTracer, coreTracer boo var tailCallsIdentifiersSet map[manager.ProbeIdentificationPair]struct{} if classificationSupported { - tailCallsIdentifiersSet = make(map[manager.ProbeIdentificationPair]struct{}, len(protocolClassificationTailCalls)) - for _, tailCall := range protocolClassificationTailCalls { + pcTailCalls := protocolClassificationTailCalls(config) + tailCallsIdentifiersSet = make(map[manager.ProbeIdentificationPair]struct{}, len(pcTailCalls)) + for _, tailCall := range pcTailCalls { tailCallsIdentifiersSet[tailCall.ProbeIdentificationPair] = struct{}{} } socketFilterProbe, _ := m.GetProbe(manager.ProbeIdentificationPair{ @@ -202,7 +162,7 @@ func loadTracerFromAsset(buf bytecode.AssetReader, runtimeTracer, coreTracer boo UID: probeUID, }) if socketFilterProbe == nil { - return nil, nil, fmt.Errorf("error retrieving protocol classifier socket filter") + return nil, nil, errors.New("error retrieving protocol classifier socket filter") } var err error @@ -211,9 +171,7 @@ func loadTracerFromAsset(buf bytecode.AssetReader, runtimeTracer, coreTracer boo return nil, nil, fmt.Errorf("error enabling protocol classifier: %w", err) } - //nolint:ineffassign,staticcheck // TODO(NET) Fix ineffassign linter // TODO(NET) Fix staticcheck linter - undefinedProbes = append(undefinedProbes, protocolClassificationTailCalls[0].ProbeIdentificationPair) - mgrOpts.TailCallRouter = append(mgrOpts.TailCallRouter, protocolClassificationTailCalls...) + mgrOpts.TailCallRouter = append(mgrOpts.TailCallRouter, pcTailCalls...) } else { // Kernels < 4.7.0 do not know about the per-cpu array map used // in classification, preventing the program to load even though @@ -267,11 +225,11 @@ func loadTracerFromAsset(buf bytecode.AssetReader, runtimeTracer, coreTracer boo return nil, nil, fmt.Errorf("failed to init ebpf manager: %w", err) } - return m.Manager, closeProtocolClassifierSocketFilterFn, nil + return m, closeProtocolClassifierSocketFilterFn, nil } -func loadCORETracer(config *config.Config, mgrOpts manager.Options, connCloseEventHandler ddebpf.EventHandler) (*manager.Manager, func(), error) { - var m *manager.Manager +func loadCORETracer(config *config.Config, mgrOpts manager.Options, connCloseEventHandler *perf.EventHandler) (*ddebpf.Manager, func(), error) { + var m *ddebpf.Manager var closeFn func() var err error err = ddebpf.LoadCOREAsset(netebpf.ModuleFileName("tracer", config.BPFDebug), func(ar bytecode.AssetReader, o manager.Options) error { @@ -288,7 +246,7 @@ func loadCORETracer(config *config.Config, mgrOpts manager.Options, connCloseEve return m, closeFn, err } -func loadRuntimeCompiledTracer(config *config.Config, mgrOpts manager.Options, connCloseEventHandler ddebpf.EventHandler) (*manager.Manager, func(), error) { +func loadRuntimeCompiledTracer(config *config.Config, mgrOpts manager.Options, connCloseEventHandler *perf.EventHandler) (*ddebpf.Manager, func(), error) { buf, err := getRuntimeCompiledTracer(config) if err != nil { return nil, nil, err @@ -298,7 +256,7 @@ func loadRuntimeCompiledTracer(config *config.Config, mgrOpts manager.Options, c return tracerLoaderFromAsset(buf, true, false, config, mgrOpts, connCloseEventHandler) } -func loadPrebuiltTracer(config *config.Config, mgrOpts manager.Options, connCloseEventHandler ddebpf.EventHandler) (*manager.Manager, func(), error) { +func loadPrebuiltTracer(config *config.Config, mgrOpts manager.Options, connCloseEventHandler *perf.EventHandler) (*ddebpf.Manager, func(), error) { buf, err := netebpf.ReadBPFModule(config.BPFDir, config.BPFDebug) if err != nil { return nil, nil, fmt.Errorf("could not read bpf module: %w", err) diff --git a/pkg/network/tracer/connection/kprobe/tracer_test.go b/pkg/network/tracer/connection/kprobe/tracer_test.go index 4c4a155ce12b95..6b731dc95ccc88 100644 --- a/pkg/network/tracer/connection/kprobe/tracer_test.go +++ b/pkg/network/tracer/connection/kprobe/tracer_test.go @@ -17,6 +17,7 @@ import ( ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" "github.com/DataDog/datadog-agent/pkg/ebpf/bytecode" + "github.com/DataDog/datadog-agent/pkg/ebpf/perf" "github.com/DataDog/datadog-agent/pkg/network/config" "github.com/DataDog/datadog-agent/pkg/util/kernel" ) @@ -169,14 +170,14 @@ func testTracerFallbackCOREAndRCErr(t *testing.T) { runFallbackTests(t, "CORE and RC error", true, true, tests) } -func loaderFunc(closeFn func(), err error) func(_ *config.Config, _ manager.Options, _ ddebpf.EventHandler) (*manager.Manager, func(), error) { - return func(_ *config.Config, _ manager.Options, _ ddebpf.EventHandler) (*manager.Manager, func(), error) { +func loaderFunc(closeFn func(), err error) func(_ *config.Config, _ manager.Options, _ *perf.EventHandler) (*ddebpf.Manager, func(), error) { + return func(_ *config.Config, _ manager.Options, _ *perf.EventHandler) (*ddebpf.Manager, func(), error) { return nil, closeFn, err } } -func prebuiltLoaderFunc(closeFn func(), err error) func(_ *config.Config, _ manager.Options, _ ddebpf.EventHandler) (*manager.Manager, func(), error) { - return func(_ *config.Config, _ manager.Options, _ ddebpf.EventHandler) (*manager.Manager, func(), error) { +func prebuiltLoaderFunc(closeFn func(), err error) func(_ *config.Config, _ manager.Options, _ *perf.EventHandler) (*ddebpf.Manager, func(), error) { + return func(_ *config.Config, _ manager.Options, _ *perf.EventHandler) (*ddebpf.Manager, func(), error) { return nil, closeFn, err } } @@ -216,7 +217,7 @@ func runFallbackTests(t *testing.T, desc string, coreErr, rcErr bool, tests []st cfg.AllowPrebuiltFallback = te.allowPrebuiltFallback prevOffsetGuessingRun := offsetGuessingRun - _, closeFn, tracerType, err := LoadTracer(cfg, manager.Options{}, nil, nil) + _, closeFn, tracerType, err := LoadTracer(cfg, manager.Options{}, nil) if te.err == nil { assert.NoError(t, err, "%+v", te) } else { @@ -251,12 +252,12 @@ func TestCORETracerSupported(t *testing.T) { }) coreCalled := false - coreTracerLoader = func(*config.Config, manager.Options, ddebpf.EventHandler) (*manager.Manager, func(), error) { + coreTracerLoader = func(*config.Config, manager.Options, *perf.EventHandler) (*ddebpf.Manager, func(), error) { coreCalled = true return nil, nil, nil } prebuiltCalled := false - prebuiltTracerLoader = func(*config.Config, manager.Options, ddebpf.EventHandler) (*manager.Manager, func(), error) { + prebuiltTracerLoader = func(*config.Config, manager.Options, *perf.EventHandler) (*ddebpf.Manager, func(), error) { prebuiltCalled = true return nil, nil, nil } @@ -270,7 +271,7 @@ func TestCORETracerSupported(t *testing.T) { cfg := config.New() cfg.EnableCORE = true cfg.AllowRuntimeCompiledFallback = false - _, _, _, err = LoadTracer(cfg, manager.Options{}, nil, nil) + _, _, _, err = LoadTracer(cfg, manager.Options{}, nil) assert.False(t, prebuiltCalled) if kv < kernel.VersionCode(4, 4, 128) && platform != "centos" && platform != "redhat" { assert.False(t, coreCalled) @@ -283,7 +284,7 @@ func TestCORETracerSupported(t *testing.T) { coreCalled = false prebuiltCalled = false cfg.AllowRuntimeCompiledFallback = true - _, _, _, err = LoadTracer(cfg, manager.Options{}, nil, nil) + _, _, _, err = LoadTracer(cfg, manager.Options{}, nil) assert.NoError(t, err) if kv < kernel.VersionCode(4, 4, 128) && platform != "centos" && platform != "redhat" { assert.False(t, coreCalled) @@ -296,7 +297,7 @@ func TestCORETracerSupported(t *testing.T) { func TestDefaultKprobeMaxActiveSet(t *testing.T) { prevLoader := tracerLoaderFromAsset - tracerLoaderFromAsset = func(_ bytecode.AssetReader, _, _ bool, _ *config.Config, mgrOpts manager.Options, _ ddebpf.EventHandler) (*manager.Manager, func(), error) { + tracerLoaderFromAsset = func(_ bytecode.AssetReader, _, _ bool, _ *config.Config, mgrOpts manager.Options, _ *perf.EventHandler) (*ddebpf.Manager, func(), error) { assert.Equal(t, mgrOpts.DefaultKProbeMaxActive, 128) return nil, nil, nil } @@ -306,7 +307,7 @@ func TestDefaultKprobeMaxActiveSet(t *testing.T) { cfg := config.New() cfg.EnableCORE = true cfg.AllowRuntimeCompiledFallback = false - _, _, _, err := LoadTracer(cfg, manager.Options{DefaultKProbeMaxActive: 128}, nil, nil) + _, _, _, err := LoadTracer(cfg, manager.Options{DefaultKProbeMaxActive: 128}, nil) require.NoError(t, err) }) @@ -314,7 +315,7 @@ func TestDefaultKprobeMaxActiveSet(t *testing.T) { cfg := config.New() cfg.EnableCORE = false cfg.AllowRuntimeCompiledFallback = false - _, _, _, err := LoadTracer(cfg, manager.Options{DefaultKProbeMaxActive: 128}, nil, nil) + _, _, _, err := LoadTracer(cfg, manager.Options{DefaultKProbeMaxActive: 128}, nil) require.NoError(t, err) }) @@ -322,7 +323,7 @@ func TestDefaultKprobeMaxActiveSet(t *testing.T) { cfg := config.New() cfg.EnableCORE = false cfg.AllowRuntimeCompiledFallback = true - _, _, _, err := LoadTracer(cfg, manager.Options{DefaultKProbeMaxActive: 128}, nil, nil) + _, _, _, err := LoadTracer(cfg, manager.Options{DefaultKProbeMaxActive: 128}, nil) require.NoError(t, err) }) } diff --git a/pkg/network/tracer/connection/perf_batching.go b/pkg/network/tracer/connection/perf_batching.go index 9c20044cde3daf..93754574a10f2c 100644 --- a/pkg/network/tracer/connection/perf_batching.go +++ b/pkg/network/tracer/connection/perf_batching.go @@ -8,6 +8,7 @@ package connection import ( + "errors" "fmt" "time" @@ -17,6 +18,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/network" netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf" "github.com/DataDog/datadog-agent/pkg/network/ebpf/probes" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" ) // perfBatchManager is responsible for two things: @@ -27,16 +29,18 @@ import ( // The motivation is to impose an upper limit on how long a TCP close connection // event remains stored in the eBPF map before being processed by the NetworkAgent. type perfBatchManager struct { - batchMap *maps.GenericMap[uint32, netebpf.Batch] - extractor *batchExtractor - ch *cookieHasher + batchMap *maps.GenericMap[uint32, netebpf.Batch] + extractor *batchExtractor + ch *cookieHasher + connGetter ddsync.PoolGetter[network.ConnectionStats] + callback func(stats *network.ConnectionStats) } // newPerfBatchManager returns a new `PerfBatchManager` and initializes the // eBPF map that holds the tcp_close batch objects. -func newPerfBatchManager(batchMap *maps.GenericMap[uint32, netebpf.Batch], extractor *batchExtractor) (*perfBatchManager, error) { +func newPerfBatchManager(batchMap *maps.GenericMap[uint32, netebpf.Batch], extractor *batchExtractor, getter ddsync.PoolGetter[network.ConnectionStats], callback func(stats *network.ConnectionStats)) (*perfBatchManager, error) { if batchMap == nil { - return nil, fmt.Errorf("batchMap is nil") + return nil, errors.New("batchMap is nil") } for cpu := uint32(0); cpu < uint32(extractor.NumCPUs()); cpu++ { @@ -51,25 +55,18 @@ func newPerfBatchManager(batchMap *maps.GenericMap[uint32, netebpf.Batch], extra } return &perfBatchManager{ - batchMap: batchMap, - extractor: extractor, - ch: newCookieHasher(), + batchMap: batchMap, + extractor: extractor, + ch: newCookieHasher(), + connGetter: getter, + callback: callback, }, nil } -// ExtractBatchInto extracts from the given batch all connections that haven't been processed yet. -func (p *perfBatchManager) ExtractBatchInto(buffer *network.ConnectionBuffer, b *netebpf.Batch) { - for rc := p.extractor.NextConnection(b); rc != nil; rc = p.extractor.NextConnection(b) { - conn := buffer.Next() - populateConnStats(conn, &rc.Tup, &rc.Conn_stats, p.ch) - updateTCPStats(conn, &rc.Tcp_stats) - } -} - -// GetPendingConns return all connections that are in batches that are not yet full. +// Flush return all connections that are in batches that are not yet full. // It tracks which connections have been processed by this call, by batch id. // This prevents double-processing of connections between GetPendingConns and Extract. -func (p *perfBatchManager) GetPendingConns(buffer *network.ConnectionBuffer) { +func (p *perfBatchManager) Flush() { b := new(netebpf.Batch) for cpu := uint32(0); cpu < uint32(p.extractor.NumCPUs()); cpu++ { err := p.batchMap.Lookup(&cpu, b) @@ -78,20 +75,23 @@ func (p *perfBatchManager) GetPendingConns(buffer *network.ConnectionBuffer) { } for rc := p.extractor.NextConnection(b); rc != nil; rc = p.extractor.NextConnection(b) { - c := buffer.Next() - populateConnStats(c, &rc.Tup, &rc.Conn_stats, p.ch) - updateTCPStats(c, &rc.Tcp_stats) + c := p.connGetter.Get() + c.FromConn(rc) + p.ch.Hash(c) + p.callback(c) } } + // indicate we are done with all pending connection + p.callback(nil) p.extractor.CleanupExpiredState(time.Now()) } -func newConnBatchManager(mgr *manager.Manager, extractor *batchExtractor) (*perfBatchManager, error) { +func newConnBatchManager(mgr *manager.Manager, extractor *batchExtractor, connGetter ddsync.PoolGetter[network.ConnectionStats], closedCallback func(stats *network.ConnectionStats)) (*perfBatchManager, error) { connCloseMap, err := maps.GetMap[uint32, netebpf.Batch](mgr, probes.ConnCloseBatchMap) if err != nil { return nil, fmt.Errorf("unable to get map %s: %s", probes.ConnCloseBatchMap, err) } - batchMgr, err := newPerfBatchManager(connCloseMap, extractor) + batchMgr, err := newPerfBatchManager(connCloseMap, extractor, connGetter, closedCallback) if err != nil { return nil, err } diff --git a/pkg/network/tracer/connection/perf_batching_test.go b/pkg/network/tracer/connection/perf_batching_test.go index c7d22aaff83aa8..82095728412c4f 100644 --- a/pkg/network/tracer/connection/perf_batching_test.go +++ b/pkg/network/tracer/connection/perf_batching_test.go @@ -19,6 +19,7 @@ import ( ebpfmaps "github.com/DataDog/datadog-agent/pkg/ebpf/maps" "github.com/DataDog/datadog-agent/pkg/network" netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" ) const ( @@ -26,7 +27,15 @@ const ( ) func TestGetPendingConns(t *testing.T) { - manager := newTestBatchManager(t) + var pendingConns []*network.ConnectionStats + flushDone := make(chan struct{}) + manager := newTestBatchManager(t, func(conn *network.ConnectionStats) { + if conn == nil { + flushDone <- struct{}{} + return + } + pendingConns = append(pendingConns, conn) + }) batch := new(netebpf.Batch) batch.Id = 0 @@ -41,9 +50,8 @@ func TestGetPendingConns(t *testing.T) { } updateBatch() - buffer := network.NewConnectionBuffer(256, 256) - manager.GetPendingConns(buffer) - pendingConns := buffer.Connections() + go manager.Flush() + <-flushDone assert.GreaterOrEqual(t, len(pendingConns), 2) for _, pid := range []uint32{pidMax + 1, pidMax + 2} { found := false @@ -64,9 +72,9 @@ func TestGetPendingConns(t *testing.T) { updateBatch() // We should now get only the connection that hasn't been processed before - buffer.Reset() - manager.GetPendingConns(buffer) - pendingConns = buffer.Connections() + go manager.Flush() + pendingConns = pendingConns[:0] + <-flushDone assert.GreaterOrEqual(t, len(pendingConns), 1) var found bool for _, p := range pendingConns { @@ -80,7 +88,12 @@ func TestGetPendingConns(t *testing.T) { } func TestPerfBatchStateCleanup(t *testing.T) { - manager := newTestBatchManager(t) + flushDone := make(chan struct{}) + manager := newTestBatchManager(t, func(stats *network.ConnectionStats) { + if stats == nil { + flushDone <- struct{}{} + } + }) manager.extractor.expiredStateInterval = 100 * time.Millisecond batch := new(netebpf.Batch) @@ -93,14 +106,15 @@ func TestPerfBatchStateCleanup(t *testing.T) { err := manager.batchMap.Put(&cpu, batch) require.NoError(t, err) - buffer := network.NewConnectionBuffer(256, 256) - manager.GetPendingConns(buffer) + go manager.Flush() + <-flushDone _, ok := manager.extractor.stateByCPU[cpu].processed[batch.Id] require.True(t, ok) assert.Equal(t, uint16(2), manager.extractor.stateByCPU[cpu].processed[batch.Id].offset) manager.extractor.CleanupExpiredState(time.Now().Add(manager.extractor.expiredStateInterval)) - manager.GetPendingConns(buffer) + go manager.Flush() + <-flushDone // state should not have been cleaned up, since no more connections have happened _, ok = manager.extractor.stateByCPU[cpu].processed[batch.Id] @@ -108,7 +122,7 @@ func TestPerfBatchStateCleanup(t *testing.T) { assert.Equal(t, uint16(2), manager.extractor.stateByCPU[cpu].processed[batch.Id].offset) } -func newTestBatchManager(t *testing.T) *perfBatchManager { +func newTestBatchManager(t *testing.T, callback func(*network.ConnectionStats)) *perfBatchManager { require.NoError(t, rlimit.RemoveMemlock()) m, err := ebpf.NewMap(&ebpf.MapSpec{ Type: ebpf.Hash, @@ -122,7 +136,8 @@ func newTestBatchManager(t *testing.T) *perfBatchManager { gm, err := ebpfmaps.Map[uint32, netebpf.Batch](m) require.NoError(t, err) extractor := newBatchExtractor(numTestCPUs) - mgr, err := newPerfBatchManager(gm, extractor) + connPool := ddsync.NewDefaultTypedPool[network.ConnectionStats]() + mgr, err := newPerfBatchManager(gm, extractor, connPool, callback) require.NoError(t, err) return mgr } diff --git a/pkg/network/tracer/connection/tcp_close_consumer.go b/pkg/network/tracer/connection/tcp_close_consumer.go index 3c9ff45dba9f0e..5e1d45fae5afa6 100644 --- a/pkg/network/tracer/connection/tcp_close_consumer.go +++ b/pkg/network/tracer/connection/tcp_close_consumer.go @@ -9,46 +9,48 @@ package connection import ( "sync" - "time" - "unsafe" + "sync/atomic" - ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" + "github.com/DataDog/datadog-agent/pkg/ebpf/perf" "github.com/DataDog/datadog-agent/pkg/network" - netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf" "github.com/DataDog/datadog-agent/pkg/status/health" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" + ddsync "github.com/DataDog/datadog-agent/pkg/util/sync" ) const closeConsumerModuleName = "network_tracer__ebpf" // Telemetry var closeConsumerTelemetry = struct { - perfReceived telemetry.Counter - perfLost telemetry.Counter + perfReceived telemetry.Counter + flushReceived telemetry.Counter }{ telemetry.NewCounter(closeConsumerModuleName, "closed_conn_polling_received", []string{}, "Counter measuring the number of closed connections received"), - telemetry.NewCounter(closeConsumerModuleName, "closed_conn_polling_lost", []string{}, "Counter measuring the number of closed connection batches lost (were transmitted from ebpf but never received)"), + telemetry.NewCounter(closeConsumerModuleName, "closed_conn_flush_received", []string{}, "Counter measuring the number of closed connections received during flush"), } type tcpCloseConsumer struct { - eventHandler ddebpf.EventHandler - batchManager *perfBatchManager - requests chan chan struct{} - buffer *network.ConnectionBuffer - once sync.Once - closed chan struct{} - ch *cookieHasher + requests chan chan struct{} + once sync.Once + closed chan struct{} + + flusher perf.Flusher + callback func(*network.ConnectionStats) + releaser ddsync.PoolReleaser[network.ConnectionStats] + flushChannel chan chan struct{} + flushing *atomic.Bool } -func newTCPCloseConsumer(eventHandler ddebpf.EventHandler, batchManager *perfBatchManager) *tcpCloseConsumer { +func newTCPCloseConsumer(flusher perf.Flusher, releaser ddsync.PoolReleaser[network.ConnectionStats]) *tcpCloseConsumer { return &tcpCloseConsumer{ - eventHandler: eventHandler, - batchManager: batchManager, requests: make(chan chan struct{}), - buffer: network.NewConnectionBuffer(netebpf.BatchSize, netebpf.BatchSize), closed: make(chan struct{}), - ch: newCookieHasher(), + flusher: flusher, + releaser: releaser, + callback: func(*network.ConnectionStats) {}, + flushChannel: make(chan chan struct{}, 1), + flushing: &atomic.Bool{}, } } @@ -75,101 +77,52 @@ func (c *tcpCloseConsumer) Stop() { if c == nil { return } - c.eventHandler.Stop() c.once.Do(func() { close(c.closed) }) } -func (c *tcpCloseConsumer) extractConn(data []byte) { - ct := (*netebpf.Conn)(unsafe.Pointer(&data[0])) - conn := c.buffer.Next() - populateConnStats(conn, &ct.Tup, &ct.Conn_stats, c.ch) - updateTCPStats(conn, &ct.Tcp_stats) +func (c *tcpCloseConsumer) Callback(conn *network.ConnectionStats) { + // sentinel record post-flush + if conn == nil { + request := <-c.flushChannel + close(request) + c.flushing.Store(false) + return + } + + closeConsumerTelemetry.perfReceived.Inc() + if c.flushing.Load() { + closeConsumerTelemetry.flushReceived.Inc() + } + c.callback(conn) + c.releaser.Put(conn) } func (c *tcpCloseConsumer) Start(callback func(*network.ConnectionStats)) { if c == nil { return } - health := health.RegisterLiveness("network-tracer") - - var ( - then = time.Now() - closedCount uint64 - lostSamplesCount uint64 - ) + c.callback = callback + liveHealth := health.RegisterLiveness("network-tracer") go func() { defer func() { - err := health.Deregister() + err := liveHealth.Deregister() if err != nil { log.Warnf("error de-registering health check: %s", err) } }() - dataChannel := c.eventHandler.DataChannel() - lostChannel := c.eventHandler.LostChannel() for { select { - case <-c.closed: return - case <-health.C: - case batchData, ok := <-dataChannel: - if !ok { - return - } - - l := len(batchData.Data) - switch { - case l >= netebpf.SizeofBatch: - batch := netebpf.ToBatch(batchData.Data) - c.batchManager.ExtractBatchInto(c.buffer, batch) - case l >= netebpf.SizeofConn: - c.extractConn(batchData.Data) - default: - log.Errorf("unknown type received from perf buffer, skipping. data size=%d, expecting %d or %d", len(batchData.Data), netebpf.SizeofConn, netebpf.SizeofBatch) - continue - } - - closeConsumerTelemetry.perfReceived.Add(float64(c.buffer.Len())) - closedCount += uint64(c.buffer.Len()) - conns := c.buffer.Connections() - for i := range conns { - callback(&conns[i]) - } - c.buffer.Reset() - batchData.Done() - // lost events only occur when using perf buffers - case lc, ok := <-lostChannel: - if !ok { - return - } - closeConsumerTelemetry.perfLost.Add(float64(lc)) - lostSamplesCount += lc + case <-liveHealth.C: case request := <-c.requests: - oneTimeBuffer := network.NewConnectionBuffer(32, 32) - c.batchManager.GetPendingConns(oneTimeBuffer) - conns := oneTimeBuffer.Connections() - for i := range conns { - callback(&conns[i]) - } - close(request) - - closedCount += uint64(oneTimeBuffer.Len()) - now := time.Now() - elapsed := now.Sub(then) - then = now - log.Debugf( - "tcp close summary: closed_count=%d elapsed=%s closed_rate=%.2f/s lost_samples_count=%d", - closedCount, - elapsed, - float64(closedCount)/elapsed.Seconds(), - lostSamplesCount, - ) - closedCount = 0 - lostSamplesCount = 0 + c.flushing.Store(true) + c.flushChannel <- request + c.flusher.Flush() } } }() diff --git a/pkg/network/tracer/connection/tcp_close_consumer_test.go b/pkg/network/tracer/connection/tcp_close_consumer_test.go index 96db4d9bfc04cc..6bdbf4024962eb 100644 --- a/pkg/network/tracer/connection/tcp_close_consumer_test.go +++ b/pkg/network/tracer/connection/tcp_close_consumer_test.go @@ -11,15 +11,10 @@ import ( "testing" "github.com/stretchr/testify/require" - - "github.com/DataDog/datadog-agent/pkg/ebpf" ) func TestTcpCloseConsumerStopRace(t *testing.T) { - pf := ebpf.NewPerfHandler(10) - require.NotNil(t, pf) - - c := newTCPCloseConsumer(pf, nil) + c := newTCPCloseConsumer(nil, nil) require.NotNil(t, c) c.Stop() diff --git a/pkg/network/tracer/connection/util/conn_tracer.go b/pkg/network/tracer/connection/util/conn_tracer.go index 1fc7e129340582..811c656e7a43ad 100644 --- a/pkg/network/tracer/connection/util/conn_tracer.go +++ b/pkg/network/tracer/connection/util/conn_tracer.go @@ -14,16 +14,10 @@ import ( manager "github.com/DataDog/ebpf-manager" cebpf "github.com/cilium/ebpf" - "github.com/cilium/ebpf/asm" - "github.com/DataDog/datadog-agent/pkg/ebpf" - ebpftelemetry "github.com/DataDog/datadog-agent/pkg/ebpf/telemetry" "github.com/DataDog/datadog-agent/pkg/network" - "github.com/DataDog/datadog-agent/pkg/network/config" netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf" - "github.com/DataDog/datadog-agent/pkg/network/ebpf/probes" "github.com/DataDog/datadog-agent/pkg/process/util" - "github.com/DataDog/datadog-agent/pkg/util/log" ) // toPowerOf2 converts a number to its nearest power of 2 @@ -32,9 +26,9 @@ func toPowerOf2(x int) int { return int(math.Pow(2, math.Round(log2))) } -// computeDefaultClosedConnRingBufferSize is the default buffer size of the ring buffer for closed connection events. +// ComputeDefaultClosedConnRingBufferSize is the default buffer size of the ring buffer for closed connection events. // Must be a power of 2 and a multiple of the page size -func computeDefaultClosedConnRingBufferSize() int { +func ComputeDefaultClosedConnRingBufferSize() int { numCPUs, err := cebpf.PossibleCPU() if err != nil { numCPUs = 1 @@ -42,68 +36,12 @@ func computeDefaultClosedConnRingBufferSize() int { return 8 * toPowerOf2(numCPUs) * os.Getpagesize() } -// computeDefaultClosedConnPerfBufferSize is the default buffer size of the perf buffer for closed connection events. +// ComputeDefaultClosedConnPerfBufferSize is the default buffer size of the perf buffer for closed connection events. // Must be a multiple of the page size -func computeDefaultClosedConnPerfBufferSize() int { +func ComputeDefaultClosedConnPerfBufferSize() int { return 8 * os.Getpagesize() } -// EnableRingbuffersViaMapEditor sets up the ring buffer for closed connection events via a map editor -func EnableRingbuffersViaMapEditor(mgrOpts *manager.Options) { - mgrOpts.MapSpecEditors[probes.ConnCloseEventMap] = manager.MapSpecEditor{ - Type: cebpf.RingBuf, - MaxEntries: uint32(computeDefaultClosedConnRingBufferSize()), - KeySize: 0, - ValueSize: 0, - EditorFlag: manager.EditType | manager.EditMaxEntries | manager.EditKeyValue, - } -} - -// SetupHandler sets up the closed connection event handler -func SetupHandler(eventHandler ebpf.EventHandler, mgr *ebpf.Manager, cfg *config.Config, perfSize int, mapName probes.BPFMapName) { - switch handler := eventHandler.(type) { - case *ebpf.RingBufferHandler: - log.Infof("Setting up connection handler for map %v with ring buffer", mapName) - rb := &manager.RingBuffer{ - Map: manager.Map{Name: mapName}, - RingBufferOptions: manager.RingBufferOptions{ - RecordGetter: handler.RecordGetter, - RecordHandler: handler.RecordHandler, - TelemetryEnabled: cfg.InternalTelemetryEnabled, - }, - } - mgr.RingBuffers = append(mgr.RingBuffers, rb) - ebpftelemetry.ReportRingBufferTelemetry(rb) - case *ebpf.PerfHandler: - log.Infof("Setting up connection handler for map %v with perf buffer", mapName) - pm := &manager.PerfMap{ - Map: manager.Map{Name: mapName}, - PerfMapOptions: manager.PerfMapOptions{ - PerfRingBufferSize: perfSize, - Watermark: 1, - RecordHandler: handler.RecordHandler, - LostHandler: handler.LostHandler, - RecordGetter: handler.RecordGetter, - TelemetryEnabled: cfg.InternalTelemetryEnabled, - }, - } - mgr.PerfMaps = append(mgr.PerfMaps, pm) - ebpftelemetry.ReportPerfMapTelemetry(pm) - helperCallRemover := ebpf.NewHelperCallRemover(asm.FnRingbufOutput) - err := helperCallRemover.BeforeInit(mgr.Manager, mgr.Name, nil) - if err != nil { - log.Error("Failed to remove helper calls from eBPF programs: ", err) - } - default: - log.Errorf("Failed to set up connection handler for map %v: unknown event handler type", mapName) - } -} - -// SetupClosedConnHandler sets up the closed connection event handler -func SetupClosedConnHandler(connCloseEventHandler ebpf.EventHandler, mgr *ebpf.Manager, cfg *config.Config) { - SetupHandler(connCloseEventHandler, mgr, cfg, computeDefaultClosedConnPerfBufferSize(), probes.ConnCloseEventMap) -} - // AddBoolConst modifies the options to include a constant editor for a boolean value func AddBoolConst(options *manager.Options, name string, flag bool) { val := uint64(1) diff --git a/pkg/util/encoding/binary.go b/pkg/util/encoding/binary.go new file mode 100644 index 00000000000000..1ad9d2a220788c --- /dev/null +++ b/pkg/util/encoding/binary.go @@ -0,0 +1,36 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +// Package encoding is for utilities relating to the encoding package from the stdlib +package encoding + +import ( + "encoding" +) + +// BinaryUnmarshalCallback returns a function that will decode the argument byte slice into T +// using `newFn` to create an instance of T and the encoding.BinaryUnmarshaler interface to do the actual conversion. +// `callback` will be called with the resulting T. +// If the argument byte slice is empty, callback will be called with `nil`. +// Unmarshalling errors will be provided to the callback as the second argument. The data argument to the callback +// may still be non-nil even if there was an error. This allows the callback to handle the allocated object, even +// in the face of errors. +func BinaryUnmarshalCallback[T encoding.BinaryUnmarshaler](newFn func() T, callback func(T, error)) func(buf []byte) { + return func(buf []byte) { + if len(buf) == 0 { + var nilvalue T + callback(nilvalue, nil) + return + } + + d := newFn() + if err := d.UnmarshalBinary(buf); err != nil { + // pass d here so callback can choose how to deal with the data + callback(d, err) + return + } + callback(d, nil) + } +} diff --git a/pkg/util/encoding/binary_test.go b/pkg/util/encoding/binary_test.go new file mode 100644 index 00000000000000..4412d5c3176510 --- /dev/null +++ b/pkg/util/encoding/binary_test.go @@ -0,0 +1,62 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +package encoding + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +type emptyTestType struct { +} + +func (tt *emptyTestType) UnmarshalBinary(_ []byte) error { + return nil +} + +type errorTestType struct{} + +func (tt *errorTestType) UnmarshalBinary(_ []byte) error { + return errors.New("error") +} + +type dataTestType struct { + buf []byte +} + +func (tt *dataTestType) UnmarshalBinary(data []byte) error { + tt.buf = data + return nil +} + +func TestBinaryUnmarshalCallback(t *testing.T) { + cb := BinaryUnmarshalCallback(func() *emptyTestType { + return new(emptyTestType) + }, func(x *emptyTestType, err error) { + assert.Nil(t, x) + assert.NoError(t, err) + }) + cb(nil) + cb([]byte{}) + + cb = BinaryUnmarshalCallback(func() *errorTestType { + return new(errorTestType) + }, func(x *errorTestType, err error) { + assert.NotNil(t, x) + assert.Error(t, err) + }) + cb([]byte{1, 2}) + + cb = BinaryUnmarshalCallback(func() *dataTestType { + return new(dataTestType) + }, func(x *dataTestType, err error) { + assert.Equal(t, []byte{1, 2}, x.buf) + assert.NoError(t, err) + }) + cb([]byte{1, 2}) +} diff --git a/pkg/util/slices/map.go b/pkg/util/slices/map.go new file mode 100644 index 00000000000000..049d4f4a25e50f --- /dev/null +++ b/pkg/util/slices/map.go @@ -0,0 +1,16 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +// Package slices are utilities to deal with slices +package slices + +// Map returns a new slice with the result of applying fn to each element. +func Map[S ~[]E, E any, RE any](s S, fn func(E) RE) []RE { + x := make([]RE, 0, len(s)) + for _, v := range s { + x = append(x, fn(v)) + } + return x +} diff --git a/pkg/util/slices/map_test.go b/pkg/util/slices/map_test.go new file mode 100644 index 00000000000000..0bf48b54a5a21b --- /dev/null +++ b/pkg/util/slices/map_test.go @@ -0,0 +1,19 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +package slices + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMap(t *testing.T) { + x := Map([]int{1, 2, 4, 8}, func(v int) int { + return v * v + }) + assert.Equal(t, []int{1, 4, 16, 64}, x) +} diff --git a/pkg/util/sync/pool.go b/pkg/util/sync/pool.go index fdf1872c666ad5..231a6a53b9efce 100644 --- a/pkg/util/sync/pool.go +++ b/pkg/util/sync/pool.go @@ -8,6 +8,22 @@ package sync import "sync" +// PoolReleaser is interface that wraps a sync.Pool Put function +type PoolReleaser[K any] interface { + Put(*K) +} + +// PoolGetter is interface that wraps a sync.Pool Get function +type PoolGetter[K any] interface { + Get() *K +} + +// Pool is a combination interface of PoolGetter and PoolReleaser +type Pool[K any] interface { + PoolGetter[K] + PoolReleaser[K] +} + // TypedPool is a type-safe version of sync.Pool type TypedPool[K any] struct { p sync.Pool