Skip to content

Commit

Permalink
[ebpfless] Fix the pre-existing connection tests (#31858)
Browse files Browse the repository at this point in the history
  • Loading branch information
pimlu authored Dec 16, 2024
1 parent 898efd4 commit 29d90d3
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 36 deletions.
32 changes: 28 additions & 4 deletions pkg/network/tracer/connection/ebpfless/tcp_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package ebpfless

import (
"fmt"
"github.com/DataDog/datadog-agent/pkg/util/log"
"syscall"
"time"

Expand All @@ -17,7 +18,6 @@ import (
"github.com/google/gopacket/layers"

"github.com/DataDog/datadog-agent/pkg/network"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

type connectionState struct {
Expand Down Expand Up @@ -51,9 +51,14 @@ type connectionState struct {
// remoteFinSeq is the tcp.Seq number for the incoming FIN (including any payload length)
remoteFinSeq uint32

// rttTracker is used to track round trip times
rttTracker rttTracker
}

func (st *connectionState) hasMissedHandshake() bool {
return st.localSynState == synStateMissed || st.remoteSynState == synStateMissed
}

// TCPProcessor encapsulates TCP state tracking for the ebpfless tracer
type TCPProcessor struct {
conns map[network.ConnectionTuple]connectionState
Expand Down Expand Up @@ -125,9 +130,13 @@ func (t *TCPProcessor) updateSynFlag(conn *network.ConnectionStats, st *connecti
updateConnStatsForOpen(conn)
}
// if both synStates are ack'd, move to established
if st.tcpState == connStatAttempted && st.localSynState == synStateAcked && st.remoteSynState == synStateAcked {
if st.tcpState == connStatAttempted && st.localSynState.isSynAcked() && st.remoteSynState.isSynAcked() {
st.tcpState = connStatEstablished
conn.Monotonic.TCPEstablished++
if st.hasMissedHandshake() {
statsTelemetry.missedTCPConnections.Inc()
} else {
conn.Monotonic.TCPEstablished++
}
}
}

Expand Down Expand Up @@ -155,7 +164,7 @@ func (t *TCPProcessor) updateTCPStats(conn *network.ConnectionStats, st *connect
ackOutdated := !st.hasLocalAck || isSeqBefore(st.lastLocalAck, tcp.Ack)
if tcp.ACK && ackOutdated {
// wait until data comes in via synStateAcked
if st.hasLocalAck && st.remoteSynState == synStateAcked {
if st.hasLocalAck && st.remoteSynState.isSynAcked() {
ackDiff := tcp.Ack - st.lastLocalAck
isFinAck := st.hasRemoteFin && tcp.Ack == st.remoteFinSeq
if isFinAck {
Expand Down Expand Up @@ -260,3 +269,18 @@ func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64
t.conns[conn.ConnectionTuple] = st
return nil
}

// HasConnEverEstablished is used to avoid a connection appearing before the three-way handshake is complete.
// This is done to mirror the behavior of ebpf tracing accept() and connect(), which both return
// after the handshake is completed.
func (t *TCPProcessor) HasConnEverEstablished(conn *network.ConnectionStats) bool {
st := t.conns[conn.ConnectionTuple]

// conn.Monotonic.TCPEstablished can be 0 even though isEstablished is true,
// because pre-existing connections don't increment TCPEstablished.
// That's why we use tcpState instead of conn
isEstablished := st.tcpState == connStatEstablished
// if the connection has closed in any way, report that too
hasEverClosed := conn.Monotonic.TCPClosed > 0 || len(conn.TCPFailures) > 0
return isEstablished || hasEverClosed
}
31 changes: 30 additions & 1 deletion pkg/network/tracer/connection/ebpfless/tcp_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (fixture *tcpTestFixture) runPkt(pkt testCapture) {
require.NoError(fixture.t, err)
}

func (fixture *tcpTestFixture) runPkts(packets []testCapture) { //nolint:unused // TODO
func (fixture *tcpTestFixture) runPkts(packets []testCapture) {
for _, pkt := range packets {
fixture.runPkt(pkt)
}
Expand Down Expand Up @@ -775,3 +775,32 @@ func TestOpenCloseConn(t *testing.T) {
f.runPkt(pb.incoming(0, 0, 0, SYN))
require.False(t, f.conn.IsClosed)
}
func TestPreexistingConn(t *testing.T) {
pb := newPacketBuilder(lowerSeq, higherSeq)

f := newTCPTestFixture(t)

capture := []testCapture{
// just sending data, no SYN
pb.outgoing(1, 10, 10, ACK),
pb.incoming(1, 10, 11, ACK),
// active close after sending no data
pb.outgoing(0, 11, 11, FIN|ACK),
pb.incoming(0, 11, 12, FIN|ACK),
pb.outgoing(0, 12, 12, ACK),
}
f.runPkts(capture)

require.Empty(t, f.conn.TCPFailures)

expectedStats := network.StatCounters{
SentBytes: 1,
RecvBytes: 1,
SentPackets: 3,
RecvPackets: 2,
Retransmits: 0,
TCPEstablished: 0, // we missed when it established
TCPClosed: 1,
}
require.Equal(t, expectedStats, f.conn.Monotonic)
}
18 changes: 16 additions & 2 deletions pkg/network/tracer/connection/ebpfless/tcp_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,16 @@ var connStatusLabels = []string{
type synState uint8

const (
// synStateNone - Nothing seen yet (initial state)
synStateNone synState = iota
// synStateSent - We have seen the SYN but not its ACK
synStateSent
// synStateAcked - SYN is ACK'd for this side of the connection.
// If both sides are synStateAcked, the connection is established.
synStateAcked
// synStateMissed is effectively the same as synStateAcked but represents
// capturing a preexisting connection where we didn't get to see the SYN.
synStateMissed
)

func (ss *synState) update(synFlag, ackFlag bool) {
Expand All @@ -68,13 +75,20 @@ func (ss *synState) update(synFlag, ackFlag bool) {
if *ss == synStateSent && ackFlag {
*ss = synStateAcked
}

// this allows synStateMissed to recover via SYN in order to pass TestUnusualAckSyn
if *ss == synStateMissed && synFlag {
*ss = synStateAcked
}
// if we see ACK'd traffic but missed the SYN, assume the connection started before
// the datadog-agent starts.
if *ss == synStateNone && ackFlag {
statsTelemetry.missedTCPConnections.Inc()
*ss = synStateAcked
*ss = synStateMissed
}
}
func (ss *synState) isSynAcked() bool {
return *ss == synStateAcked || *ss == synStateMissed
}

func labelForState(tcpState connStatus) string {
idx := int(tcpState)
Expand Down
18 changes: 11 additions & 7 deletions pkg/network/tracer/connection/ebpfless_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,23 @@ func (t *ebpfLessTracer) processConnection(
tcp *layers.TCP,
decoded []gopacket.LayerType,
) error {

t.scratchConn.Source, t.scratchConn.Dest = util.Address{}, util.Address{}
t.scratchConn.SPort, t.scratchConn.DPort = 0, 0
t.scratchConn.TCPFailures = make(map[uint16]uint32)
var udpPresent, tcpPresent bool
var ip4Present, ip6Present, udpPresent, tcpPresent bool
for _, layerType := range decoded {
switch layerType {
case layers.LayerTypeIPv4:
t.scratchConn.Source = util.AddressFromNetIP(ip4.SrcIP)
t.scratchConn.Dest = util.AddressFromNetIP(ip4.DstIP)
t.scratchConn.Family = network.AFINET
ip4Present = true
case layers.LayerTypeIPv6:
t.scratchConn.Source = util.AddressFromNetIP(ip6.SrcIP)
t.scratchConn.Dest = util.AddressFromNetIP(ip6.DstIP)
t.scratchConn.Family = network.AFINET6
ip6Present = true
case layers.LayerTypeTCP:
t.scratchConn.SPort = uint16(tcp.SrcPort)
t.scratchConn.DPort = uint16(tcp.DstPort)
Expand All @@ -175,15 +178,15 @@ func (t *ebpfLessTracer) processConnection(
}
}

// check if have all the basic pieces
// check if we have all the basic pieces
if !udpPresent && !tcpPresent {
log.Debugf("ignoring packet since its not udp or tcp")
ebpfLessTracerTelemetry.skippedPackets.Inc("not_tcp_udp")
return nil
}

flipSourceDest(t.scratchConn, pktType)
t.determineConnectionDirection(t.scratchConn, pktType)
flipSourceDest(t.scratchConn, pktType)

t.m.Lock()
defer t.m.Unlock()
Expand All @@ -202,17 +205,17 @@ func (t *ebpfLessTracer) processConnection(
return fmt.Errorf("error getting last updated timestamp for connection: %w", err)
}

if ip4 == nil && ip6 == nil {
if !ip4Present && !ip6Present {
return nil
}
switch conn.Type {
case network.UDP:
if (ip4 != nil && !t.config.CollectUDPv4Conns) || (ip6 != nil && !t.config.CollectUDPv6Conns) {
if (ip4Present && !t.config.CollectUDPv4Conns) || (ip6Present && !t.config.CollectUDPv6Conns) {
return nil
}
err = t.udp.process(conn, pktType, udp)
case network.TCP:
if (ip4 != nil && !t.config.CollectTCPv4Conns) || (ip6 != nil && !t.config.CollectTCPv6Conns) {
if (ip4Present && !t.config.CollectTCPv4Conns) || (ip6Present && !t.config.CollectTCPv6Conns) {
return nil
}
err = t.tcp.Process(conn, uint64(ts), pktType, ip4, ip6, tcp)
Expand All @@ -224,7 +227,8 @@ func (t *ebpfLessTracer) processConnection(
return fmt.Errorf("error processing connection: %w", err)
}

if conn.Type == network.UDP || conn.Monotonic.TCPEstablished > 0 {
// TODO probably remove HasConnEverEstablished once we handle closed connections properly
if conn.Type == network.UDP || (conn.Type == network.TCP && t.tcp.HasConnEverEstablished(conn)) {
conn.LastUpdateEpoch = uint64(ts)
t.conns[t.scratchConn.ConnectionTuple] = conn
}
Expand Down
62 changes: 40 additions & 22 deletions pkg/network/tracer/tracer_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,16 +1979,16 @@ func (s *TracerSuite) TestPreexistingConnectionDirection() {
t := s.T()
// Start the client and server before we enable the system probe to test that the tracer picks
// up the pre-existing connection

server := tracertestutil.NewTCPServer(func(c net.Conn) {
r := bufio.NewReader(c)
for {
if _, err := r.ReadBytes(byte('\n')); err != nil {
assert.ErrorIs(t, err, io.EOF, "exited server loop with error that is not EOF")
return
break
}
_, _ = c.Write(genPayload(serverMessageSize))
}
c.Close()
})
t.Cleanup(server.Shutdown)
require.NoError(t, server.Run())
Expand Down Expand Up @@ -2023,37 +2023,55 @@ func (s *TracerSuite) TestPreexistingConnectionDirection() {
}
require.NotNil(collect, outgoing)
require.NotNil(collect, incoming)
}, 3*time.Second, 100*time.Millisecond, "could not find connection incoming and outgoing connections")
if !assert.True(collect, incoming != nil && outgoing != nil) {
return
}

m := outgoing.Monotonic
assert.Equal(t, clientMessageSize, int(m.SentBytes))
assert.Equal(t, serverMessageSize, int(m.RecvBytes))
if !tr.config.EnableEbpfless {
assert.Equal(t, os.Getpid(), int(outgoing.Pid))
}
assert.Equal(t, addrPort(server.Address()), int(outgoing.DPort))
assert.Equal(t, c.LocalAddr().(*net.TCPAddr).Port, int(outgoing.SPort))
assert.Equal(t, network.OUTGOING, outgoing.Direction)
m := outgoing.Monotonic
assert.Equal(collect, clientMessageSize, int(m.SentBytes))
// ebpfless RecvBytes is based off acknowledgements, so it can miss the first
// packet in a pre-existing connection
if !tr.config.EnableEbpfless {
assert.Equal(collect, serverMessageSize, int(m.RecvBytes))
}
if !tr.config.EnableEbpfless {
assert.Equal(collect, os.Getpid(), int(outgoing.Pid))
}
assert.Equal(collect, addrPort(server.Address()), int(outgoing.DPort))
assert.Equal(collect, c.LocalAddr().(*net.TCPAddr).Port, int(outgoing.SPort))
assert.Equal(collect, network.OUTGOING, outgoing.Direction)

m = incoming.Monotonic
// ebpfless RecvBytes is based off acknowledgements, so it can miss the first
// packet in a pre-existing connection
if !tr.config.EnableEbpfless {
assert.Equal(collect, clientMessageSize, int(m.RecvBytes))
}
assert.Equal(collect, serverMessageSize, int(m.SentBytes))
if !tr.config.EnableEbpfless {
assert.Equal(collect, os.Getpid(), int(incoming.Pid))
}
assert.Equal(collect, addrPort(server.Address()), int(incoming.SPort))
assert.Equal(collect, c.LocalAddr().(*net.TCPAddr).Port, int(incoming.DPort))
assert.Equal(collect, network.INCOMING, incoming.Direction)
}, 3*time.Second, 100*time.Millisecond, "could not find connection incoming and outgoing connections")

m = incoming.Monotonic
assert.Equal(t, clientMessageSize, int(m.RecvBytes))
assert.Equal(t, serverMessageSize, int(m.SentBytes))
if !tr.config.EnableEbpfless {
assert.Equal(t, os.Getpid(), int(incoming.Pid))
}
assert.Equal(t, addrPort(server.Address()), int(incoming.SPort))
assert.Equal(t, c.LocalAddr().(*net.TCPAddr).Port, int(incoming.DPort))
assert.Equal(t, network.INCOMING, incoming.Direction)
}

func (s *TracerSuite) TestPreexistingEmptyIncomingConnectionDirection() {
t := s.T()

// The ebpf tracer uses this to ensure it drops pre-existing connections
// that close empty (with no data), because they are difficult to track.
// However, in ebpfless they are easy to track, so disable this test.
// For more context, see PR #31100
skipOnEbpflessNotSupported(t, testConfig())

t.Run("ringbuf_enabled", func(t *testing.T) {
if features.HaveMapType(ebpf.RingBuf) != nil {
t.Skip("skipping test as ringbuffers are not supported on this kernel")
}
c := testConfig()
skipOnEbpflessNotSupported(t, c)
c.NPMRingbuffersEnabled = true
testPreexistingEmptyIncomingConnectionDirection(t, c)
})
Expand Down
1 change: 1 addition & 0 deletions pkg/network/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,7 @@ func (s *TracerSuite) TestTCPEstablishedPreExistingConn() {
c, err := net.DialTimeout("tcp", server.Address(), 50*time.Millisecond)
require.NoError(t, err)
laddr, raddr := c.LocalAddr(), c.RemoteAddr()
t.Logf("laddr=%s raddr=%s", laddr, raddr)

// Ensure closed connections are flushed as soon as possible
cfg := testConfig()
Expand Down

0 comments on commit 29d90d3

Please sign in to comment.