Skip to content

Commit

Permalink
add map size limits
Browse files Browse the repository at this point in the history
  • Loading branch information
pimlu committed Dec 14, 2024
1 parent 06d8940 commit 9248422
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 24 deletions.
19 changes: 19 additions & 0 deletions pkg/network/tracer/connection/ebpfless/map_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux

package ebpfless

// WriteMapWithSizeLimit stores val under key in m while keeping the number of
// entries in m bounded by sizeLimit.
//
// Overwriting a key that is already present always succeeds, since it does not
// grow the map. Inserting a new key succeeds only while len(m) is strictly
// below sizeLimit. The return value reports whether the write took place; when
// it is false, m is left untouched.
func WriteMapWithSizeLimit[Key comparable, Val any](m map[Key]Val, key Key, val Val, sizeLimit int) bool {
	// A write is allowed when it replaces an existing entry or there is still room.
	if _, present := m[key]; present || len(m) < sizeLimit {
		m[key] = val
		return true
	}
	return false
}
38 changes: 38 additions & 0 deletions pkg/network/tracer/connection/ebpfless/map_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux

package ebpfless

import (
"github.com/stretchr/testify/require"
"testing"
)

// TestWriteMapWithSizeLimit exercises WriteMapWithSizeLimit against a map capped
// at a single entry: an insert with room available, a rejected insert once the
// map is full, and an overwrite of an existing key while full.
func TestWriteMapWithSizeLimit(t *testing.T) {
	const limit = 1
	conns := make(map[string]int)

	// below the limit: inserting a new key succeeds
	wrote := WriteMapWithSizeLimit(conns, "foo", 123, limit)
	require.True(t, wrote)

	afterFirstWrite := map[string]int{"foo": 123}
	require.Equal(t, afterFirstWrite, conns)

	// at the limit: a new key is rejected and the contents stay the same
	wrote = WriteMapWithSizeLimit(conns, "bar", 456, limit)
	require.False(t, wrote)
	require.Equal(t, afterFirstWrite, conns)

	// at the limit: overwriting an existing key is still allowed
	wrote = WriteMapWithSizeLimit(conns, "foo", 789, limit)
	require.True(t, wrote)
	require.Equal(t, map[string]int{"foo": 789}, conns)
}
41 changes: 31 additions & 10 deletions pkg/network/tracer/connection/ebpfless/tcp_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/google/gopacket/layers"

"github.com/DataDog/datadog-agent/pkg/network"
"github.com/DataDog/datadog-agent/pkg/network/config"
)

