Skip to content

Commit

Permalink
[ebpfless] Implement connection duration and fix IsOpen (#31763)
Browse files Browse the repository at this point in the history
  • Loading branch information
pimlu authored Dec 5, 2024
1 parent 48cbbbf commit 435a6e3
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
20 changes: 19 additions & 1 deletion pkg/network/tracer/connection/ebpfless/tcp_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package ebpfless
import (
"fmt"
"syscall"
"time"

"golang.org/x/sys/unix"

Expand Down Expand Up @@ -63,6 +64,19 @@ func NewTCPProcessor() *TCPProcessor { //nolint:revive // TODO
}
}

// updateConnStatsForOpen sets the duration to a "timestamp" representing the open time
func updateConnStatsForOpen(conn *network.ConnectionStats) {
conn.IsClosed = false
conn.Duration = time.Duration(time.Now().UnixNano())
}

// updateConnStatsForClose writes the actual duration once the connection closed
func updateConnStatsForClose(conn *network.ConnectionStats) {
conn.IsClosed = true
nowNs := time.Now().UnixNano()
conn.Duration = time.Duration(nowNs - int64(conn.Duration))
}

// calcNextSeq returns the seq "after" this segment, aka, what the ACK will be once this segment is received
func calcNextSeq(tcp *layers.TCP, payloadLen uint16) uint32 {
nextSeq := tcp.Seq + uint32(payloadLen)
Expand Down Expand Up @@ -105,6 +119,8 @@ func (t *TCPProcessor) updateSynFlag(conn *network.ConnectionStats, st *connecti
// if any SynState has progressed, move to attempted
if st.tcpState == ConnStatClosed && (st.localSynState != SynStateNone || st.remoteSynState != SynStateNone) {
st.tcpState = ConnStatAttempted

updateConnStatsForOpen(conn)
}
// if both synStates are ack'd, move to established
if st.tcpState == ConnStatAttempted && st.localSynState == SynStateAcked && st.remoteSynState == SynStateAcked {
Expand Down Expand Up @@ -188,6 +204,7 @@ func (t *TCPProcessor) updateFinFlag(conn *network.ConnectionStats, st *connecti
tcpState: ConnStatClosed,
}
conn.Monotonic.TCPClosed++
updateConnStatsForClose(conn)
}
}

Expand All @@ -200,14 +217,15 @@ func (t *TCPProcessor) updateRstFlag(conn *network.ConnectionStats, st *connecti
if st.tcpState == ConnStatAttempted {
reason = syscall.ECONNREFUSED
}
conn.TCPFailures[uint16(reason)]++

if st.tcpState == ConnStatEstablished {
conn.Monotonic.TCPClosed++
}
*st = connectionState{
tcpState: ConnStatClosed,
}
conn.TCPFailures[uint16(reason)]++
updateConnStatsForClose(conn)
}

// Process handles a TCP packet, calculating stats and keeping track of its state according to the
Expand Down
28 changes: 28 additions & 0 deletions pkg/network/tracer/connection/ebpfless/tcp_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,3 +747,31 @@ func TestUnusualAckSyn(t *testing.T) {
}
require.Equal(t, expectedStats, f.conn.Monotonic)
}

// TestOpenCloseConn checks whether IsClosed is set correctly in ConnectionStats
func TestOpenCloseConn(t *testing.T) {
pb := newPacketBuilder(lowerSeq, higherSeq)

f := newTcpTestFixture(t)

// send a SYN packet to kick things off
f.runPkt(pb.incoming(0, 0, 0, SYN))
require.False(t, f.conn.IsClosed)

// finish up the connection handshake and close it
remainingPkts := []testCapture{
pb.outgoing(0, 0, 1, SYN|ACK),
pb.incoming(0, 1, 1, ACK),
// active close after sending no data
pb.outgoing(0, 1, 1, FIN|ACK),
pb.incoming(0, 1, 2, FIN|ACK),
pb.outgoing(0, 2, 2, ACK),
}
f.runPkts(remainingPkts)
// should be closed now
require.True(t, f.conn.IsClosed)

// open it up again, it should not be marked closed afterward
f.runPkt(pb.incoming(0, 0, 0, SYN))
require.False(t, f.conn.IsClosed)
}
12 changes: 8 additions & 4 deletions pkg/network/tracer/tracer_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2357,23 +2357,24 @@ func BenchmarkAddProcessInfo(b *testing.B) {
func (s *TracerSuite) TestConnectionDuration() {
t := s.T()
cfg := testConfig()
skipEbpflessTodo(t, cfg)
tr := setupTracer(t, cfg)

srv := tracertestutil.NewTCPServer(func(c net.Conn) {
var b [4]byte
for {
_, err := c.Read(b[:])
if err != nil && (errors.Is(err, net.ErrClosed) || err == io.EOF) {
return
break
}
require.NoError(t, err)
_, err = c.Write([]byte("pong"))
if err != nil && (errors.Is(err, net.ErrClosed) || err == io.EOF) {
return
break
}
require.NoError(t, err)
}
err := c.Close()
require.NoError(t, err)
})

require.NoError(t, srv.Run(), "error running server")
Expand Down Expand Up @@ -2421,7 +2422,10 @@ LOOP:
require.EventuallyWithT(t, func(collect *assert.CollectT) {
var found bool
conn, found = findConnection(c.LocalAddr(), srv.Addr(), getConnections(t, tr))
assert.True(collect, found, "could not find closed connection")
if !assert.True(collect, found, "could not find connection") {
return
}
assert.True(collect, conn.IsClosed, "connection should be closed")
}, 3*time.Second, 100*time.Millisecond, "could not find closed connection")

// after closing the client connection, the duration should be
Expand Down

0 comments on commit 435a6e3

Please sign in to comment.