Skip to content

Commit

Permalink
[ebpfless] Add 5 second expiry to pending connections (#32187)
Browse files Browse the repository at this point in the history
  • Loading branch information
pimlu authored Dec 17, 2024
1 parent f215538 commit 51ec10c
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 5 deletions.
26 changes: 26 additions & 0 deletions pkg/network/tracer/connection/ebpfless/tcp_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ type connectionState struct {

// rttTracker is used to track round trip times
rttTracker rttTracker

// lastUpdateEpoch contains the last timestamp this connection sent/received a packet
// TODO find a way to combine this with ConnectionStats.lastUpdateEpoch
// This exists because connections in pendingConns don't have a ConnectionStats object yet.
// Can we make all connections in TCPProcessor have a ConnectionStats no matter what, and
// filter them out in GetConnections?
lastUpdateEpoch uint64
}

func (st *connectionState) hasMissedHandshake() bool {
Expand All @@ -71,6 +78,7 @@ type TCPProcessor struct {

// TODO make this into a config value
const maxPendingConns = 4096
const pendingConnTimeoutNs = uint64(5 * time.Second)

// NewTCPProcessor constructs an empty TCPProcessor
func NewTCPProcessor(cfg *config.Config) *TCPProcessor {
Expand Down Expand Up @@ -155,6 +163,7 @@ func (t *TCPProcessor) updateSynFlag(conn *network.ConnectionStats, st *connecti
func (t *TCPProcessor) updateTCPStats(conn *network.ConnectionStats, st *connectionState, pktType uint8, tcp *layers.TCP, payloadLen uint16, timestampNs uint64) {
nextSeq := calcNextSeq(tcp, payloadLen)

st.lastUpdateEpoch = timestampNs
if pktType == unix.PACKET_OUTGOING {
conn.Monotonic.SentPackets++
// packetCanRetransmit filters out packets that look like retransmits but aren't, like TCP keepalives
Expand Down Expand Up @@ -339,3 +348,20 @@ func (t *TCPProcessor) moveConn(tuple network.ConnectionTuple, st *connectionSta
}
return true
}

// CleanupExpiredPendingConns iterates through pendingConns and removes those that
// have existed too long - in normal TCP, they should become established right away.
//
// This is only required for pendingConns because the tracer already has logic to remove
// established connections (connections that have ConnectionStats)
func (t *TCPProcessor) CleanupExpiredPendingConns(timestampNs uint64) {
for tuple, st := range t.pendingConns {
timeoutTime := st.lastUpdateEpoch + pendingConnTimeoutNs

if timeoutTime <= timestampNs {
delete(t.pendingConns, tuple)

statsTelemetry.expiredPendingConns.Inc()
}
}
}
23 changes: 23 additions & 0 deletions pkg/network/tracer/connection/ebpfless/tcp_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net"
"syscall"
"testing"
"time"

"golang.org/x/sys/unix"

Expand Down Expand Up @@ -792,3 +793,25 @@ func TestPreexistingConn(t *testing.T) {
}
require.Equal(t, expectedStats, f.conn.Monotonic)
}

func TestPendingConnExpiry(t *testing.T) {
now := uint64(time.Now().UnixNano())

pb := newPacketBuilder(lowerSeq, higherSeq)
pkt := pb.outgoing(0, 0, 0, SYN)
pkt.timestampNs = now

f := newTCPTestFixture(t)

f.runPkt(pkt)
require.Len(t, f.tcp.pendingConns, 1)

// if no time has passed, should not remove the connection
f.tcp.CleanupExpiredPendingConns(now)
require.Len(t, f.tcp.pendingConns, 1)

// if too much time has passed, should remove the connection
tenSecNs := uint64((10 * time.Second).Nanoseconds())
f.tcp.CleanupExpiredPendingConns(now + tenSecNs)
require.Empty(t, f.tcp.pendingConns)
}
2 changes: 2 additions & 0 deletions pkg/network/tracer/connection/ebpfless/tcp_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
)

var statsTelemetry = struct {
expiredPendingConns telemetry.Counter
droppedPendingConns telemetry.Counter
droppedEstablishedConns telemetry.Counter
missedTCPHandshakes telemetry.Counter
Expand All @@ -48,6 +49,7 @@ var statsTelemetry = struct {
tcpRstAndSyn telemetry.Counter
tcpRstAndFin telemetry.Counter
}{
expiredPendingConns: telemetry.NewCounter(ebpflessModuleName, "expired_pending_conns", nil, "Counter measuring the number of TCP connections which expired because it took too long to complete the handshake"),
droppedPendingConns: telemetry.NewCounter(ebpflessModuleName, "dropped_pending_conns", nil, "Counter measuring the number of TCP connections which were dropped during the handshake (because the map was full)"),
droppedEstablishedConns: telemetry.NewCounter(ebpflessModuleName, "dropped_established_conns", nil, "Counter measuring the number of TCP connections which were dropped while established (because the map was full)"),
missedTCPHandshakes: telemetry.NewCounter(ebpflessModuleName, "missed_tcp_handshakes", nil, "Counter measuring the number of TCP connections where we missed the SYN handshake"),
Expand Down
31 changes: 26 additions & 5 deletions pkg/network/tracer/connection/ebpfless_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ func (t *ebpfLessTracer) GetConnections(buffer *network.ConnectionBuffer, filter
t.m.Lock()
defer t.m.Unlock()

// use GetConnections to periodically cleanup pending connections
err := t.cleanupPendingConns()
if err != nil {
return err
}

if len(t.conns) == 0 {
return nil
}
Expand All @@ -321,20 +327,35 @@ func (t *ebpfLessTracer) GetConnections(buffer *network.ConnectionBuffer, filter
return nil
}

// cleanupPendingConns removes pending connections from the TCP tracer.
// For more information, refer to CleanupExpiredPendingConns
func (t *ebpfLessTracer) cleanupPendingConns() error {
ts, err := ddebpf.NowNanoseconds()
if err != nil {
return fmt.Errorf("error getting last updated timestamp for connection: %w", err)
}
t.tcp.CleanupExpiredPendingConns(uint64(ts))
return nil
}

// FlushPending forces any closed connections waiting for batching to be processed immediately.
func (t *ebpfLessTracer) FlushPending() {}

func (t *ebpfLessTracer) remove(conn *network.ConnectionStats) error {
delete(t.conns, conn.ConnectionTuple)
if conn.Type == network.TCP {
t.tcp.RemoveConn(conn.ConnectionTuple)
}
return nil
}

// Remove deletes the connection from tracking state.
// It does not prevent the connection from re-appearing later, if additional traffic occurs.
func (t *ebpfLessTracer) Remove(conn *network.ConnectionStats) error {
t.m.Lock()
defer t.m.Unlock()

delete(t.conns, conn.ConnectionTuple)
if conn.Type == network.TCP {
t.tcp.RemoveConn(conn.ConnectionTuple)
}
return nil
return t.remove(conn)
}

// GetMap returns the underlying named map. This is useful if any maps are shared with other eBPF components.
Expand Down

0 comments on commit 51ec10c

Please sign in to comment.