type connectionState struct {
Expand Down Expand Up @@ -61,17 +62,22 @@ func (st *connectionState) hasMissedHandshake() bool {

// TCPProcessor encapsulates TCP state tracking for the ebpfless tracer.
// moveConn places each connection into one of the two maps below according to
// its tcpState, and refuses the insert when the target map is at its size limit.
type TCPProcessor struct {
// cfg is the tracer configuration supplied to NewTCPProcessor; its
// MaxTrackedConnections value is used as the size limit for establishedConns.
cfg *config.Config
// pendingConns contains connections with tcpState == connStatAttempted
// (size-limited by maxPendingConns)
pendingConns map[network.ConnectionTuple]*connectionState
// establishedConns contains connections with tcpState == connStatEstablished
// (size-limited by cfg.MaxTrackedConnections)
establishedConns map[network.ConnectionTuple]*connectionState
}

// maxPendingConns is the size limit applied to TCPProcessor.pendingConns when a
// connection is moved into it, and the capacity hint used when that map is created.
// TODO make this into a config value
const maxPendingConns = 1024

// NewTCPProcessor constructs an empty TCPProcessor
func NewTCPProcessor() *TCPProcessor {
func NewTCPProcessor(cfg *config.Config) *TCPProcessor {
return &TCPProcessor{
pendingConns: map[network.ConnectionTuple]*connectionState{},
establishedConns: map[network.ConnectionTuple]*connectionState{},
cfg: cfg,
pendingConns: make(map[network.ConnectionTuple]*connectionState, maxPendingConns),
establishedConns: make(map[network.ConnectionTuple]*connectionState, cfg.MaxTrackedConnections),
}
}

Expand Down Expand Up @@ -137,7 +143,7 @@ func (t *TCPProcessor) updateSynFlag(conn *network.ConnectionStats, st *connecti
if st.tcpState == connStatAttempted && st.localSynState.isSynAcked() && st.remoteSynState.isSynAcked() {
st.tcpState = connStatEstablished
if st.hasMissedHandshake() {
statsTelemetry.missedTCPConnections.Inc()
statsTelemetry.missedTCPHandshakes.Inc()
} else {
conn.Monotonic.TCPEstablished++
}
Expand Down Expand Up @@ -273,7 +279,11 @@ func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64

stateChanged := st.tcpState != origState
if stateChanged {
t.moveConn(conn.ConnectionTuple, st)
ok := t.moveConn(conn.ConnectionTuple, st)
// if the map is full then we are unable to move the connection, report that
if !ok {
return ProcessResultMapFull, nil
}
}

// if the connection is still established, we should update the connection map
Expand Down Expand Up @@ -303,18 +313,29 @@ func (t *TCPProcessor) RemoveConn(tuple network.ConnectionTuple) {
delete(t.pendingConns, tuple)
delete(t.establishedConns, tuple)
}
func (t *TCPProcessor) moveConn(tuple network.ConnectionTuple, st *connectionState) {

// moveConn moves a connection to the correct map based on its tcpState.
// If it had to drop the connection because the target map was full, it returns false.
func (t *TCPProcessor) moveConn(tuple network.ConnectionTuple, st *connectionState) bool {
t.RemoveConn(tuple)

switch st.tcpState {
// For this case, simply let closed connections disappear. Process() will return
// ProcessResultCloseConn letting the ebpfless tracer know the connection has closed.
case connStatClosed:
// TODO limit map sizes here with a config value
case connStatAttempted:
t.pendingConns[tuple] = st
// TODO limit map sizes here with a config value
ok := WriteMapWithSizeLimit(t.pendingConns, tuple, st, maxPendingConns)
if !ok {
statsTelemetry.droppedPendingConns.Inc()
}
return ok
case connStatEstablished:
t.establishedConns[tuple] = st
maxTrackedConns := int(t.cfg.MaxTrackedConnections)
ok := WriteMapWithSizeLimit(t.establishedConns, tuple, st, maxTrackedConns)
if !ok {
statsTelemetry.droppedEstablishedConns.Inc()
}
return ok
}
return true
}
10 changes: 9 additions & 1 deletion pkg/network/tracer/connection/ebpfless/tcp_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package ebpfless

import (
"github.com/DataDog/datadog-agent/pkg/network/config"
"net"
"syscall"
"testing"
Expand Down Expand Up @@ -166,9 +167,10 @@ func (pb packetBuilder) outgoing(payloadLen uint16, relSeq, relAck uint32, flags
}

func newTCPTestFixture(t *testing.T) *tcpTestFixture {
cfg := config.New()
return &tcpTestFixture{
t: t,
tcp: NewTCPProcessor(),
tcp: NewTCPProcessor(cfg),
conn: nil,
}
}
Expand Down Expand Up @@ -256,6 +258,9 @@ func testBasicHandshake(t *testing.T, pb packetBuilder) {
}

require.Equal(t, expectedStats, f.conn.Monotonic)

require.Empty(t, f.tcp.pendingConns)
require.Empty(t, f.tcp.establishedConns)
}

var lowerSeq uint32 = 2134452051
Expand Down Expand Up @@ -323,6 +328,9 @@ func testReversedBasicHandshake(t *testing.T, pb packetBuilder) {
TCPClosed: 1,
}
require.Equal(t, expectedStats, f.conn.Monotonic)

require.Empty(t, f.tcp.pendingConns)
require.Empty(t, f.tcp.establishedConns)
}

func TestReversedBasicHandshake(t *testing.T) {
Expand Down
27 changes: 17 additions & 10 deletions pkg/network/tracer/connection/ebpfless/tcp_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,27 @@ const (
// ProcessResultCloseConn - this connection is done and its ConnectionStats should be passed
// to the ebpfless tracer's closed connection handler.
ProcessResultCloseConn
// ProcessResultMapFull - this connection can't be tracked because the TCPProcessor's connection
// map is full. This connection should be removed from the tracer as well.
ProcessResultMapFull
)

var statsTelemetry = struct {
missedTCPConnections telemetry.Counter
missingTCPFlags telemetry.Counter
tcpSynAndFin telemetry.Counter
tcpRstAndSyn telemetry.Counter
tcpRstAndFin telemetry.Counter
droppedPendingConns telemetry.Counter
droppedEstablishedConns telemetry.Counter
missedTCPHandshakes telemetry.Counter
missingTCPFlags telemetry.Counter
tcpSynAndFin telemetry.Counter
tcpRstAndSyn telemetry.Counter
tcpRstAndFin telemetry.Counter
}{
telemetry.NewCounter(ebpflessModuleName, "missed_tcp_connections", []string{}, "Counter measuring the number of TCP connections where we missed the SYN handshake"),
telemetry.NewCounter(ebpflessModuleName, "missing_tcp_flags", []string{}, "Counter measuring packets encountered with none of SYN, FIN, ACK, RST set"),
telemetry.NewCounter(ebpflessModuleName, "tcp_syn_and_fin", []string{}, "Counter measuring packets encountered with SYN+FIN together"),
telemetry.NewCounter(ebpflessModuleName, "tcp_rst_and_syn", []string{}, "Counter measuring packets encountered with RST+SYN together"),
telemetry.NewCounter(ebpflessModuleName, "tcp_rst_and_fin", []string{}, "Counter measuring packets encountered with RST+FIN together"),
droppedPendingConns: telemetry.NewCounter(ebpflessModuleName, "dropped_pending_conns", nil, "Counter measuring the number of TCP connections which were dropped during the handshake (because the map was full)"),
droppedEstablishedConns: telemetry.NewCounter(ebpflessModuleName, "dropped_established_conns", nil, "Counter measuring the number of TCP connections which were dropped while established (because the map was full)"),
missedTCPHandshakes: telemetry.NewCounter(ebpflessModuleName, "missed_tcp_handshakes", nil, "Counter measuring the number of TCP connections where we missed the SYN handshake"),
missingTCPFlags: telemetry.NewCounter(ebpflessModuleName, "missing_tcp_flags", nil, "Counter measuring packets encountered with none of SYN, FIN, ACK, RST set"),
tcpSynAndFin: telemetry.NewCounter(ebpflessModuleName, "tcp_syn_and_fin", nil, "Counter measuring packets encountered with SYN+FIN together"),
tcpRstAndSyn: telemetry.NewCounter(ebpflessModuleName, "tcp_rst_and_syn", nil, "Counter measuring packets encountered with RST+SYN together"),
tcpRstAndFin: telemetry.NewCounter(ebpflessModuleName, "tcp_rst_and_fin", nil, "Counter measuring packets encountered with RST+FIN together"),
}

const tcpSeqMidpoint = 0x80000000
Expand Down
17 changes: 14 additions & 3 deletions pkg/network/tracer/connection/ebpfless_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ const (

var (
ebpfLessTracerTelemetry = struct {
skippedPackets telemetry.Counter
skippedPackets telemetry.Counter
droppedConnections telemetry.Counter
}{
telemetry.NewCounter(ebpfLessTelemetryPrefix, "skipped_packets", []string{"reason"}, "Counter measuring skipped packets"),
telemetry.NewCounter(ebpfLessTelemetryPrefix, "dropped_connections", nil, "Counter measuring dropped connections"),
}
)

Expand Down Expand Up @@ -81,7 +83,7 @@ func newEbpfLessTracer(cfg *config.Config) (*ebpfLessTracer, error) {
exit: make(chan struct{}),
scratchConn: &network.ConnectionStats{},
udp: &udpProcessor{},
tcp: ebpfless.NewTCPProcessor(),
tcp: ebpfless.NewTCPProcessor(cfg),
conns: make(map[network.ConnectionTuple]*network.ConnectionStats, cfg.MaxTrackedConnections),
boundPorts: ebpfless.NewBoundPorts(cfg),
cookieHasher: newCookieHasher(),
Expand Down Expand Up @@ -238,10 +240,19 @@ func (t *ebpfLessTracer) processConnection(
switch result {
case ebpfless.ProcessResultNone:
case ebpfless.ProcessResultStoreConn:
t.conns[t.scratchConn.ConnectionTuple] = conn
maxTrackedConns := int(t.config.MaxTrackedConnections)
ok := ebpfless.WriteMapWithSizeLimit(t.conns, conn.ConnectionTuple, conn, maxTrackedConns)
// we don't have enough space to add this connection, remove its TCP state tracking
if !ok && conn.Type == network.TCP {
t.tcp.RemoveConn(conn.ConnectionTuple)
ebpfLessTracerTelemetry.droppedConnections.Inc()
}
case ebpfless.ProcessResultCloseConn:
delete(t.conns, conn.ConnectionTuple)
closeCallback(conn)
case ebpfless.ProcessResultMapFull:
delete(t.conns, conn.ConnectionTuple)
ebpfLessTracerTelemetry.droppedConnections.Inc()
}

return nil
Expand Down

0 comments on commit 9248422

Please sign in to comment.