diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor.go b/pkg/network/tracer/connection/ebpfless/tcp_processor.go index c83a5430b96f1e..73ea6a1152aa34 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_processor.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor.go @@ -54,6 +54,13 @@ type connectionState struct { // rttTracker is used to track round trip times rttTracker rttTracker + + // lastUpdateEpoch contains the last timestamp this connection sent/received a packet + // TODO find a way to combine this with ConnectionStats.lastUpdateEpoch + // This exists because connections in pendingConns don't have a ConnectionStats object yet. + // Can we make all connections in TCPProcessor have a ConnectionStats no matter what, and + // filter them out in GetConnections? + lastUpdateEpoch uint64 } func (st *connectionState) hasMissedHandshake() bool { @@ -71,6 +78,7 @@ type TCPProcessor struct { // TODO make this into a config value const maxPendingConns = 4096 +const pendingConnTimeoutNs = uint64(5 * time.Second) // NewTCPProcessor constructs an empty TCPProcessor func NewTCPProcessor(cfg *config.Config) *TCPProcessor { @@ -155,6 +163,7 @@ func (t *TCPProcessor) updateSynFlag(conn *network.ConnectionStats, st *connecti func (t *TCPProcessor) updateTCPStats(conn *network.ConnectionStats, st *connectionState, pktType uint8, tcp *layers.TCP, payloadLen uint16, timestampNs uint64) { nextSeq := calcNextSeq(tcp, payloadLen) + st.lastUpdateEpoch = timestampNs if pktType == unix.PACKET_OUTGOING { conn.Monotonic.SentPackets++ // packetCanRetransmit filters out packets that look like retransmits but aren't, like TCP keepalives @@ -339,3 +348,20 @@ func (t *TCPProcessor) moveConn(tuple network.ConnectionTuple, st *connectionSta } return true } + +// CleanupExpiredPendingConns iterates through pendingConns and removes those that +// have existed too long - in normal TCP, they should become established right away. +// +// This is only required for pendingConns because the tracer already has logic to remove +// established connections (connections that have ConnectionStats) +func (t *TCPProcessor) CleanupExpiredPendingConns(timestampNs uint64) { + for tuple, st := range t.pendingConns { + timeoutTime := st.lastUpdateEpoch + pendingConnTimeoutNs + + if timeoutTime <= timestampNs { + delete(t.pendingConns, tuple) + + statsTelemetry.expiredPendingConns.Inc() + } + } +} diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go b/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go index dea6090eb121d1..b7d525bce15319 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go @@ -12,6 +12,7 @@ import ( "net" "syscall" "testing" + "time" "golang.org/x/sys/unix" @@ -792,3 +793,25 @@ func TestPreexistingConn(t *testing.T) { } require.Equal(t, expectedStats, f.conn.Monotonic) } + +func TestPendingConnExpiry(t *testing.T) { + now := uint64(time.Now().UnixNano()) + + pb := newPacketBuilder(lowerSeq, higherSeq) + pkt := pb.outgoing(0, 0, 0, SYN) + pkt.timestampNs = now + + f := newTCPTestFixture(t) + + f.runPkt(pkt) + require.Len(t, f.tcp.pendingConns, 1) + + // if no time has passed, should not remove the connection + f.tcp.CleanupExpiredPendingConns(now) + require.Len(t, f.tcp.pendingConns, 1) + + // if too much time has passed, should remove the connection + tenSecNs := uint64((10 * time.Second).Nanoseconds()) + f.tcp.CleanupExpiredPendingConns(now + tenSecNs) + require.Empty(t, f.tcp.pendingConns) +} diff --git a/pkg/network/tracer/connection/ebpfless/tcp_utils.go b/pkg/network/tracer/connection/ebpfless/tcp_utils.go index 4abe627a995d54..7a30737e734aee 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_utils.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_utils.go @@ -40,6 +40,7 @@ const ( ) var statsTelemetry = struct { + expiredPendingConns telemetry.Counter droppedPendingConns telemetry.Counter droppedEstablishedConns telemetry.Counter missedTCPHandshakes telemetry.Counter @@ -48,6 +49,7 @@ var statsTelemetry = struct { tcpRstAndSyn telemetry.Counter tcpRstAndFin telemetry.Counter }{ + expiredPendingConns: telemetry.NewCounter(ebpflessModuleName, "expired_pending_conns", nil, "Counter measuring the number of TCP connections which expired because it took too long to complete the handshake"), droppedPendingConns: telemetry.NewCounter(ebpflessModuleName, "dropped_pending_conns", nil, "Counter measuring the number of TCP connections which were dropped during the handshake (because the map was full)"), droppedEstablishedConns: telemetry.NewCounter(ebpflessModuleName, "dropped_established_conns", nil, "Counter measuring the number of TCP connections which were dropped while established (because the map was full)"), missedTCPHandshakes: telemetry.NewCounter(ebpflessModuleName, "missed_tcp_handshakes", nil, "Counter measuring the number of TCP connections where we missed the SYN handshake"), diff --git a/pkg/network/tracer/connection/ebpfless_tracer.go b/pkg/network/tracer/connection/ebpfless_tracer.go index 88c744239fd39e..3eb5a03344b2a1 100644 --- a/pkg/network/tracer/connection/ebpfless_tracer.go +++ b/pkg/network/tracer/connection/ebpfless_tracer.go @@ -303,6 +303,12 @@ func (t *ebpfLessTracer) GetConnections(buffer *network.ConnectionBuffer, filter t.m.Lock() defer t.m.Unlock() + // use GetConnections to periodically cleanup pending connections + err := t.cleanupPendingConns() + if err != nil { + return err + } + if len(t.conns) == 0 { return nil } @@ -321,20 +327,35 @@ func (t *ebpfLessTracer) GetConnections(buffer *network.ConnectionBuffer, filter return nil } +// cleanupPendingConns removes pending connections from the TCP tracer. +// For more information, refer to CleanupExpiredPendingConns +func (t *ebpfLessTracer) cleanupPendingConns() error { + ts, err := ddebpf.NowNanoseconds() + if err != nil { + return fmt.Errorf("error getting last updated timestamp for connection: %w", err) + } + t.tcp.CleanupExpiredPendingConns(uint64(ts)) + return nil +} + // FlushPending forces any closed connections waiting for batching to be processed immediately. func (t *ebpfLessTracer) FlushPending() {} +func (t *ebpfLessTracer) remove(conn *network.ConnectionStats) error { + delete(t.conns, conn.ConnectionTuple) + if conn.Type == network.TCP { + t.tcp.RemoveConn(conn.ConnectionTuple) + } + return nil +} + // Remove deletes the connection from tracking state. // It does not prevent the connection from re-appearing later, if additional traffic occurs. func (t *ebpfLessTracer) Remove(conn *network.ConnectionStats) error { t.m.Lock() defer t.m.Unlock() - delete(t.conns, conn.ConnectionTuple) - if conn.Type == network.TCP { - t.tcp.RemoveConn(conn.ConnectionTuple) - } - return nil + return t.remove(conn) } // GetMap returns the underlying named map. This is useful if any maps are shared with other eBPF components.