Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in buffer batching with perf buffers for NPM #31402

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0f4104e
in buffer batching with perf buffers for NPM
brycekahle Apr 18, 2024
79a6bcc
add tests
brycekahle Nov 25, 2024
0b2e89b
fix lint
brycekahle Nov 25, 2024
f1cac6a
address review comments
brycekahle Nov 26, 2024
5bfb4d2
add minor comments
brycekahle Nov 26, 2024
5bc0068
fix typo
brycekahle Dec 2, 2024
4e882c5
rename to Flusher
brycekahle Dec 2, 2024
d01218c
rename to custom batching
brycekahle Dec 2, 2024
644d3fc
deprecate configs in sp namespace
brycekahle Dec 2, 2024
3d14fdc
use Modifier interface for event handler
brycekahle Dec 2, 2024
8693f02
add comments about handlers
brycekahle Dec 2, 2024
41908f9
refactor API to be more understandable
brycekahle Dec 2, 2024
278c30c
fix constant value
brycekahle Dec 2, 2024
757fb63
fix helper call removal
brycekahle Dec 2, 2024
965035e
add metric for conns received during flush
brycekahle Dec 4, 2024
2bebabf
remove panic from BinaryUnmarshalCallback
brycekahle Dec 6, 2024
da8ebd6
change default wakeup_events to match batch size
brycekahle Dec 9, 2024
dbb1272
extend in-buffer batching to ringbufs
brycekahle Dec 10, 2024
ad60da3
add channel to read from perf/ringbuf faster
brycekahle Dec 10, 2024
e6eb153
add telemetry for perf channel len
brycekahle Dec 13, 2024
0f1ed30
calculate channel size based on batching used
brycekahle Dec 16, 2024
fed0a57
add comments explaining the `updateMaxTelemetry` function
usamasaqib Dec 23, 2024
496324c
add explanatory comments about the BeforeInit implementation of Event…
usamasaqib Dec 23, 2024
575cbf0
Merge branch 'main' into bryce.kahle/perf-buffer-npm-only
usamasaqib Dec 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,12 @@
/pkg/util/crio/ @DataDog/container-integrations
/pkg/util/docker/ @DataDog/container-integrations
/pkg/util/ecs/ @DataDog/container-integrations
/pkg/util/encoding/ @DataDog/ebpf-platform
/pkg/util/funcs/ @DataDog/ebpf-platform
/pkg/util/gpu/ @DataDog/container-platform
/pkg/util/kernel/ @DataDog/ebpf-platform
/pkg/util/safeelf/ @DataDog/ebpf-platform
/pkg/util/slices/ @DataDog/ebpf-platform
/pkg/util/ktime @DataDog/agent-security
/pkg/util/kubernetes/ @DataDog/container-integrations @DataDog/container-platform @DataDog/container-app
/pkg/util/podman/ @DataDog/container-integrations
Expand Down
4 changes: 4 additions & 0 deletions cmd/system-probe/config/adjust_npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ const (
func adjustNetwork(cfg model.Config) {
ebpflessEnabled := cfg.GetBool(netNS("enable_ebpfless"))

deprecateInt(cfg, spNS("closed_connection_flush_threshold"), netNS("closed_connection_flush_threshold"))
deprecateInt(cfg, spNS("closed_channel_size"), netNS("closed_channel_size"))
applyDefault(cfg, netNS("closed_channel_size"), 500)

limitMaxInt(cfg, spNS("max_conns_per_message"), maxConnsMessageBatchSize)

if cfg.GetBool(spNS("disable_tcp")) {
Expand Down
8 changes: 6 additions & 2 deletions pkg/config/setup/system_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,11 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) {
cfg.BindEnvAndSetDefault(join(spNS, "max_tracked_connections"), 65536)
cfg.BindEnv(join(spNS, "max_closed_connections_buffered"))
cfg.BindEnv(join(netNS, "max_failed_connections_buffered"))
cfg.BindEnvAndSetDefault(join(spNS, "closed_connection_flush_threshold"), 0)
cfg.BindEnvAndSetDefault(join(spNS, "closed_channel_size"), 500)
cfg.BindEnv(join(spNS, "closed_connection_flush_threshold"))
cfg.BindEnv(join(netNS, "closed_connection_flush_threshold"))
cfg.BindEnv(join(spNS, "closed_channel_size"))
cfg.BindEnv(join(netNS, "closed_channel_size"))
cfg.BindEnvAndSetDefault(join(netNS, "closed_buffer_wakeup_count"), 4)
cfg.BindEnvAndSetDefault(join(spNS, "max_connection_state_buffered"), 75000)

cfg.BindEnvAndSetDefault(join(spNS, "disable_dns_inspection"), false, "DD_DISABLE_DNS_INSPECTION")
Expand All @@ -212,6 +215,7 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) {
cfg.BindEnvAndSetDefault(join(spNS, "enable_conntrack_all_namespaces"), true, "DD_SYSTEM_PROBE_ENABLE_CONNTRACK_ALL_NAMESPACES")
cfg.BindEnvAndSetDefault(join(netNS, "enable_protocol_classification"), true, "DD_ENABLE_PROTOCOL_CLASSIFICATION")
cfg.BindEnvAndSetDefault(join(netNS, "enable_ringbuffers"), true, "DD_SYSTEM_PROBE_NETWORK_ENABLE_RINGBUFFERS")
cfg.BindEnvAndSetDefault(join(netNS, "enable_custom_batching"), false, "DD_SYSTEM_PROBE_NETWORK_ENABLE_CUSTOM_BATCHING")
cfg.BindEnvAndSetDefault(join(netNS, "enable_tcp_failed_connections"), true, "DD_SYSTEM_PROBE_NETWORK_ENABLE_FAILED_CONNS")
cfg.BindEnvAndSetDefault(join(netNS, "ignore_conntrack_init_failure"), false, "DD_SYSTEM_PROBE_NETWORK_IGNORE_CONNTRACK_INIT_FAILURE")
cfg.BindEnvAndSetDefault(join(netNS, "conntrack_init_timeout"), 10*time.Second)
Expand Down
13 changes: 13 additions & 0 deletions pkg/ebpf/c/bpf_helpers_custom.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,17 @@ unsigned long long load_half(void *skb,
unsigned long long load_word(void *skb,
unsigned long long off) asm("llvm.bpf.load.word");

// declare our own versions of these enums, because they don't exist on <5.8
enum {
DD_BPF_RB_NO_WAKEUP = 1,
DD_BPF_RB_FORCE_WAKEUP = 2,
};

enum {
DD_BPF_RB_AVAIL_DATA = 0,
DD_BPF_RB_RING_SIZE = 1,
DD_BPF_RB_CONS_POS = 2,
DD_BPF_RB_PROD_POS = 3,
};

#endif
37 changes: 37 additions & 0 deletions pkg/ebpf/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,40 @@ func (m *Manager) InitWithOptions(bytecode io.ReaderAt, opts *manager.Options) e
}
return nil
}

type modifierPreStart interface {
PreStart() error
}

// Start is a wrapper around ebpf-manager.Manager.Start
func (m *Manager) Start() error {
for _, mod := range m.EnabledModifiers {
if ps, ok := mod.(modifierPreStart); ok {
if err := ps.PreStart(); err != nil {
return fmt.Errorf("prestart %s manager modifier: %w", mod, err)
}
}
}
return m.Manager.Start()
}

type modifierAfterStop interface {
AfterStop(manager.MapCleanupType) error
}

// Stop is a wrapper around ebpf-manager.Manager.Stop
func (m *Manager) Stop(ct manager.MapCleanupType) error {
if err := m.Manager.Stop(ct); err != nil {
return err
}

for _, mod := range m.EnabledModifiers {
if as, ok := mod.(modifierAfterStop); ok {
if err := as.AfterStop(ct); err != nil {
return fmt.Errorf("afterstop %s manager modifier: %w", mod, err)
}
}
}

return nil
}
Loading
Loading