From 5ccf0bc1dd20cc79350b98dd221d779406295a6b Mon Sep 17 00:00:00 2001 From: Stuart Geipel Date: Wed, 4 Dec 2024 11:41:40 -0500 Subject: [PATCH] [NPM-3586] Add RTT support to ebpf-less tracer (#31491) --- .../connection/ebpfless/tcp_processor.go | 18 ++- .../connection/ebpfless/tcp_processor_rtt.go | 102 +++++++++++++ .../ebpfless/tcp_processor_rtt_test.go | 142 ++++++++++++++++++ .../connection/ebpfless/tcp_processor_test.go | 29 ++-- .../tracer/connection/ebpfless_tracer.go | 13 +- pkg/network/tracer/tracer_linux_test.go | 29 +++- 6 files changed, 303 insertions(+), 30 deletions(-) create mode 100644 pkg/network/tracer/connection/ebpfless/tcp_processor_rtt.go create mode 100644 pkg/network/tracer/connection/ebpfless/tcp_processor_rtt_test.go diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor.go b/pkg/network/tracer/connection/ebpfless/tcp_processor.go index cc2b95fb8bea5..b23b8bbac4bbe 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_processor.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor.go @@ -49,6 +49,8 @@ type connectionState struct { localFinSeq uint32 // remoteFinSeq is the tcp.Seq number for the incoming FIN (including any payload length) remoteFinSeq uint32 + + rttTracker rttTracker } type TCPProcessor struct { //nolint:revive // TODO @@ -113,7 +115,7 @@ func (t *TCPProcessor) updateSynFlag(conn *network.ConnectionStats, st *connecti // updateTcpStats is designed to mirror the stat tracking in the windows driver's handleFlowProtocolTcp // https://github.com/DataDog/datadog-windows-filter/blob/d7560d83eb627117521d631a4c05cd654a01987e/ddfilter/flow/flow_tcp.c#L91 -func (t *TCPProcessor) updateTcpStats(conn *network.ConnectionStats, st *connectionState, pktType uint8, tcp *layers.TCP, payloadLen uint16) { //nolint:revive // TODO +func (t *TCPProcessor) updateTcpStats(conn *network.ConnectionStats, st *connectionState, pktType uint8, tcp *layers.TCP, payloadLen uint16, timestampNs uint64) { //nolint:revive // TODO nextSeq := calcNextSeq(tcp, payloadLen) if pktType == unix.PACKET_OUTGOING { @@ -124,8 +126,12 @@ func (t *TCPProcessor) updateTcpStats(conn *network.ConnectionStats, st *connect st.hasSentPacket = true conn.Monotonic.SentBytes += uint64(payloadLen) st.maxSeqSent = nextSeq + + st.rttTracker.processOutgoing(timestampNs, nextSeq) } else if packetCanRetransmit { conn.Monotonic.Retransmits++ + + st.rttTracker.clearTrip() } ackOutdated := !st.hasLocalAck || isSeqBefore(st.lastLocalAck, tcp.Ack) @@ -151,6 +157,12 @@ func (t *TCPProcessor) updateTcpStats(conn *network.ConnectionStats, st *connect if tcp.ACK && ackOutdated { st.hasRemoteAck = true st.lastRemoteAck = tcp.Ack + + hasNewRoundTrip := st.rttTracker.processIncoming(timestampNs, tcp.Ack) + if hasNewRoundTrip { + conn.RTT = nanosToMicros(st.rttTracker.rttSmoothNs) + conn.RTTVar = nanosToMicros(st.rttTracker.rttVarNs) + } } } } @@ -200,7 +212,7 @@ func (t *TCPProcessor) updateRstFlag(conn *network.ConnectionStats, st *connecti // Process handles a TCP packet, calculating stats and keeping track of its state according to the // TCP state machine. -func (t *TCPProcessor) Process(conn *network.ConnectionStats, pktType uint8, ip4 *layers.IPv4, ip6 *layers.IPv6, tcp *layers.TCP) error { +func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64, pktType uint8, ip4 *layers.IPv4, ip6 *layers.IPv6, tcp *layers.TCP) error { if pktType != unix.PACKET_OUTGOING && pktType != unix.PACKET_HOST { return fmt.Errorf("TCPProcessor saw invalid pktType: %d", pktType) } @@ -221,7 +233,7 @@ func (t *TCPProcessor) Process(conn *network.ConnectionStats, pktType uint8, ip4 st := t.conns[conn.ConnectionTuple] t.updateSynFlag(conn, &st, pktType, tcp, payloadLen) - t.updateTcpStats(conn, &st, pktType, tcp, payloadLen) + t.updateTcpStats(conn, &st, pktType, tcp, payloadLen, timestampNs) t.updateFinFlag(conn, &st, pktType, tcp, payloadLen) t.updateRstFlag(conn, &st, pktType, tcp, payloadLen) diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor_rtt.go b/pkg/network/tracer/connection/ebpfless/tcp_processor_rtt.go new file mode 100644 index 0000000000000..c199ebf8f753e --- /dev/null +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor_rtt.go @@ -0,0 +1,102 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +//go:build linux_bpf + +package ebpfless + +import ( + "time" + + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +func absDiff(a uint64, b uint64) uint64 { + if a < b { + return b - a + } + return a - b +} + +// nanosToMicros converts nanoseconds to microseconds, rounding and converting to +// the uint32 type that ConnectionStats uses +func nanosToMicros(nanos uint64) uint32 { + micros := time.Duration(nanos).Round(time.Microsecond).Microseconds() + return uint32(micros) +} + +// rttTracker implements the RTT algorithm specified here: +// https://datatracker.ietf.org/doc/html/rfc6298#section-2 +type rttTracker struct { + // sampleSentTimeNs is the timestamp our current round trip began. + // If it is 0, there is nothing in flight or a retransmit cleared this + sampleSentTimeNs uint64 + // expectedAck is the ack needed to complete the round trip + expectedAck uint32 + // rttSmoothNs is the smoothed RTT in nanoseconds + rttSmoothNs uint64 + // rttVarNs is the variance of the RTT in nanoseconds + rttVarNs uint64 +} + +func (rt *rttTracker) isActive() bool { + return rt.sampleSentTimeNs > 0 +} + +// processOutgoing is called to (potentially) start a round trip. +// Records the time of the packet for later +func (rt *rttTracker) processOutgoing(timestampNs uint64, nextSeq uint32) { + if !rt.isActive() { + rt.sampleSentTimeNs = timestampNs + rt.expectedAck = nextSeq + } +} + +// clearTrip is called by a retransmit or when a round-trip completes +// Retransmits pollute RTT accuracy and cause a trip to be thrown out +func (rt *rttTracker) clearTrip() { + if rt.isActive() { + rt.sampleSentTimeNs = 0 + rt.expectedAck = 0 + } +} + +// processIncoming is called to (potentially) close out a round trip. +// Based off this https://github.com/DataDog/datadog-windows-filter/blob/d7560d83eb627117521d631a4c05cd654a01987e/ddfilter/flow/flow_tcp.c#L269 +// Returns whether the RTT stats were updated. +func (rt *rttTracker) processIncoming(timestampNs uint64, ack uint32) bool { + hasCompletedTrip := rt.isActive() && isSeqBeforeEq(rt.expectedAck, ack) + if !hasCompletedTrip { + return false + } + + elapsedNs := timestampNs - rt.sampleSentTimeNs + if timestampNs < rt.sampleSentTimeNs { + log.Warn("rttTracker encountered non-monotonic clock") + elapsedNs = 0 + } + rt.clearTrip() + + if rt.rttSmoothNs == 0 { + rt.rttSmoothNs = elapsedNs + rt.rttVarNs = elapsedNs / 2 + return true + } + + // update variables based on fixed point math. + // RFC 6298 says alpha=1/8 and beta=1/4 + const fixedBasis uint64 = 1000 + // SRTT < -(1 - alpha) * SRTT + alpha * R' + oneMinusAlpha := fixedBasis - (fixedBasis / 8) + alphaRPrime := elapsedNs / 8 + s := ((oneMinusAlpha * rt.rttSmoothNs) / fixedBasis) + alphaRPrime + rt.rttSmoothNs = s + + // RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'| + oneMinusBeta := fixedBasis - fixedBasis/4 + rt.rttVarNs = (oneMinusBeta*rt.rttVarNs)/fixedBasis + absDiff(rt.rttSmoothNs, elapsedNs)/4 + + return true +} diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor_rtt_test.go b/pkg/network/tracer/connection/ebpfless/tcp_processor_rtt_test.go new file mode 100644 index 0000000000000..7ff2eca99a352 --- /dev/null +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor_rtt_test.go @@ -0,0 +1,142 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +//go:build linux_bpf + +package ebpfless + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNanosToMicros(t *testing.T) { + require.Equal(t, uint32(0), nanosToMicros(0)) + require.Equal(t, uint32(0), nanosToMicros(200)) + require.Equal(t, uint32(1), nanosToMicros(500)) + require.Equal(t, uint32(1), nanosToMicros(1000)) + require.Equal(t, uint32(1), nanosToMicros(1200)) + require.Equal(t, uint32(123), nanosToMicros(123*1000)) +} + +func TestSingleSampleRTT(t *testing.T) { + var rt rttTracker + + require.False(t, rt.isActive()) + + rt.processOutgoing(1000, 123) + require.True(t, rt.isActive()) + + hasUpdated := rt.processIncoming(2000, 42) + // ack is too low, not a round trip + require.False(t, hasUpdated) + + // ack is high enough to complete a round trip + hasUpdated = rt.processIncoming(3000, 123) + require.True(t, hasUpdated) + + require.Equal(t, uint64(2000), rt.rttSmoothNs) + require.Equal(t, uint64(1000), rt.rttVarNs) +} + +func TestLowVarianceRtt(t *testing.T) { + var rt rttTracker + + for i := range 10 { + ts := uint64(i + 1) + seq := uint32(123 + i) + + startNs := (2 * ts) * 1000 + endNs := startNs + 1000 + // round trip time always the 1000, so variance goes to 0 + rt.processOutgoing(startNs, seq) + hasUpdated := rt.processIncoming(endNs, seq) + require.True(t, hasUpdated) + require.Equal(t, rt.rttSmoothNs, uint64(1000)) + } + + // after 10 iterations, the variance should have mostly converged to zero + require.Less(t, rt.rttVarNs, uint64(100)) +} + +func TestConstantVarianceRtt(t *testing.T) { + var rt rttTracker + + for i := range 10 { + ts := uint64(i + 1) + seq := uint32(123 + i) + + startNs := (2 * ts) * 1000 + endNs := startNs + 500 + if i%2 == 0 { + endNs = startNs + 1000 + } + + // round trip time alternates between 500 and 100 + rt.processOutgoing(startNs, seq) + hasUpdated := rt.processIncoming(endNs, seq) + require.True(t, hasUpdated) + + require.LessOrEqual(t, uint64(500), rt.rttSmoothNs) + require.LessOrEqual(t, rt.rttSmoothNs, uint64(1000)) + } + + // This is not exact since it uses an exponential rolling sum + // In this test, the time delta alternates between 500 and 1000, + // so rttSmoothNs is 750, for an average difference of ~250. + const epsilon = 20 + require.Less(t, uint64(250-epsilon), rt.rttVarNs) + require.Less(t, rt.rttVarNs, uint64(250+epsilon)) +} + +func TestTcpProcessorRtt(t *testing.T) { + pb := newPacketBuilder(lowerSeq, higherSeq) + syn := pb.outgoing(0, 0, 0, SYN) + // t=200 us + syn.timestampNs = 200 * 1000 + synack := pb.incoming(0, 0, 1, SYN|ACK) + // t=300 us, for a round trip of 100us + synack.timestampNs = 300 * 1000 + + f := newTcpTestFixture(t) + + f.runPkt(syn) + // round trip has not completed yet + require.Zero(t, f.conn.RTT) + require.Zero(t, f.conn.RTTVar) + + f.runPkt(synack) + // round trip has completed in 100us + require.Equal(t, uint32(100), f.conn.RTT) + require.Equal(t, uint32(50), f.conn.RTTVar) +} + +func TestTcpProcessorRttRetransmit(t *testing.T) { + pb := newPacketBuilder(lowerSeq, higherSeq) + syn := pb.outgoing(0, 0, 0, SYN) + // t=200 us + syn.timestampNs = 200 * 1000 + synack := pb.incoming(0, 0, 1, SYN|ACK) + // t=300 us, for a round trip of 100us + synack.timestampNs = 300 * 1000 + + f := newTcpTestFixture(t) + + f.runPkt(syn) + // round trip has not completed yet + require.Zero(t, f.conn.RTT) + require.Zero(t, f.conn.RTTVar) + + f.runPkt(syn) + // this is a retransmit, should reset the round trip + require.Zero(t, f.conn.RTT) + require.Zero(t, f.conn.RTTVar) + + f.runPkt(synack) + // should STILL not have a round trip because the retransmit contaminated the results + require.Zero(t, f.conn.RTT) + require.Zero(t, f.conn.RTTVar) +} diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go b/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go index 10716414d7848..2d7e1ba4bff33 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go @@ -71,10 +71,11 @@ func tcpPacket(srcPort, dstPort uint16, seq, ack uint32, flags uint8) layers.TCP } type testCapture struct { - pktType uint8 - ipv4 *layers.IPv4 - ipv6 *layers.IPv6 - tcp *layers.TCP + timestampNs uint64 + pktType uint8 + ipv4 *layers.IPv4 + ipv6 *layers.IPv6 + tcp *layers.TCP } // TODO can this be merged with the logic creating scratchConns in ebpfless tracer? @@ -142,10 +143,11 @@ func (pb packetBuilder) incoming(payloadLen uint16, relSeq, relAck uint32, flags ack := relAck + pb.remoteSeqBase tcp := tcpPacket(defaultRemotePort, defaultLocalPort, seq, ack, flags) return testCapture{ - pktType: unix.PACKET_HOST, - ipv4: &ipv4, - ipv6: nil, - tcp: &tcp, + timestampNs: 0, // timestampNs not populated except in tcp_processor_rtt_test + pktType: unix.PACKET_HOST, + ipv4: &ipv4, + ipv6: nil, + tcp: &tcp, } } @@ -155,10 +157,11 @@ func (pb packetBuilder) outgoing(payloadLen uint16, relSeq, relAck uint32, flags ack := relAck + pb.localSeqBase tcp := tcpPacket(defaultLocalPort, defaultRemotePort, seq, ack, flags) return testCapture{ - pktType: unix.PACKET_OUTGOING, - ipv4: &ipv4, - ipv6: nil, - tcp: &tcp, + timestampNs: 0, // timestampNs not populated except in tcp_processor_rtt_test + pktType: unix.PACKET_OUTGOING, + ipv4: &ipv4, + ipv6: nil, + tcp: &tcp, } } @@ -174,7 +177,7 @@ func (fixture *tcpTestFixture) runPkt(pkt testCapture) { if fixture.conn == nil { fixture.conn = makeTcpStates(pkt) } - err := fixture.tcp.Process(fixture.conn, pkt.pktType, pkt.ipv4, pkt.ipv6, pkt.tcp) + err := fixture.tcp.Process(fixture.conn, pkt.timestampNs, pkt.pktType, pkt.ipv4, pkt.ipv6, pkt.tcp) require.NoError(fixture.t, err) } diff --git a/pkg/network/tracer/connection/ebpfless_tracer.go b/pkg/network/tracer/connection/ebpfless_tracer.go index c4a5e9db93fdf..8bb0a54f170ec 100644 --- a/pkg/network/tracer/connection/ebpfless_tracer.go +++ b/pkg/network/tracer/connection/ebpfless_tracer.go @@ -196,10 +196,15 @@ func (t *ebpfLessTracer) processConnection( conn.Duration = time.Duration(time.Now().UnixNano()) } + var ts int64 + var err error + if ts, err = ddebpf.NowNanoseconds(); err != nil { + return fmt.Errorf("error getting last updated timestamp for connection: %w", err) + } + if ip4 == nil && ip6 == nil { return nil } - var err error switch conn.Type { case network.UDP: if (ip4 != nil && !t.config.CollectUDPv4Conns) || (ip6 != nil && !t.config.CollectUDPv6Conns) { @@ -210,7 +215,7 @@ func (t *ebpfLessTracer) processConnection( if (ip4 != nil && !t.config.CollectTCPv4Conns) || (ip6 != nil && !t.config.CollectTCPv6Conns) { return nil } - err = t.tcp.Process(conn, pktType, ip4, ip6, tcp) + err = t.tcp.Process(conn, uint64(ts), pktType, ip4, ip6, tcp) default: err = fmt.Errorf("unsupported connection type %d", conn.Type) } @@ -220,10 +225,6 @@ func (t *ebpfLessTracer) processConnection( } if conn.Type == network.UDP || conn.Monotonic.TCPEstablished > 0 { - var ts int64 - if ts, err = ddebpf.NowNanoseconds(); err != nil { - return fmt.Errorf("error getting last updated timestamp for connection: %w", err) - } conn.LastUpdateEpoch = uint64(ts) t.conns[t.scratchConn.ConnectionTuple] = conn } diff --git a/pkg/network/tracer/tracer_linux_test.go b/pkg/network/tracer/tracer_linux_test.go index 675eacdd2fc6e..519c0ef979e3b 100644 --- a/pkg/network/tracer/tracer_linux_test.go +++ b/pkg/network/tracer/tracer_linux_test.go @@ -274,7 +274,6 @@ func (s *TracerSuite) TestTCPRTT() { flake.Mark(t) } cfg := testConfig() - skipEbpflessTodo(t, cfg) // Enable BPF-based system probe tr := setupTracer(t, cfg) // Create TCP Server that simply "drains" connection until receiving an EOF @@ -301,14 +300,28 @@ func (s *TracerSuite) TestTCPRTT() { tcpInfo, err := offsetguess.TcpGetInfo(c) require.NoError(t, err) - // Fetch connection matching source and target address - allConnections := getConnections(t, tr) - conn, ok := findConnection(c.LocalAddr(), c.RemoteAddr(), allConnections) - require.True(t, ok) + require.EventuallyWithT(t, func(ct *assert.CollectT) { + // Fetch connection matching source and target address + allConnections := getConnections(t, tr) + conn, ok := findConnection(c.LocalAddr(), c.RemoteAddr(), allConnections) + if !assert.True(ct, ok) { + return + } - // Assert that values returned from syscall match ones generated by eBPF program - assert.EqualValues(t, int(tcpInfo.Rtt), int(conn.RTT)) - assert.EqualValues(t, int(tcpInfo.Rttvar), int(conn.RTTVar)) + if cfg.EnableEbpfless { + timeoutUs := uint32((10 * time.Second).Microseconds()) + // On ebpfless, we don't have the same timestamps as the kernel so all + // we can do is sanity check that RTT is nonzero and not huge + assert.Greater(ct, int(conn.RTT), 0) + assert.Less(ct, conn.RTT, timeoutUs) + assert.Greater(ct, int(conn.RTTVar), 0) + assert.Less(ct, conn.RTTVar, timeoutUs) + } else { + // Assert that values returned from syscall match ones generated by eBPF program + assert.EqualValues(ct, int(tcpInfo.Rtt), int(conn.RTT)) + assert.EqualValues(ct, int(tcpInfo.Rttvar), int(conn.RTTVar)) + } + }, 3*time.Second, 100*time.Millisecond) } func (s *TracerSuite) TestTCPMiscount() {