diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor.go b/pkg/network/tracer/connection/ebpfless/tcp_processor.go index b312f460269ee..8d530fc3cd70a 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_processor.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor.go @@ -9,6 +9,7 @@ package ebpfless import ( "fmt" + "github.com/DataDog/datadog-agent/pkg/util/log" "syscall" "time" @@ -17,7 +18,6 @@ import ( "github.com/google/gopacket/layers" "github.com/DataDog/datadog-agent/pkg/network" - "github.com/DataDog/datadog-agent/pkg/util/log" ) type connectionState struct { @@ -51,9 +51,14 @@ type connectionState struct { // remoteFinSeq is the tcp.Seq number for the incoming FIN (including any payload length) remoteFinSeq uint32 + // rttTracker is used to track round trip times rttTracker rttTracker } +func (st *connectionState) hasMissedHandshake() bool { + return st.localSynState == synStateMissed || st.remoteSynState == synStateMissed +} + // TCPProcessor encapsulates TCP state tracking for the ebpfless tracer type TCPProcessor struct { conns map[network.ConnectionTuple]connectionState @@ -125,9 +130,13 @@ func (t *TCPProcessor) updateSynFlag(conn *network.ConnectionStats, st *connecti updateConnStatsForOpen(conn) } // if both synStates are ack'd, move to established - if st.tcpState == connStatAttempted && st.localSynState == synStateAcked && st.remoteSynState == synStateAcked { + if st.tcpState == connStatAttempted && st.localSynState.isSynAcked() && st.remoteSynState.isSynAcked() { st.tcpState = connStatEstablished - conn.Monotonic.TCPEstablished++ + if st.hasMissedHandshake() { + statsTelemetry.missedTCPConnections.Inc() + } else { + conn.Monotonic.TCPEstablished++ + } } } @@ -155,7 +164,7 @@ func (t *TCPProcessor) updateTCPStats(conn *network.ConnectionStats, st *connect ackOutdated := !st.hasLocalAck || isSeqBefore(st.lastLocalAck, tcp.Ack) if tcp.ACK && ackOutdated { // wait until data comes in via synStateAcked - if st.hasLocalAck && st.remoteSynState == synStateAcked { + if st.hasLocalAck && st.remoteSynState.isSynAcked() { ackDiff := tcp.Ack - st.lastLocalAck isFinAck := st.hasRemoteFin && tcp.Ack == st.remoteFinSeq if isFinAck { @@ -260,3 +269,18 @@ func (t *TCPProcessor) Process(conn *network.ConnectionStats, timestampNs uint64 t.conns[conn.ConnectionTuple] = st return nil } + +// HasConnEverEstablished is used to avoid a connection appearing before the three-way handshake is complete. +// This is done to mirror the behavior of ebpf tracing accept() and connect(), which both return +// after the handshake is completed. +func (t *TCPProcessor) HasConnEverEstablished(conn *network.ConnectionStats) bool { + st := t.conns[conn.ConnectionTuple] + + // conn.Monotonic.TCPEstablished can be 0 even though isEstablished is true, + // because pre-existing connections don't increment TCPEstablished. + // That's why we use tcpState instead of conn + isEstablished := st.tcpState == connStatEstablished + // if the connection has closed in any way, report that too + hasEverClosed := conn.Monotonic.TCPClosed > 0 || len(conn.TCPFailures) > 0 + return isEstablished || hasEverClosed +} diff --git a/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go b/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go index 8656a73463d36..f19729579a6bc 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_processor_test.go @@ -181,7 +181,7 @@ func (fixture *tcpTestFixture) runPkt(pkt testCapture) { require.NoError(fixture.t, err) } -func (fixture *tcpTestFixture) runPkts(packets []testCapture) { //nolint:unused // TODO +func (fixture *tcpTestFixture) runPkts(packets []testCapture) { for _, pkt := range packets { fixture.runPkt(pkt) } @@ -775,3 +775,32 @@ func TestOpenCloseConn(t *testing.T) { f.runPkt(pb.incoming(0, 0, 0, SYN)) require.False(t, f.conn.IsClosed) } +func TestPreexistingConn(t *testing.T) { + pb := newPacketBuilder(lowerSeq, higherSeq) + + f := newTCPTestFixture(t) + + capture := []testCapture{ + // just sending data, no SYN + pb.outgoing(1, 10, 10, ACK), + pb.incoming(1, 10, 11, ACK), + // active close after sending no data + pb.outgoing(0, 11, 11, FIN|ACK), + pb.incoming(0, 11, 12, FIN|ACK), + pb.outgoing(0, 12, 12, ACK), + } + f.runPkts(capture) + + require.Empty(t, f.conn.TCPFailures) + + expectedStats := network.StatCounters{ + SentBytes: 1, + RecvBytes: 1, + SentPackets: 3, + RecvPackets: 2, + Retransmits: 0, + TCPEstablished: 0, // we missed when it established + TCPClosed: 1, + } + require.Equal(t, expectedStats, f.conn.Monotonic) +} diff --git a/pkg/network/tracer/connection/ebpfless/tcp_utils.go b/pkg/network/tracer/connection/ebpfless/tcp_utils.go index 1969fd85a5bc1..9ebe4d778969d 100644 --- a/pkg/network/tracer/connection/ebpfless/tcp_utils.go +++ b/pkg/network/tracer/connection/ebpfless/tcp_utils.go @@ -54,9 +54,16 @@ var connStatusLabels = []string{ type synState uint8 const ( + // synStateNone - Nothing seen yet (initial state) synStateNone synState = iota + // synStateSent - We have seen the SYN but not its ACK synStateSent + // synStateAcked - SYN is ACK'd for this side of the connection. + // If both sides are synStateAcked, the connection is established. synStateAcked + // synStateMissed is effectively the same as synStateAcked but represents + // capturing a preexisting connection where we didn't get to see the SYN. + synStateMissed ) func (ss *synState) update(synFlag, ackFlag bool) { @@ -68,13 +75,20 @@ func (ss *synState) update(synFlag, ackFlag bool) { if *ss == synStateSent && ackFlag { *ss = synStateAcked } + + // this allows synStateMissed to recover via SYN in order to pass TestUnusualAckSyn + if *ss == synStateMissed && synFlag { + *ss = synStateAcked + } // if we see ACK'd traffic but missed the SYN, assume the connection started before // the datadog-agent starts. if *ss == synStateNone && ackFlag { - statsTelemetry.missedTCPConnections.Inc() - *ss = synStateAcked + *ss = synStateMissed } } +func (ss *synState) isSynAcked() bool { + return *ss == synStateAcked || *ss == synStateMissed +} func labelForState(tcpState connStatus) string { idx := int(tcpState) diff --git a/pkg/network/tracer/connection/ebpfless_tracer.go b/pkg/network/tracer/connection/ebpfless_tracer.go index 8bb0a54f170ec..e4e661c2782a3 100644 --- a/pkg/network/tracer/connection/ebpfless_tracer.go +++ b/pkg/network/tracer/connection/ebpfless_tracer.go @@ -148,20 +148,23 @@ func (t *ebpfLessTracer) processConnection( tcp *layers.TCP, decoded []gopacket.LayerType, ) error { + t.scratchConn.Source, t.scratchConn.Dest = util.Address{}, util.Address{} t.scratchConn.SPort, t.scratchConn.DPort = 0, 0 t.scratchConn.TCPFailures = make(map[uint16]uint32) - var udpPresent, tcpPresent bool + var ip4Present, ip6Present, udpPresent, tcpPresent bool for _, layerType := range decoded { switch layerType { case layers.LayerTypeIPv4: t.scratchConn.Source = util.AddressFromNetIP(ip4.SrcIP) t.scratchConn.Dest = util.AddressFromNetIP(ip4.DstIP) t.scratchConn.Family = network.AFINET + ip4Present = true case layers.LayerTypeIPv6: t.scratchConn.Source = util.AddressFromNetIP(ip6.SrcIP) t.scratchConn.Dest = util.AddressFromNetIP(ip6.DstIP) t.scratchConn.Family = network.AFINET6 + ip6Present = true case layers.LayerTypeTCP: t.scratchConn.SPort = uint16(tcp.SrcPort) t.scratchConn.DPort = uint16(tcp.DstPort) @@ -175,15 +178,15 @@ func (t *ebpfLessTracer) processConnection( } } - // check if have all the basic pieces + // check if we have all the basic pieces if !udpPresent && !tcpPresent { log.Debugf("ignoring packet since its not udp or tcp") ebpfLessTracerTelemetry.skippedPackets.Inc("not_tcp_udp") return nil } - flipSourceDest(t.scratchConn, pktType) t.determineConnectionDirection(t.scratchConn, pktType) + flipSourceDest(t.scratchConn, pktType) t.m.Lock() defer t.m.Unlock() @@ -202,17 +205,17 @@ func (t *ebpfLessTracer) processConnection( return fmt.Errorf("error getting last updated timestamp for connection: %w", err) } - if ip4 == nil && ip6 == nil { + if !ip4Present && !ip6Present { return nil } switch conn.Type { case network.UDP: - if (ip4 != nil && !t.config.CollectUDPv4Conns) || (ip6 != nil && !t.config.CollectUDPv6Conns) { + if (ip4Present && !t.config.CollectUDPv4Conns) || (ip6Present && !t.config.CollectUDPv6Conns) { return nil } err = t.udp.process(conn, pktType, udp) case network.TCP: - if (ip4 != nil && !t.config.CollectTCPv4Conns) || (ip6 != nil && !t.config.CollectTCPv6Conns) { + if (ip4Present && !t.config.CollectTCPv4Conns) || (ip6Present && !t.config.CollectTCPv6Conns) { return nil } err = t.tcp.Process(conn, uint64(ts), pktType, ip4, ip6, tcp) @@ -224,7 +227,8 @@ func (t *ebpfLessTracer) processConnection( return fmt.Errorf("error processing connection: %w", err) } - if conn.Type == network.UDP || conn.Monotonic.TCPEstablished > 0 { + // TODO probably remove HasConnEverEstablished once we handle closed connections properly + if conn.Type == network.UDP || (conn.Type == network.TCP && t.tcp.HasConnEverEstablished(conn)) { conn.LastUpdateEpoch = uint64(ts) t.conns[t.scratchConn.ConnectionTuple] = conn } diff --git a/pkg/network/tracer/tracer_linux_test.go b/pkg/network/tracer/tracer_linux_test.go index d90928d560da8..29a8b6e34965a 100644 --- a/pkg/network/tracer/tracer_linux_test.go +++ b/pkg/network/tracer/tracer_linux_test.go @@ -1979,16 +1979,16 @@ func (s *TracerSuite) TestPreexistingConnectionDirection() { t := s.T() // Start the client and server before we enable the system probe to test that the tracer picks // up the pre-existing connection - server := tracertestutil.NewTCPServer(func(c net.Conn) { r := bufio.NewReader(c) for { if _, err := r.ReadBytes(byte('\n')); err != nil { assert.ErrorIs(t, err, io.EOF, "exited server loop with error that is not EOF") - return + break } _, _ = c.Write(genPayload(serverMessageSize)) } + c.Close() }) t.Cleanup(server.Shutdown) require.NoError(t, server.Run()) @@ -2023,37 +2023,55 @@ func (s *TracerSuite) TestPreexistingConnectionDirection() { } require.NotNil(collect, outgoing) require.NotNil(collect, incoming) - }, 3*time.Second, 100*time.Millisecond, "could not find connection incoming and outgoing connections") + if !assert.True(collect, incoming != nil && outgoing != nil) { + return + } - m := outgoing.Monotonic - assert.Equal(t, clientMessageSize, int(m.SentBytes)) - assert.Equal(t, serverMessageSize, int(m.RecvBytes)) - if !tr.config.EnableEbpfless { - assert.Equal(t, os.Getpid(), int(outgoing.Pid)) - } - assert.Equal(t, addrPort(server.Address()), int(outgoing.DPort)) - assert.Equal(t, c.LocalAddr().(*net.TCPAddr).Port, int(outgoing.SPort)) - assert.Equal(t, network.OUTGOING, outgoing.Direction) + m := outgoing.Monotonic + assert.Equal(collect, clientMessageSize, int(m.SentBytes)) + // ebpfless RecvBytes is based off acknowledgements, so it can miss the first + // packet in a pre-existing connection + if !tr.config.EnableEbpfless { + assert.Equal(collect, serverMessageSize, int(m.RecvBytes)) + } + if !tr.config.EnableEbpfless { + assert.Equal(collect, os.Getpid(), int(outgoing.Pid)) + } + assert.Equal(collect, addrPort(server.Address()), int(outgoing.DPort)) + assert.Equal(collect, c.LocalAddr().(*net.TCPAddr).Port, int(outgoing.SPort)) + assert.Equal(collect, network.OUTGOING, outgoing.Direction) + + m = incoming.Monotonic + // ebpfless RecvBytes is based off acknowledgements, so it can miss the first + // packet in a pre-existing connection + if !tr.config.EnableEbpfless { + assert.Equal(collect, clientMessageSize, int(m.RecvBytes)) + } + assert.Equal(collect, serverMessageSize, int(m.SentBytes)) + if !tr.config.EnableEbpfless { + assert.Equal(collect, os.Getpid(), int(incoming.Pid)) + } + assert.Equal(collect, addrPort(server.Address()), int(incoming.SPort)) + assert.Equal(collect, c.LocalAddr().(*net.TCPAddr).Port, int(incoming.DPort)) + assert.Equal(collect, network.INCOMING, incoming.Direction) + }, 3*time.Second, 100*time.Millisecond, "could not find connection incoming and outgoing connections") - m = incoming.Monotonic - assert.Equal(t, clientMessageSize, int(m.RecvBytes)) - assert.Equal(t, serverMessageSize, int(m.SentBytes)) - if !tr.config.EnableEbpfless { - assert.Equal(t, os.Getpid(), int(incoming.Pid)) - } - assert.Equal(t, addrPort(server.Address()), int(incoming.SPort)) - assert.Equal(t, c.LocalAddr().(*net.TCPAddr).Port, int(incoming.DPort)) - assert.Equal(t, network.INCOMING, incoming.Direction) } func (s *TracerSuite) TestPreexistingEmptyIncomingConnectionDirection() { t := s.T() + + // The ebpf tracer uses this to ensure it drops pre-existing connections + // that close empty (with no data), because they are difficult to track. + // However, in ebpfless they are easy to track, so disable this test. + // For more context, see PR #31100 + skipOnEbpflessNotSupported(t, testConfig()) + t.Run("ringbuf_enabled", func(t *testing.T) { if features.HaveMapType(ebpf.RingBuf) != nil { t.Skip("skipping test as ringbuffers are not supported on this kernel") } c := testConfig() - skipOnEbpflessNotSupported(t, c) c.NPMRingbuffersEnabled = true testPreexistingEmptyIncomingConnectionDirection(t, c) }) diff --git a/pkg/network/tracer/tracer_test.go b/pkg/network/tracer/tracer_test.go index 710e5eb142253..d50f4f299690c 100644 --- a/pkg/network/tracer/tracer_test.go +++ b/pkg/network/tracer/tracer_test.go @@ -1130,6 +1130,7 @@ func (s *TracerSuite) TestTCPEstablishedPreExistingConn() { c, err := net.DialTimeout("tcp", server.Address(), 50*time.Millisecond) require.NoError(t, err) laddr, raddr := c.LocalAddr(), c.RemoteAddr() + t.Logf("laddr=%s raddr=%s", laddr, raddr) // Ensure closed connections are flushed as soon as possible cfg := testConfig()