Skip to content

Commit

Permalink
[ebpfless] Separate pending connection map, handle closed connections
Browse files Browse the repository at this point in the history
  • Loading branch information
pimlu committed Dec 13, 2024
1 parent 9177202 commit 06d8940
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 71 deletions.
86 changes: 60 additions & 26 deletions pkg/network/tracer/connection/ebpfless/tcp_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,17 @@ func (st *connectionState) hasMissedHandshake() bool {

// TCPProcessor encapsulates TCP state tracking for the ebpfless tracer
type TCPProcessor struct {
conns map[network.ConnectionTuple]connectionState
// pendingConns contains connections with tcpState == connStatAttempted
pendingConns map[network.ConnectionTuple]*connectionState
// establishedConns contains connections with tcpState == connStatEstablished
establishedConns map[network.ConnectionTuple]*connectionState
}

// NewTCPProcessor constructs an empty TCPProcessor
func NewTCPProcessor() *TCPProcessor {
return &TCPProcessor{
conns: map[network.ConnectionTuple]connectionState{},
pendingConns: map[network.ConnectionTuple]*connectionState{},
establishedConns: map[network.ConnectionTuple]*connectionState{},
}
}

Expand Down Expand Up @@ -241,13 +245,13 @@ func (t *TCPProcessor) updateRstFlag(conn *network.ConnectionStats, st *connecti

// Process handles a TCP packet, calculating stats and keeping track of its state according to the
// TCP state machine.
func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64, pktType uint8, ip4 *layers.IPv4, ip6 *layers.IPv6, tcp *layers.TCP) error {
func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64, pktType uint8, ip4 *layers.IPv4, ip6 *layers.IPv6, tcp *layers.TCP) (ProcessResult, error) {
if pktType != unix.PACKET_OUTGOING && pktType != unix.PACKET_HOST {
return fmt.Errorf("TCPProcessor saw invalid pktType: %d", pktType)
return ProcessResultNone, fmt.Errorf("TCPProcessor saw invalid pktType: %d", pktType)
}
payloadLen, err := TCPPayloadLen(conn.Family, ip4, ip6, tcp)
if err != nil {
return err
return ProcessResultNone, err
}

log.TraceFunc(func() string {
Expand All @@ -256,31 +260,61 @@ func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64

// skip invalid packets we don't recognize:
if checkInvalidTCP(tcp) {
return nil
return ProcessResultNone, nil
}

st := t.conns[conn.ConnectionTuple]
st := t.getConn(conn.ConnectionTuple)
origState := st.tcpState

t.updateSynFlag(conn, &st, pktType, tcp, payloadLen)
t.updateTCPStats(conn, &st, pktType, tcp, payloadLen, timestampNs)
t.updateFinFlag(conn, &st, pktType, tcp, payloadLen)
t.updateRstFlag(conn, &st, pktType, tcp, payloadLen)
t.updateSynFlag(conn, st, pktType, tcp, payloadLen)
t.updateTCPStats(conn, st, pktType, tcp, payloadLen, timestampNs)
t.updateFinFlag(conn, st, pktType, tcp, payloadLen)
t.updateRstFlag(conn, st, pktType, tcp, payloadLen)

t.conns[conn.ConnectionTuple] = st
return nil
stateChanged := st.tcpState != origState
if stateChanged {
t.moveConn(conn.ConnectionTuple, st)
}

// if the connection is still established, we should update the connection map
if st.tcpState == connStatEstablished {
return ProcessResultStoreConn, nil
}
// if the connection just closed, store it in the tracer's closeCallback
if st.tcpState == connStatClosed && stateChanged {
return ProcessResultCloseConn, nil
}
return ProcessResultNone, nil
}

// HasConnEverEstablished is used to avoid a connection appearing before the three-way handshake is complete.
// This is done to mirror the behavior of ebpf tracing accept() and connect(), which both return
// after the handshake is completed.
func (t *TCPProcessor) HasConnEverEstablished(conn *network.ConnectionStats) bool {
st := t.conns[conn.ConnectionTuple]

// conn.Monotonic.TCPEstablished can be 0 even though isEstablished is true,
// because pre-existing connections don't increment TCPEstablished.
// That's why we use tcpState instead of conn
isEstablished := st.tcpState == connStatEstablished
// if the connection has closed in any way, report that too
hasEverClosed := conn.Monotonic.TCPClosed > 0 || len(conn.TCPFailures) > 0
return isEstablished || hasEverClosed
// getConn looks up the tracked state for tuple. The established map is
// consulted first, then the pending map. When the tuple is in neither, a new
// zero-valued connectionState is returned; it is not inserted into any map
// here (moveConn stores it later).
func (t *TCPProcessor) getConn(tuple network.ConnectionTuple) *connectionState {
	state, found := t.establishedConns[tuple]
	if !found {
		state, found = t.pendingConns[tuple]
	}
	if !found {
		// unknown tuple: hand back a fresh state object that moveConn will store later
		state = &connectionState{}
	}
	return state
}

// RemoveConn forgets tuple: any entry for it is deleted from both the
// established and the pending connection maps. Removing a tuple that is not
// currently tracked is a no-op.
func (t *TCPProcessor) RemoveConn(tuple network.ConnectionTuple) {
	delete(t.establishedConns, tuple)
	delete(t.pendingConns, tuple)
}
// moveConn re-files st under tuple according to st.tcpState. The tuple is
// first removed from both maps, then stored in pendingConns when the state is
// connStatAttempted or in establishedConns when it is connStatEstablished.
// For any other state, including connStatClosed, the tuple stays untracked.
func (t *TCPProcessor) moveConn(tuple network.ConnectionTuple, st *connectionState) {
	t.RemoveConn(tuple)

	if st.tcpState == connStatAttempted {
		// TODO limit map sizes here with a config value
		t.pendingConns[tuple] = st
	} else if st.tcpState == connStatEstablished {
		// TODO limit map sizes here with a config value
		t.establishedConns[tuple] = st
	}
	// Closed connections are simply allowed to disappear: Process() returns
	// ProcessResultCloseConn, letting the ebpfless tracer know the connection
	// has closed.
}
50 changes: 15 additions & 35 deletions pkg/network/tracer/connection/ebpfless/tcp_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,13 @@ func newTCPTestFixture(t *testing.T) *tcpTestFixture {
}
}

func (fixture *tcpTestFixture) runPkt(pkt testCapture) {
func (fixture *tcpTestFixture) runPkt(pkt testCapture) ProcessResult {
if fixture.conn == nil {
fixture.conn = makeTCPStates(pkt)
}
err := fixture.tcp.Process(fixture.conn, pkt.timestampNs, pkt.pktType, pkt.ipv4, pkt.ipv6, pkt.tcp)
result, err := fixture.tcp.Process(fixture.conn, pkt.timestampNs, pkt.pktType, pkt.ipv4, pkt.ipv6, pkt.tcp)
require.NoError(fixture.t, err)
return result
}

func (fixture *tcpTestFixture) runPkts(packets []testCapture) {
Expand All @@ -197,7 +198,7 @@ func (fixture *tcpTestFixture) runAgainstState(packets []testCapture, expected [

fixture.runPkt(pkt)
connTuple := fixture.conn.ConnectionTuple
actual := fixture.tcp.conns[connTuple].tcpState
actual := fixture.tcp.getConn(connTuple).tcpState
actualStrs = append(actualStrs, labelForState(actual))
}
require.Equal(fixture.t, expectedStrs, actualStrs)
Expand Down Expand Up @@ -614,9 +615,8 @@ func TestConnReset(t *testing.T) {
require.Equal(t, expectedStats, f.conn.Monotonic)
}

func TestConnectTwice(t *testing.T) {
// same as TestImmediateFin but everything happens twice

func TestProcessResult(t *testing.T) {
// same as TestImmediateFin but checks ProcessResult
pb := newPacketBuilder(lowerSeq, higherSeq)
basicHandshake := []testCapture{
pb.incoming(0, 0, 0, SYN),
Expand All @@ -628,40 +628,20 @@ func TestConnectTwice(t *testing.T) {
pb.outgoing(0, 2, 2, ACK),
}

expectedClientStates := []connStatus{
connStatAttempted,
connStatAttempted,
connStatEstablished,
// active close begins here
connStatEstablished,
connStatEstablished,
connStatClosed,
processResults := []ProcessResult{
ProcessResultNone,
ProcessResultNone,
ProcessResultStoreConn,
ProcessResultStoreConn,
ProcessResultStoreConn,
ProcessResultCloseConn,
}

f := newTCPTestFixture(t)
f.runAgainstState(basicHandshake, expectedClientStates)

state := f.tcp.conns[f.conn.ConnectionTuple]
// make sure the TCP state was erased after the connection was closed
require.Equal(t, connectionState{
tcpState: connStatClosed,
}, state)

// second connection here
f.runAgainstState(basicHandshake, expectedClientStates)

require.Empty(t, f.conn.TCPFailures)

expectedStats := network.StatCounters{
SentBytes: 0,
RecvBytes: 0,
SentPackets: 3 * 2,
RecvPackets: 3 * 2,
Retransmits: 0,
TCPEstablished: 1 * 2,
TCPClosed: 1 * 2,
for i, pkt := range basicHandshake {
require.Equal(t, processResults[i], f.runPkt(pkt), "packet #%d has the wrong ProcessResult", i)
}
require.Equal(t, expectedStats, f.conn.Monotonic)
}

func TestSimultaneousClose(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions pkg/network/tracer/connection/ebpfless/tcp_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ import (

const ebpflessModuleName = "ebpfless_network_tracer"

// ProcessResult tells the ebpfless tracer what to do with a ConnectionStats
// once a packet has been processed.
type ProcessResult uint8

const (
	// ProcessResultNone means the updated ConnectionStats must NOT be stored in
	// the connection map. Usually this is because the connection is not
	// established yet. It is the zero value of ProcessResult.
	ProcessResultNone ProcessResult = iota

	// ProcessResultStoreConn means the updated ConnectionStats should be stored
	// in the connection map. This happens when the connection is established.
	ProcessResultStoreConn

	// ProcessResultCloseConn means the connection is done and its
	// ConnectionStats should be handed to the ebpfless tracer's closed
	// connection handler.
	ProcessResultCloseConn
)

var statsTelemetry = struct {
missedTCPConnections telemetry.Counter
missingTCPFlags telemetry.Counter
Expand Down
31 changes: 21 additions & 10 deletions pkg/network/tracer/connection/ebpfless_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newEbpfLessTracer(cfg *config.Config) (*ebpfLessTracer, error) {
}

// Start begins collecting network connection data.
func (t *ebpfLessTracer) Start(_ func(*network.ConnectionStats)) error {
func (t *ebpfLessTracer) Start(closeCallback func(*network.ConnectionStats)) error {
if err := t.boundPorts.Start(); err != nil {
return fmt.Errorf("could not update bound ports: %w", err)
}
Expand All @@ -123,7 +123,7 @@ func (t *ebpfLessTracer) Start(_ func(*network.ConnectionStats)) error {
return nil
}

if err := t.processConnection(pktType, &ip4, &ip6, &udp, &tcp, decoded); err != nil {
if err := t.processConnection(pktType, &ip4, &ip6, &udp, &tcp, decoded, closeCallback); err != nil {
log.Warnf("could not process packet: %s", err)
}

Expand All @@ -147,8 +147,8 @@ func (t *ebpfLessTracer) processConnection(
udp *layers.UDP,
tcp *layers.TCP,
decoded []gopacket.LayerType,
closeCallback func(*network.ConnectionStats),
) error {

t.scratchConn.Source, t.scratchConn.Dest = util.Address{}, util.Address{}
t.scratchConn.SPort, t.scratchConn.DPort = 0, 0
t.scratchConn.TCPFailures = make(map[uint16]uint32)
Expand Down Expand Up @@ -204,21 +204,25 @@ func (t *ebpfLessTracer) processConnection(
if ts, err = ddebpf.NowNanoseconds(); err != nil {
return fmt.Errorf("error getting last updated timestamp for connection: %w", err)
}
conn.LastUpdateEpoch = uint64(ts)

if !ip4Present && !ip6Present {
return nil
}

var result ebpfless.ProcessResult
switch conn.Type {
case network.UDP:
if (ip4Present && !t.config.CollectUDPv4Conns) || (ip6Present && !t.config.CollectUDPv6Conns) {
return nil
}
result = ebpfless.ProcessResultStoreConn
err = t.udp.process(conn, pktType, udp)
case network.TCP:
if (ip4Present && !t.config.CollectTCPv4Conns) || (ip6Present && !t.config.CollectTCPv6Conns) {
return nil
}
err = t.tcp.Process(conn, uint64(ts), pktType, ip4, ip6, tcp)
result, err = t.tcp.Process(conn, uint64(ts), pktType, ip4, ip6, tcp)
default:
err = fmt.Errorf("unsupported connection type %d", conn.Type)
}
Expand All @@ -227,15 +231,19 @@ func (t *ebpfLessTracer) processConnection(
return fmt.Errorf("error processing connection: %w", err)
}

// TODO probably remove HasConnEverEstablished once we handle closed connections properly
if conn.Type == network.UDP || (conn.Type == network.TCP && t.tcp.HasConnEverEstablished(conn)) {
conn.LastUpdateEpoch = uint64(ts)
t.conns[t.scratchConn.ConnectionTuple] = conn
}

log.TraceFunc(func() string {
return fmt.Sprintf("connection: %s", conn)
})

switch result {
case ebpfless.ProcessResultNone:
case ebpfless.ProcessResultStoreConn:
t.conns[t.scratchConn.ConnectionTuple] = conn
case ebpfless.ProcessResultCloseConn:
delete(t.conns, conn.ConnectionTuple)
closeCallback(conn)
}

return nil
}

Expand Down Expand Up @@ -310,6 +318,9 @@ func (t *ebpfLessTracer) Remove(conn *network.ConnectionStats) error {
defer t.m.Unlock()

delete(t.conns, conn.ConnectionTuple)
if conn.Type == network.TCP {
t.tcp.RemoveConn(conn.ConnectionTuple)
}
return nil
}

Expand Down

0 comments on commit 06d8940

Please sign in to comment.