Skip to content

Commit

Permalink
[NPM-3586] Add RTT support to ebpf-less tracer (#31491)
Browse files Browse the repository at this point in the history
  • Loading branch information
pimlu authored Dec 4, 2024
1 parent 03b83e1 commit 5ccf0bc
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 30 deletions.
18 changes: 15 additions & 3 deletions pkg/network/tracer/connection/ebpfless/tcp_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type connectionState struct {
localFinSeq uint32
// remoteFinSeq is the tcp.Seq number for the incoming FIN (including any payload length)
remoteFinSeq uint32

rttTracker rttTracker
}

type TCPProcessor struct { //nolint:revive // TODO
Expand Down Expand Up @@ -113,7 +115,7 @@ func (t *TCPProcessor) updateSynFlag(conn *network.ConnectionStats, st *connecti

// updateTcpStats is designed to mirror the stat tracking in the windows driver's handleFlowProtocolTcp
// https://github.com/DataDog/datadog-windows-filter/blob/d7560d83eb627117521d631a4c05cd654a01987e/ddfilter/flow/flow_tcp.c#L91
func (t *TCPProcessor) updateTcpStats(conn *network.ConnectionStats, st *connectionState, pktType uint8, tcp *layers.TCP, payloadLen uint16) { //nolint:revive // TODO
func (t *TCPProcessor) updateTcpStats(conn *network.ConnectionStats, st *connectionState, pktType uint8, tcp *layers.TCP, payloadLen uint16, timestampNs uint64) { //nolint:revive // TODO
nextSeq := calcNextSeq(tcp, payloadLen)

if pktType == unix.PACKET_OUTGOING {
Expand All @@ -124,8 +126,12 @@ func (t *TCPProcessor) updateTcpStats(conn *network.ConnectionStats, st *connect
st.hasSentPacket = true
conn.Monotonic.SentBytes += uint64(payloadLen)
st.maxSeqSent = nextSeq

st.rttTracker.processOutgoing(timestampNs, nextSeq)
} else if packetCanRetransmit {
conn.Monotonic.Retransmits++

st.rttTracker.clearTrip()
}

ackOutdated := !st.hasLocalAck || isSeqBefore(st.lastLocalAck, tcp.Ack)
Expand All @@ -151,6 +157,12 @@ func (t *TCPProcessor) updateTcpStats(conn *network.ConnectionStats, st *connect
if tcp.ACK && ackOutdated {
st.hasRemoteAck = true
st.lastRemoteAck = tcp.Ack

hasNewRoundTrip := st.rttTracker.processIncoming(timestampNs, tcp.Ack)
if hasNewRoundTrip {
conn.RTT = nanosToMicros(st.rttTracker.rttSmoothNs)
conn.RTTVar = nanosToMicros(st.rttTracker.rttVarNs)
}
}
}
}
Expand Down Expand Up @@ -200,7 +212,7 @@ func (t *TCPProcessor) updateRstFlag(conn *network.ConnectionStats, st *connecti

// Process handles a TCP packet, calculating stats and keeping track of its state according to the
// TCP state machine.
func (t *TCPProcessor) Process(conn *network.ConnectionStats, pktType uint8, ip4 *layers.IPv4, ip6 *layers.IPv6, tcp *layers.TCP) error {
func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64, pktType uint8, ip4 *layers.IPv4, ip6 *layers.IPv6, tcp *layers.TCP) error {
if pktType != unix.PACKET_OUTGOING && pktType != unix.PACKET_HOST {
return fmt.Errorf("TCPProcessor saw invalid pktType: %d", pktType)
}
Expand All @@ -221,7 +233,7 @@ func (t *TCPProcessor) Process(conn *network.ConnectionStats, pktType uint8, ip4
st := t.conns[conn.ConnectionTuple]

t.updateSynFlag(conn, &st, pktType, tcp, payloadLen)
t.updateTcpStats(conn, &st, pktType, tcp, payloadLen)
t.updateTcpStats(conn, &st, pktType, tcp, payloadLen, timestampNs)
t.updateFinFlag(conn, &st, pktType, tcp, payloadLen)
t.updateRstFlag(conn, &st, pktType, tcp, payloadLen)

Expand Down
102 changes: 102 additions & 0 deletions pkg/network/tracer/connection/ebpfless/tcp_processor_rtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux_bpf

package ebpfless

import (
"time"

"github.com/DataDog/datadog-agent/pkg/util/log"
)

func absDiff(a uint64, b uint64) uint64 {
if a < b {
return b - a
}
return a - b
}

// nanosToMicros converts nanoseconds to microseconds, rounding and converting to
// the uint32 type that ConnectionStats uses
func nanosToMicros(nanos uint64) uint32 {
micros := time.Duration(nanos).Round(time.Microsecond).Microseconds()
return uint32(micros)
}

// rttTracker implements the RTT algorithm specified here:
// https://datatracker.ietf.org/doc/html/rfc6298#section-2
type rttTracker struct {
// sampleSentTimeNs is the timestamp our current round trip began.
// If it is 0, there is nothing in flight or a retransmit cleared this
sampleSentTimeNs uint64
// expectedAck is the ack needed to complete the round trip
expectedAck uint32
// rttSmoothNs is the smoothed RTT in nanoseconds
rttSmoothNs uint64
// rttVarNs is the variance of the RTT in nanoseconds
rttVarNs uint64
}

func (rt *rttTracker) isActive() bool {
return rt.sampleSentTimeNs > 0
}

// processOutgoing is called to (potentially) start a round trip.
// Records the time of the packet for later
func (rt *rttTracker) processOutgoing(timestampNs uint64, nextSeq uint32) {
if !rt.isActive() {
rt.sampleSentTimeNs = timestampNs
rt.expectedAck = nextSeq
}
}

// clearTrip is called by a retransmit or when a round-trip completes
// Retransmits pollute RTT accuracy and cause a trip to be thrown out
func (rt *rttTracker) clearTrip() {
if rt.isActive() {
rt.sampleSentTimeNs = 0
rt.expectedAck = 0
}
}

// processIncoming is called to (potentially) close out a round trip.
// Based off this https://github.com/DataDog/datadog-windows-filter/blob/d7560d83eb627117521d631a4c05cd654a01987e/ddfilter/flow/flow_tcp.c#L269
// Returns whether the RTT stats were updated.
func (rt *rttTracker) processIncoming(timestampNs uint64, ack uint32) bool {
hasCompletedTrip := rt.isActive() && isSeqBeforeEq(rt.expectedAck, ack)
if !hasCompletedTrip {
return false
}

elapsedNs := timestampNs - rt.sampleSentTimeNs
if timestampNs < rt.sampleSentTimeNs {
log.Warn("rttTracker encountered non-monotonic clock")
elapsedNs = 0
}
rt.clearTrip()

if rt.rttSmoothNs == 0 {
rt.rttSmoothNs = elapsedNs
rt.rttVarNs = elapsedNs / 2
return true
}

// update variables based on fixed point math.
// RFC 6298 says alpha=1/8 and beta=1/4
const fixedBasis uint64 = 1000
// SRTT < -(1 - alpha) * SRTT + alpha * R'
oneMinusAlpha := fixedBasis - (fixedBasis / 8)
alphaRPrime := elapsedNs / 8
s := ((oneMinusAlpha * rt.rttSmoothNs) / fixedBasis) + alphaRPrime
rt.rttSmoothNs = s

// RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'|
oneMinusBeta := fixedBasis - fixedBasis/4
rt.rttVarNs = (oneMinusBeta*rt.rttVarNs)/fixedBasis + absDiff(rt.rttSmoothNs, elapsedNs)/4

return true
}
142 changes: 142 additions & 0 deletions pkg/network/tracer/connection/ebpfless/tcp_processor_rtt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux_bpf

package ebpfless

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNanosToMicros(t *testing.T) {
require.Equal(t, uint32(0), nanosToMicros(0))
require.Equal(t, uint32(0), nanosToMicros(200))
require.Equal(t, uint32(1), nanosToMicros(500))
require.Equal(t, uint32(1), nanosToMicros(1000))
require.Equal(t, uint32(1), nanosToMicros(1200))
require.Equal(t, uint32(123), nanosToMicros(123*1000))
}

func TestSingleSampleRTT(t *testing.T) {
var rt rttTracker

require.False(t, rt.isActive())

rt.processOutgoing(1000, 123)
require.True(t, rt.isActive())

hasUpdated := rt.processIncoming(2000, 42)
// ack is too low, not a round trip
require.False(t, hasUpdated)

// ack is high enough to complete a round trip
hasUpdated = rt.processIncoming(3000, 123)
require.True(t, hasUpdated)

require.Equal(t, uint64(2000), rt.rttSmoothNs)
require.Equal(t, uint64(1000), rt.rttVarNs)
}

func TestLowVarianceRtt(t *testing.T) {
var rt rttTracker

for i := range 10 {
ts := uint64(i + 1)
seq := uint32(123 + i)

startNs := (2 * ts) * 1000
endNs := startNs + 1000
// round trip time always the 1000, so variance goes to 0
rt.processOutgoing(startNs, seq)
hasUpdated := rt.processIncoming(endNs, seq)
require.True(t, hasUpdated)
require.Equal(t, rt.rttSmoothNs, uint64(1000))
}

// after 10 iterations, the variance should have mostly converged to zero
require.Less(t, rt.rttVarNs, uint64(100))
}

func TestConstantVarianceRtt(t *testing.T) {
var rt rttTracker

for i := range 10 {
ts := uint64(i + 1)
seq := uint32(123 + i)

startNs := (2 * ts) * 1000
endNs := startNs + 500
if i%2 == 0 {
endNs = startNs + 1000
}

// round trip time alternates between 500 and 100
rt.processOutgoing(startNs, seq)
hasUpdated := rt.processIncoming(endNs, seq)
require.True(t, hasUpdated)

require.LessOrEqual(t, uint64(500), rt.rttSmoothNs)
require.LessOrEqual(t, rt.rttSmoothNs, uint64(1000))
}

// This is not exact since it uses an exponential rolling sum
// In this test, the time delta alternates between 500 and 1000,
// so rttSmoothNs is 750, for an average difference of ~250.
const epsilon = 20
require.Less(t, uint64(250-epsilon), rt.rttVarNs)
require.Less(t, rt.rttVarNs, uint64(250+epsilon))
}

func TestTcpProcessorRtt(t *testing.T) {
pb := newPacketBuilder(lowerSeq, higherSeq)
syn := pb.outgoing(0, 0, 0, SYN)
// t=200 us
syn.timestampNs = 200 * 1000
synack := pb.incoming(0, 0, 1, SYN|ACK)
// t=300 us, for a round trip of 100us
synack.timestampNs = 300 * 1000

f := newTcpTestFixture(t)

f.runPkt(syn)
// round trip has not completed yet
require.Zero(t, f.conn.RTT)
require.Zero(t, f.conn.RTTVar)

f.runPkt(synack)
// round trip has completed in 100us
require.Equal(t, uint32(100), f.conn.RTT)
require.Equal(t, uint32(50), f.conn.RTTVar)
}

func TestTcpProcessorRttRetransmit(t *testing.T) {
pb := newPacketBuilder(lowerSeq, higherSeq)
syn := pb.outgoing(0, 0, 0, SYN)
// t=200 us
syn.timestampNs = 200 * 1000
synack := pb.incoming(0, 0, 1, SYN|ACK)
// t=300 us, for a round trip of 100us
synack.timestampNs = 300 * 1000

f := newTcpTestFixture(t)

f.runPkt(syn)
// round trip has not completed yet
require.Zero(t, f.conn.RTT)
require.Zero(t, f.conn.RTTVar)

f.runPkt(syn)
// this is a retransmit, should reset the round trip
require.Zero(t, f.conn.RTT)
require.Zero(t, f.conn.RTTVar)

f.runPkt(synack)
// should STILL not have a round trip because the retransmit contaminated the results
require.Zero(t, f.conn.RTT)
require.Zero(t, f.conn.RTTVar)
}
29 changes: 16 additions & 13 deletions pkg/network/tracer/connection/ebpfless/tcp_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ func tcpPacket(srcPort, dstPort uint16, seq, ack uint32, flags uint8) layers.TCP
}

type testCapture struct {
pktType uint8
ipv4 *layers.IPv4
ipv6 *layers.IPv6
tcp *layers.TCP
timestampNs uint64
pktType uint8
ipv4 *layers.IPv4
ipv6 *layers.IPv6
tcp *layers.TCP
}

// TODO can this be merged with the logic creating scratchConns in ebpfless tracer?
Expand Down Expand Up @@ -142,10 +143,11 @@ func (pb packetBuilder) incoming(payloadLen uint16, relSeq, relAck uint32, flags
ack := relAck + pb.remoteSeqBase
tcp := tcpPacket(defaultRemotePort, defaultLocalPort, seq, ack, flags)
return testCapture{
pktType: unix.PACKET_HOST,
ipv4: &ipv4,
ipv6: nil,
tcp: &tcp,
timestampNs: 0, // timestampNs not populated except in tcp_processor_rtt_test
pktType: unix.PACKET_HOST,
ipv4: &ipv4,
ipv6: nil,
tcp: &tcp,
}
}

Expand All @@ -155,10 +157,11 @@ func (pb packetBuilder) outgoing(payloadLen uint16, relSeq, relAck uint32, flags
ack := relAck + pb.localSeqBase
tcp := tcpPacket(defaultLocalPort, defaultRemotePort, seq, ack, flags)
return testCapture{
pktType: unix.PACKET_OUTGOING,
ipv4: &ipv4,
ipv6: nil,
tcp: &tcp,
timestampNs: 0, // timestampNs not populated except in tcp_processor_rtt_test
pktType: unix.PACKET_OUTGOING,
ipv4: &ipv4,
ipv6: nil,
tcp: &tcp,
}
}

Expand All @@ -174,7 +177,7 @@ func (fixture *tcpTestFixture) runPkt(pkt testCapture) {
if fixture.conn == nil {
fixture.conn = makeTcpStates(pkt)
}
err := fixture.tcp.Process(fixture.conn, pkt.pktType, pkt.ipv4, pkt.ipv6, pkt.tcp)
err := fixture.tcp.Process(fixture.conn, pkt.timestampNs, pkt.pktType, pkt.ipv4, pkt.ipv6, pkt.tcp)
require.NoError(fixture.t, err)
}

Expand Down
Loading

0 comments on commit 5ccf0bc

Please sign in to comment.