diff --git a/pkg/network/tracer/connection/ebpfless/map_utils.go b/pkg/network/tracer/connection/ebpfless/map_utils.go new file mode 100644 index 00000000000000..3e41f4832707d9 --- /dev/null +++ b/pkg/network/tracer/connection/ebpfless/map_utils.go @@ -0,0 +1,19 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +//go:build linux + +package ebpfless + +// WriteMapWithSizeLimit updates a map via m[key] = val. +// However, if the map would overflow sizeLimit, it returns false instead. +func WriteMapWithSizeLimit[Key comparable, Val any](m map[Key]Val, key Key, val Val, sizeLimit int) bool { + _, exists := m[key] + if !exists && len(m) >= sizeLimit { + return false + } + m[key] = val + return true +} diff --git a/pkg/network/tracer/connection/ebpfless/map_utils_test.go b/pkg/network/tracer/connection/ebpfless/map_utils_test.go new file mode 100644 index 00000000000000..e387328d006106 --- /dev/null +++ b/pkg/network/tracer/connection/ebpfless/map_utils_test.go @@ -0,0 +1,38 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +//go:build linux + +package ebpfless + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestWriteMapWithSizeLimit(t *testing.T) { + m := map[string]int{} + + // not full: any write should work + ok := WriteMapWithSizeLimit(m, "foo", 123, 1) + require.True(t, ok) + + expectedFoo := map[string]int{ + "foo": 123, + } + require.Equal(t, expectedFoo, m) + + // full: shouldn't write a new key + ok = WriteMapWithSizeLimit(m, "bar", 456, 1) + require.False(t, ok) + require.Equal(t, expectedFoo, m) + + // full: replacing key should still work + ok = WriteMapWithSizeLimit(m, "foo", 789, 1) + require.True(t, ok) + require.Equal(t, map[string]int{ + "foo": 789, + }, m) +} diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor.go b/pkg/network/tracer/connection/ebpfless/tcp_processor.go index de8e7b0eafb6e1..c071971f78f1fa 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_processor.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor.go @@ -18,6 +18,7 @@ import ( "github.com/google/gopacket/layers" "github.com/DataDog/datadog-agent/pkg/network" + "github.com/DataDog/datadog-agent/pkg/network/config" ) type connectionState struct { @@ -61,17 +62,22 @@ func (st *connectionState) hasMissedHandshake() bool { // TCPProcessor encapsulates TCP state tracking for the ebpfless tracer type TCPProcessor struct { + cfg *config.Config // pendingConns contains connections with tcpState == connStatAttempted pendingConns map[network.ConnectionTuple]*connectionState // establishedConns contains connections with tcpState == connStatEstablished establishedConns map[network.ConnectionTuple]*connectionState } +// TODO make this into a config value +const maxPendingConns = 1024 + // NewTCPProcessor constructs an empty TCPProcessor -func NewTCPProcessor() *TCPProcessor { +func NewTCPProcessor(cfg *config.Config) *TCPProcessor { return &TCPProcessor{ - pendingConns: map[network.ConnectionTuple]*connectionState{}, - establishedConns: map[network.ConnectionTuple]*connectionState{}, + cfg: cfg, + pendingConns: make(map[network.ConnectionTuple]*connectionState, maxPendingConns), + establishedConns: make(map[network.ConnectionTuple]*connectionState, cfg.MaxTrackedConnections), } } @@ -137,7 +143,7 @@ func (t *TCPProcessor) updateSynFlag(conn *network.ConnectionStats, st *connecti if st.tcpState == connStatAttempted && st.localSynState.isSynAcked() && st.remoteSynState.isSynAcked() { st.tcpState = connStatEstablished if st.hasMissedHandshake() { - statsTelemetry.missedTCPConnections.Inc() + statsTelemetry.missedTCPHandshakes.Inc() } else { conn.Monotonic.TCPEstablished++ } @@ -273,7 +279,11 @@ func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64 stateChanged := st.tcpState != origState if stateChanged { - t.moveConn(conn.ConnectionTuple, st) + ok := t.moveConn(conn.ConnectionTuple, st) + // if the map is full then we are unable to move the connection, report that + if !ok { + return ProcessResultMapFull, nil + } } // if the connection is still established, we should update the connection map @@ -303,18 +313,29 @@ func (t *TCPProcessor) RemoveConn(tuple network.ConnectionTuple) { delete(t.pendingConns, tuple) delete(t.establishedConns, tuple) } -func (t *TCPProcessor) moveConn(tuple network.ConnectionTuple, st *connectionState) { + +// moveConn moves a connection to the correct map based on its tcpState. +// If it had to drop the connection because the target map was full, it returns false. +func (t *TCPProcessor) moveConn(tuple network.ConnectionTuple, st *connectionState) bool { t.RemoveConn(tuple) switch st.tcpState { // For this case, simply let closed connections disappear. Process() will return // ProcessResultCloseConn letting the ebpfless tracer know the connection has closed. case connStatClosed: - // TODO limit map sizes here with a config value case connStatAttempted: - t.pendingConns[tuple] = st - // TODO limit map sizes here with a config value + ok := WriteMapWithSizeLimit(t.pendingConns, tuple, st, maxPendingConns) + if !ok { + statsTelemetry.droppedPendingConns.Inc() + } + return ok case connStatEstablished: - t.establishedConns[tuple] = st + maxTrackedConns := int(t.cfg.MaxTrackedConnections) + ok := WriteMapWithSizeLimit(t.establishedConns, tuple, st, maxTrackedConns) + if !ok { + statsTelemetry.droppedEstablishedConns.Inc() + } + return ok } + return true } diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go b/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go index d0e7482f1b9700..dea6090eb121d1 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go @@ -8,6 +8,7 @@ package ebpfless import ( + "github.com/DataDog/datadog-agent/pkg/network/config" "net" "syscall" "testing" @@ -166,9 +167,10 @@ func (pb packetBuilder) outgoing(payloadLen uint16, relSeq, relAck uint32, flags } func newTCPTestFixture(t *testing.T) *tcpTestFixture { + cfg := config.New() return &tcpTestFixture{ t: t, - tcp: NewTCPProcessor(), + tcp: NewTCPProcessor(cfg), conn: nil, } } @@ -256,6 +258,9 @@ func testBasicHandshake(t *testing.T, pb packetBuilder) { } require.Equal(t, expectedStats, f.conn.Monotonic) + + require.Empty(t, f.tcp.pendingConns) + require.Empty(t, f.tcp.establishedConns) } var lowerSeq uint32 = 2134452051 @@ -323,6 +328,9 @@ func testReversedBasicHandshake(t *testing.T, pb packetBuilder) { TCPClosed: 1, } require.Equal(t, expectedStats, f.conn.Monotonic) + + require.Empty(t, f.tcp.pendingConns) + require.Empty(t, f.tcp.establishedConns) } func TestReversedBasicHandshake(t *testing.T) { diff --git a/pkg/network/tracer/connection/ebpfless/tcp_utils.go b/pkg/network/tracer/connection/ebpfless/tcp_utils.go index 122acbf4a3062b..4abe627a995d54 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_utils.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_utils.go @@ -34,20 +34,27 @@ const ( // ProcessResultCloseConn - this connection is done and its ConnectionStats should be passed // to the ebpfless tracer's closed connection handler. ProcessResultCloseConn + // ProcessResultMapFull - this connection can't be tracked because the TCPProcessor's connection + // map is full. This connection should be removed from the tracer as well. + ProcessResultMapFull ) var statsTelemetry = struct { - missedTCPConnections telemetry.Counter - missingTCPFlags telemetry.Counter - tcpSynAndFin telemetry.Counter - tcpRstAndSyn telemetry.Counter - tcpRstAndFin telemetry.Counter + droppedPendingConns telemetry.Counter + droppedEstablishedConns telemetry.Counter + missedTCPHandshakes telemetry.Counter + missingTCPFlags telemetry.Counter + tcpSynAndFin telemetry.Counter + tcpRstAndSyn telemetry.Counter + tcpRstAndFin telemetry.Counter }{ - telemetry.NewCounter(ebpflessModuleName, "missed_tcp_connections", []string{}, "Counter measuring the number of TCP connections where we missed the SYN handshake"), - telemetry.NewCounter(ebpflessModuleName, "missing_tcp_flags", []string{}, "Counter measuring packets encountered with none of SYN, FIN, ACK, RST set"), - telemetry.NewCounter(ebpflessModuleName, "tcp_syn_and_fin", []string{}, "Counter measuring packets encountered with SYN+FIN together"), - telemetry.NewCounter(ebpflessModuleName, "tcp_rst_and_syn", []string{}, "Counter measuring packets encountered with RST+SYN together"), - telemetry.NewCounter(ebpflessModuleName, "tcp_rst_and_fin", []string{}, "Counter measuring packets encountered with RST+FIN together"), + droppedPendingConns: telemetry.NewCounter(ebpflessModuleName, "dropped_pending_conns", nil, "Counter measuring the number of TCP connections which were dropped during the handshake (because the map was full)"), + droppedEstablishedConns: telemetry.NewCounter(ebpflessModuleName, "dropped_established_conns", nil, "Counter measuring the number of TCP connections which were dropped while established (because the map was full)"), + missedTCPHandshakes: telemetry.NewCounter(ebpflessModuleName, "missed_tcp_handshakes", nil, "Counter measuring the number of TCP connections where we missed the SYN handshake"), + missingTCPFlags: telemetry.NewCounter(ebpflessModuleName, "missing_tcp_flags", nil, "Counter measuring packets encountered with none of SYN, FIN, ACK, RST set"), + tcpSynAndFin: telemetry.NewCounter(ebpflessModuleName, "tcp_syn_and_fin", nil, "Counter measuring packets encountered with SYN+FIN together"), + tcpRstAndSyn: telemetry.NewCounter(ebpflessModuleName, "tcp_rst_and_syn", nil, "Counter measuring packets encountered with RST+SYN together"), + tcpRstAndFin: telemetry.NewCounter(ebpflessModuleName, "tcp_rst_and_fin", nil, "Counter measuring packets encountered with RST+FIN together"), } const tcpSeqMidpoint = 0x80000000 diff --git a/pkg/network/tracer/connection/ebpfless_tracer.go b/pkg/network/tracer/connection/ebpfless_tracer.go index c1b1c0945eef99..bd9c60ea43aa23 100644 --- a/pkg/network/tracer/connection/ebpfless_tracer.go +++ b/pkg/network/tracer/connection/ebpfless_tracer.go @@ -40,9 +40,11 @@ const ( var ( ebpfLessTracerTelemetry = struct { - skippedPackets telemetry.Counter + skippedPackets telemetry.Counter + droppedConnections telemetry.Counter }{ telemetry.NewCounter(ebpfLessTelemetryPrefix, "skipped_packets", []string{"reason"}, "Counter measuring skipped packets"), + telemetry.NewCounter(ebpfLessTelemetryPrefix, "dropped_connections", nil, "Counter measuring dropped connections"), } ) @@ -81,7 +83,7 @@ func newEbpfLessTracer(cfg *config.Config) (*ebpfLessTracer, error) { exit: make(chan struct{}), scratchConn: &network.ConnectionStats{}, udp: &udpProcessor{}, - tcp: ebpfless.NewTCPProcessor(), + tcp: ebpfless.NewTCPProcessor(cfg), conns: make(map[network.ConnectionTuple]*network.ConnectionStats, cfg.MaxTrackedConnections), boundPorts: ebpfless.NewBoundPorts(cfg), cookieHasher: newCookieHasher(), @@ -238,10 +240,19 @@ func (t *ebpfLessTracer) processConnection( switch result { case ebpfless.ProcessResultNone: case ebpfless.ProcessResultStoreConn: - t.conns[t.scratchConn.ConnectionTuple] = conn + maxTrackedConns := int(t.config.MaxTrackedConnections) + ok := ebpfless.WriteMapWithSizeLimit(t.conns, conn.ConnectionTuple, conn, maxTrackedConns) + // we don't have enough space to add this connection, remove its TCP state tracking + if !ok && conn.Type == network.TCP { + t.tcp.RemoveConn(conn.ConnectionTuple) + ebpfLessTracerTelemetry.droppedConnections.Inc() + } case ebpfless.ProcessResultCloseConn: delete(t.conns, conn.ConnectionTuple) closeCallback(conn) + case ebpfless.ProcessResultMapFull: + delete(t.conns, conn.ConnectionTuple) + ebpfLessTracerTelemetry.droppedConnections.Inc() } return nil