From e88a4341f739c65647cf9f03d1ec1cd4718efab7 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 4 Jun 2020 21:50:34 +0200 Subject: [PATCH 1/7] Avoid flows terminating twice --- x-pack/auditbeat/module/system/socket/state.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index f9e4d6f9f18..0a82c96a940 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -178,7 +178,7 @@ type flow struct { process *process local, remote endpoint complete bool - + done bool // these are automatically calculated by state from kernelTimes above createdTime, lastSeenTime time.Time } @@ -794,7 +794,11 @@ func (f *flow) updateWith(ref flow, s *state) { } func (s *state) onFlowTerminated(f *flow) { + if f.done { + return + } s.flowLRU.remove(f) + f.done = true // Unbind this flow from its parent if parent, found := s.socks[f.sock]; found { delete(parent.flows, f.remote.addr.String()) From 01e5b775381be0ae651debd60eff0a42d766a7b1 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 4 Jun 2020 21:51:36 +0200 Subject: [PATCH 2/7] Fix infinite loop in socket LRU handling --- x-pack/auditbeat/module/system/socket/state.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index 0a82c96a940..82175b57c60 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -601,6 +601,8 @@ func (s *state) onSockTerminated(sock *socket) { delete(s.socks, sock.sock) if sock.closing { s.closing.remove(sock) + } else { + s.moveToClosing(sock) } } @@ -699,7 +701,6 @@ func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error { if !found { return nil } - // Enrich with pid if sock.pid == 0 && pid != 0 { sock.pid = pid @@ -710,14 +711,18 @@ func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error { // Keep the sock around in case it's a connected TCP socket, as still some // packets can be received shortly after/during inet_release. if !sock.closing { - sock.closeTime = time.Now() - sock.closing = true - s.socketLRU.remove(sock) - s.closing.add(sock) + s.moveToClosing(sock) } return nil } +func (s *state) moveToClosing(sock *socket) { + sock.closeTime = time.Now() + sock.closing = true + s.socketLRU.remove(sock) + s.closing.add(sock) +} + // UpdateFlow receives a partial flow and creates or updates an existing flow. func (s *state) UpdateFlow(ref flow) error { return s.UpdateFlowWithCondition(ref, nil) From 9ba9802f8fe27b91acce616eef026a6bf35ca3fd Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 4 Jun 2020 21:52:22 +0200 Subject: [PATCH 3/7] Decouple stats reporting and expirations --- x-pack/auditbeat/module/system/socket/state.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index 82175b57c60..d40fd84b867 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -397,6 +397,7 @@ var kernelProcess = process{ func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state { s := makeState(r, log, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift) go s.reapLoop() + go s.logStateLoop() return s } @@ -461,8 +462,6 @@ func (s *state) logState() { func (s *state) reapLoop() { reportTicker := time.NewTicker(reapInterval) defer reportTicker.Stop() - logTicker := time.NewTicker(logInterval) - defer logTicker.Stop() for { select { case <-s.reporter.Done(): @@ -489,6 +488,17 @@ func (s *state) reapLoop() { return } } + } + } +} + +func (s *state) logStateLoop() { + logTicker := time.NewTicker(logInterval) + defer logTicker.Stop() + for { + select { + case <-s.reporter.Done(): + return case <-logTicker.C: s.logState() } From 82d902618ced9b758c8cafdee0f7d63076e9d45a Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Mon, 8 Jun 2020 16:37:24 +0200 Subject: [PATCH 4/7] changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b3e908b5715..dd6a31198fc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -127,6 +127,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - system/package: Fix parsing of Installed-Size field of DEB packages. {issue}16661[16661] {pull}17188[17188] - system module: Fix panic during initialisation when /proc/stat can't be read. {pull}17569[17569] - system/package: Fix an error that can occur while trying to persist package metadata. {issue}18536[18536] {pull}18887[18887] +- system/socket: Fix dataset using 100% CPU and becoming unresponsive in some scenarios. {pull}19033[19033] *Filebeat* From c86a5e5b9dc6e037597ca1ac84c93201dcf5f7ca Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 9 Jun 2020 13:05:21 +0200 Subject: [PATCH 5/7] Add test for infinite loop fix --- .../module/system/socket/state_test.go | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/x-pack/auditbeat/module/system/socket/state_test.go b/x-pack/auditbeat/module/system/socket/state_test.go index 1fcaeb78abf..46b7b693cec 100644 --- a/x-pack/auditbeat/module/system/socket/state_test.go +++ b/x-pack/auditbeat/module/system/socket/state_test.go @@ -706,5 +706,55 @@ func TestUDPSendMsgAltLogic(t *testing.T) { ev.AltRAddrA, ev.AltRAddrB = ipv6("fddd::cafe") assert.Equal(t, expectedIPv6, ev.String()) }) +} +func TestSocketReuse(t *testing.T) { + const ( + localIP = "192.168.33.10" + remoteIP = "172.19.12.13" + localPort = 38842 + remotePort = 53 + sock uintptr = 0xff1234 + ) + st := makeState(nil, (*logWrapper)(t), time.Hour, time.Hour, 0, time.Hour) + lPort, rPort := be16(localPort), be16(remotePort) + lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) + evs := []event{ + &clockSyncCall{ + Meta: meta(uint32(os.Getpid()), 1235, 5), + Ts: uint64(time.Now().UnixNano()), + }, + &inetCreate{Meta: meta(1234, 1235, 5), Proto: 0}, + &sockInitData{Meta: meta(1234, 1235, 5), Sock: sock}, + &udpSendMsgCall{ + Meta: meta(1234, 1235, 6), + Sock: sock, + Size: 123, + LAddr: lAddr, + AltRAddr: rAddr, + LPort: lPort, + AltRPort: rPort, + }, + // Asume inetRelease lost. + &inetCreate{Meta: meta(1234, 1235, 5), Proto: 0}, + &sockInitData{Meta: meta(1234, 1235, 5), Sock: sock}, + &udpSendMsgCall{ + Meta: meta(1234, 1235, 6), + Sock: sock, + Size: 123, + LAddr: lAddr, + AltRAddr: rAddr, + LPort: lPort, + AltRPort: rPort, + }, + } + if err := feedEvents(evs, st, t); err != nil { + t.Fatal(err) + } + st.ExpireOlder() + flows, err := getFlows(st.DoneFlows(), all) + if err != nil { + t.Fatal(err) + } + assert.Len(t, flows, 1) } From c82c7b2320dbb472a40b7d311ed18a826c2f49f8 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 9 Jun 2020 13:05:55 +0200 Subject: [PATCH 6/7] Fix socket expiration The feature was using socket.closeTime as a reference for expiration, but this timestamp was only set once the socket was closed or expired, so it caused all sockets to expirate every closeTimeout. --- .../auditbeat/module/system/socket/state.go | 27 +++---- .../module/system/socket/state_test.go | 76 ++++++++++++++++--- 2 files changed, 78 insertions(+), 25 deletions(-) diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index d40fd84b867..afd72e853fc 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -253,7 +253,6 @@ type socket struct { process *process // This signals that the socket is in the closeTimeout list. closing bool - closeTime time.Time prev, next linkedElement createdTime, lastSeenTime time.Time @@ -281,7 +280,7 @@ func (s *socket) SetNext(e linkedElement) { // Timestamp returns the time reference used to expire sockets. func (s *socket) Timestamp() time.Time { - return s.closeTime + return s.lastSeenTime } type dnsTracker struct { @@ -372,13 +371,16 @@ type state struct { closing linkedList dns dnsTracker + + // Decouple time.Now() + clock func() time.Time } func (s *state) getSocket(sock uintptr) *socket { if socket, found := s.socks[sock]; found { return socket } - now := time.Now() + now := s.clock() socket := &socket{ sock: sock, createdTime: now, @@ -413,6 +415,7 @@ func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTi closeTimeout: closeTimeout, clockMaxDrift: clockMaxDrift, dns: newDNSTracker(inactiveTimeout * 2), + clock: time.Now, } } @@ -439,7 +442,7 @@ func (s *state) logState() { events := atomic.LoadUint64(&eventCount) s.Unlock() - now := time.Now() + now := s.clock() took := now.Sub(lastTime) newEvs := events - lastEvents lastEvents = events @@ -508,7 +511,7 @@ func (s *state) logStateLoop() { func (s *state) ExpireOlder() { s.Lock() defer s.Unlock() - deadline := time.Now().Add(-s.inactiveTimeout) + deadline := s.clock().Add(-s.inactiveTimeout) for item := s.flowLRU.peek(); item != nil && item.Timestamp().Before(deadline); { if flow, ok := item.(*flow); ok { s.onFlowTerminated(flow) @@ -517,8 +520,7 @@ func (s *state) ExpireOlder() { } item = s.flowLRU.peek() } - - deadline = time.Now().Add(-s.socketTimeout) + deadline = s.clock().Add(-s.socketTimeout) for item := s.socketLRU.peek(); item != nil && item.Timestamp().Before(deadline); { if sock, ok := item.(*socket); ok { s.onSockDestroyed(sock.sock, 0) @@ -527,8 +529,7 @@ func (s *state) ExpireOlder() { } item = s.socketLRU.peek() } - - deadline = time.Now().Add(-s.closeTimeout) + deadline = s.clock().Add(-s.closeTimeout) for item := s.closing.peek(); item != nil && item.Timestamp().Before(deadline); { if sock, ok := item.(*socket); ok { s.onSockTerminated(sock) @@ -671,7 +672,7 @@ func (s *state) mutualEnrich(sock *socket, f *flow) { f.process = sock.process } if !sock.closing { - sock.lastSeenTime = time.Now() + sock.lastSeenTime = s.clock() s.socketLRU.remove(sock) s.socketLRU.add(sock) } @@ -727,7 +728,7 @@ func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error { } func (s *state) moveToClosing(sock *socket) { - sock.closeTime = time.Now() + sock.lastSeenTime = s.clock() sock.closing = true s.socketLRU.remove(sock) s.closing.add(sock) @@ -1030,8 +1031,8 @@ func (s *state) kernTimestampToTime(ts kernelTime) time.Time { } if s.kernelEpoch == (time.Time{}) { // This is the first event and time sync hasn't happened yet. - // Take a temporary epoch relative to time.Now() - now := time.Now() + // Take a temporary epoch relative to current time. + now := s.clock() s.kernelEpoch = now.Add(-time.Duration(ts)) return now } diff --git a/x-pack/auditbeat/module/system/socket/state_test.go b/x-pack/auditbeat/module/system/socket/state_test.go index 46b7b693cec..9b36c8b5dd4 100644 --- a/x-pack/auditbeat/module/system/socket/state_test.go +++ b/x-pack/auditbeat/module/system/socket/state_test.go @@ -10,6 +10,7 @@ import ( "encoding/binary" "fmt" "net" + "os" "testing" "time" @@ -134,16 +135,24 @@ func TestTCPConnWithProcess(t *testing.T) { func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { const ( - localIP = "192.168.33.10" - remoteIP = "172.19.12.13" - localPort = 38842 - remotePort = 443 - sock uintptr = 0xff1234 + localIP = "192.168.33.10" + remoteIP = "172.19.12.13" + localPort = 38842 + remotePort = 443 + sock uintptr = 0xff1234 + flowTimeout = time.Hour + socketTimeout = time.Minute * 3 + closeTimeout = time.Minute ) - st := makeState(nil, (*logWrapper)(t), time.Second, 0, 0, time.Second) + st := makeState(nil, (*logWrapper)(t), flowTimeout, socketTimeout, closeTimeout, time.Second) + now := time.Now() + st.clock = func() time.Time { + return now + } lPort, rPort := be16(localPort), be16(remotePort) lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) evs := []event{ + callExecve(meta(1234, 1234, 1), []string{"/usr/bin/curl", "https://example.net/", "-o", "/tmp/site.html"}), &commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20}, &execveRet{Meta: meta(1234, 1234, 2), Retval: 1234}, @@ -174,7 +183,18 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { t.Fatal(err) } st.ExpireOlder() + // Nothing expired just yet. + flows, err := getFlows(st.DoneFlows(), all) + if err != nil { + t.Fatal(err) + } + assert.Empty(t, flows) + evs = []event{ + &clockSyncCall{ + Meta: meta(uint32(os.Getpid()), 1235, 0), + Ts: uint64(now.UnixNano()), + }, &inetReleaseCall{Meta: meta(0, 0, 15), Sock: sock}, &tcpV4DoRcv{ Meta: meta(0, 0, 17), @@ -185,17 +205,31 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { RAddr: rAddr, RPort: rPort, }, - &doExit{Meta: meta(1234, 1234, 18)}, + + &inetCreate{Meta: meta(1234, 1235, 18), Proto: 0}, + &sockInitData{Meta: meta(1234, 1235, 19), Sock: sock + 1}, + &tcpIPv4ConnectCall{Meta: meta(1234, 1235, 20), Sock: sock + 1, RAddr: rAddr, RPort: rPort}, + &tcpV4DoRcv{ + Meta: meta(0, 0, 21), + Sock: sock + 1, + Size: 12, + LAddr: lAddr, + LPort: lPort, + RAddr: rAddr, + RPort: rPort, + }, } if err := feedEvents(evs, st, t); err != nil { t.Fatal(err) } + // Expire the first socket + now = now.Add(closeTimeout + 1) st.ExpireOlder() - flows, err := getFlows(st.DoneFlows(), all) + flows, err = getFlows(st.DoneFlows(), all) if err != nil { t.Fatal(err) } - assert.Len(t, flows, 2) + assert.Len(t, flows, 1) flow := flows[0] t.Log("read flow 0", flow) for field, expected := range map[string]interface{}{ @@ -207,8 +241,8 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { "client.port": localPort, "destination.ip": remoteIP, "destination.port": remotePort, - "destination.packets": uint64(1), - "destination.bytes": uint64(12), + "destination.packets": uint64(2), + "destination.bytes": uint64(19), "server.ip": remoteIP, "server.port": remotePort, "network.direction": "outbound", @@ -224,10 +258,28 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { t.Fatal("expected value not found") } } + // Wait until sock+1 expires due to inactivity. It won't be available + // just yet. + now = now.Add(socketTimeout + 1) + st.ExpireOlder() + flows, err = getFlows(st.DoneFlows(), all) + if err != nil { + t.Fatal(err) + } + assert.Empty(t, flows) + + // Wait until the sock is closed completely. + now = now.Add(closeTimeout + 1) + st.ExpireOlder() + flows, err = getFlows(st.DoneFlows(), all) + if err != nil { + t.Fatal(err) + } + assert.Len(t, flows, 1) + flow = flows[0] // we have a truncated flow with no directionality, // so just report what we can - flow = flows[1] t.Log("read flow 1", flow) for field, expected := range map[string]interface{}{ "source.ip": localIP, From b99c326d60ba24e7817cd36b54da0d471fbeee65 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 9 Jun 2020 18:02:19 +0200 Subject: [PATCH 7/7] Add second issue to changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index dd6a31198fc..16dfe0e6c79 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -128,6 +128,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - system module: Fix panic during initialisation when /proc/stat can't be read. {pull}17569[17569] - system/package: Fix an error that can occur while trying to persist package metadata. {issue}18536[18536] {pull}18887[18887] - system/socket: Fix dataset using 100% CPU and becoming unresponsive in some scenarios. {pull}19033[19033] +- system/socket: Fixed tracking of long-running connections. {pull}19033[19033] *Filebeat*