diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f8732d1a087..e04b594c263 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -139,7 +139,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Use max in k8s overview dashboard aggregations. {pull}17015[17015] - Fix Disk Used and Disk Usage visualizations in the Metricbeat System dashboards. {issue}12435[12435] {pull}17272[17272] - Fix missing Accept header for Prometheus and OpenMetrics module. {issue}16870[16870] {pull}17291[17291] -- Further revise check for bad data in docker/memory. {pull}17400[17400] +- Further revise check for bad data in docker/memory. {pull}17400[17400] - Fix issue in Jolokia module when mbean contains multiple quoted properties. {issue}17375[17375] {pull}17374[17374] - Combine cloudwatch aggregated metrics into single event. {pull}17345[17345] - Fix how we filter services by name in system/service {pull}17400[17400] @@ -183,6 +183,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Auditbeat* - Log to stderr when running using reference kubernetes manifests. {pull}17443[174443] +- Fix syscall kprobe arguments for 32-bit systems in socket module. {pull}17500[17500] +- Fix memory leak on when we miss socket close kprobe events. {pull}17500[17500] *Filebeat* diff --git a/x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc b/x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc index c2517a1a47a..2d14a4d3d7f 100644 --- a/x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc +++ b/x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc @@ -127,6 +127,12 @@ Determines how long to wait after a socket has been closed for out of order packets. With TCP, some packets can be received shortly after a socket is closed. If set too low, additional flows will be generated for those packets. +- `socket.socket_inactive_timeout` (default: 1m) + +How long a socket can be inactive to be evicted from the internal cache. +A lower value reduces memory usage at the expense of some flows being +reported as multiple partial flows. + - `socket.perf_queue_size` (default: 4096) The number of tracing samples that can be queued for processing. A larger value diff --git a/x-pack/auditbeat/module/system/socket/arch_386.go b/x-pack/auditbeat/module/system/socket/arch_386.go index 43ce2c702f6..db1872d9248 100644 --- a/x-pack/auditbeat/module/system/socket/arch_386.go +++ b/x-pack/auditbeat/module/system/socket/arch_386.go @@ -21,10 +21,10 @@ var archVariables = common.MapStr{ "RET": "%ax", // System call parameters - "SYS_P1": "$stack1", - "SYS_P2": "$stack2", - "SYS_P3": "$stack3", - "SYS_P4": "$stack4", - "SYS_P5": "$stack5", - "SYS_P6": "$stack6", + "_SYS_P1": "$stack1", + "_SYS_P2": "$stack2", + "_SYS_P3": "$stack3", + "_SYS_P4": "$stack4", + "_SYS_P5": "$stack5", + "_SYS_P6": "$stack6", } diff --git a/x-pack/auditbeat/module/system/socket/config.go b/x-pack/auditbeat/module/system/socket/config.go index 88f871a602a..55ac5b4f907 100644 --- a/x-pack/auditbeat/module/system/socket/config.go +++ b/x-pack/auditbeat/module/system/socket/config.go @@ -32,6 +32,10 @@ type Config struct { // considered closed. FlowInactiveTimeout time.Duration `config:"socket.flow_inactive_timeout"` + // SocketInactiveTimeout determines how long a socket has to be inactive to be + // considered terminated or closed. + SocketInactiveTimeout time.Duration `config:"socket.socket_inactive_timeout"` + // FlowTerminationTimeout determines how long to wait after a flow has been // closed for out of order packets. With TCP, some packets can be received // shortly after a socket is closed. If set too low, additional flows will @@ -71,6 +75,7 @@ var defaultConfig = Config{ ErrQueueSize: 1, RingSizeExp: 7, FlowInactiveTimeout: 30 * time.Second, + SocketInactiveTimeout: 60 * time.Second, FlowTerminationTimeout: 5 * time.Second, ClockMaxDrift: 100 * time.Millisecond, ClockSyncPeriod: 10 * time.Second, diff --git a/x-pack/auditbeat/module/system/socket/socket_linux.go b/x-pack/auditbeat/module/system/socket/socket_linux.go index d92f2e7ba19..4c2cfd7e782 100644 --- a/x-pack/auditbeat/module/system/socket/socket_linux.go +++ b/x-pack/auditbeat/module/system/socket/socket_linux.go @@ -123,6 +123,7 @@ func (m *MetricSet) Run(r mb.PushReporterV2) { st := NewState(r, m.log, m.config.FlowInactiveTimeout, + m.config.SocketInactiveTimeout, m.config.FlowTerminationTimeout, m.config.ClockMaxDrift) diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index 9f1bd8964f7..19be9d0a0ee 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -255,6 +255,8 @@ type socket struct { closing bool closeTime time.Time prev, next linkedElement + + createdTime, lastSeenTime time.Time } // Prev returns the previous socket in the linked list. @@ -353,11 +355,14 @@ type state struct { numFlows uint64 // configuration - inactiveTimeout, closeTimeout time.Duration - clockMaxDrift time.Duration + inactiveTimeout, closeTimeout, socketTimeout time.Duration + clockMaxDrift time.Duration // lru used for flow expiration. - lru linkedList + flowLRU linkedList + + // lru used for socket expiration. + socketLRU linkedList // holds closed and expired flows. done linkedList @@ -373,10 +378,14 @@ func (s *state) getSocket(sock uintptr) *socket { if socket, found := s.socks[sock]; found { return socket } + now := time.Now() socket := &socket{ - sock: sock, + sock: sock, + createdTime: now, + lastSeenTime: now, } s.socks[sock] = socket + s.socketLRU.add(socket) return socket } @@ -385,13 +394,13 @@ var kernelProcess = process{ name: "[kernel_task]", } -func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, closeTimeout, clockMaxDrift time.Duration) *state { - s := makeState(r, log, inactiveTimeout, closeTimeout, clockMaxDrift) +func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state { + s := makeState(r, log, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift) go s.reapLoop() return s } -func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, closeTimeout, clockMaxDrift time.Duration) *state { +func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state { return &state{ reporter: r, log: log, @@ -399,6 +408,7 @@ func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, closeTim socks: make(map[uintptr]*socket), threads: make(map[uint32]event), inactiveTimeout: inactiveTimeout, + socketTimeout: socketTimeout, closeTimeout: closeTimeout, clockMaxDrift: clockMaxDrift, dns: newDNSTracker(inactiveTimeout * 2), @@ -422,7 +432,7 @@ func (s *state) logState() { numSocks := len(s.socks) numProcs := len(s.processes) numThreads := len(s.threads) - lruSize := s.lru.size + flowLRUSize := s.flowLRU.size doneSize := s.done.size closingSize := s.closing.size events := atomic.LoadUint64(&eventCount) @@ -434,11 +444,11 @@ func (s *state) logState() { lastEvents = events lastTime = now var errs []string - if uint64(lruSize) != numFlows { + if uint64(flowLRUSize) != numFlows { errs = append(errs, "flow count mismatch") } msg := fmt.Sprintf("state flows=%d sockets=%d procs=%d threads=%d lru=%d done=%d closing=%d events=%d eps=%.1f", - numFlows, numSocks, numProcs, numThreads, lruSize, doneSize, closingSize, events, + numFlows, numSocks, numProcs, numThreads, flowLRUSize, doneSize, closingSize, events, float64(newEvs)*float64(time.Second)/float64(took)) if errs == nil { s.log.Debugf("%s", msg) @@ -489,13 +499,23 @@ func (s *state) ExpireOlder() { s.Lock() defer s.Unlock() deadline := time.Now().Add(-s.inactiveTimeout) - for item := s.lru.peek(); item != nil && item.Timestamp().Before(deadline); { + for item := s.flowLRU.peek(); item != nil && item.Timestamp().Before(deadline); { if flow, ok := item.(*flow); ok { s.onFlowTerminated(flow) } else { - s.lru.get() + s.flowLRU.get() } - item = s.lru.peek() + item = s.flowLRU.peek() + } + + deadline = time.Now().Add(-s.socketTimeout) + for item := s.socketLRU.peek(); item != nil && item.Timestamp().Before(deadline); { + if sock, ok := item.(*socket); ok { + s.onSockDestroyed(sock.sock, 0) + } else { + s.socketLRU.get() + } + item = s.socketLRU.peek() } deadline = time.Now().Add(-s.closeTimeout) @@ -638,19 +658,16 @@ func (s *state) mutualEnrich(sock *socket, f *flow) { sock.process = s.getProcess(sock.pid) f.process = sock.process } + if !sock.closing { + sock.lastSeenTime = time.Now() + s.socketLRU.remove(sock) + s.socketLRU.add(sock) + } } func (s *state) createFlow(ref flow) error { // Get or create a socket for this flow - sock, found := s.socks[ref.sock] - if !found { - sock = &socket{ - sock: ref.sock, - } - s.socks[ref.sock] = sock - - } - + sock := s.getSocket(ref.sock) ref.createdTime = ref.lastSeenTime s.mutualEnrich(sock, &ref) @@ -664,7 +681,7 @@ func (s *state) createFlow(ref flow) error { sock.flows = make(map[string]*flow, 1) } sock.flows[ref.remote.addr.String()] = ptr - s.lru.add(ptr) + s.flowLRU.add(ptr) s.numFlows++ return nil } @@ -673,10 +690,16 @@ func (s *state) createFlow(ref flow) error { func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error { s.Lock() defer s.Unlock() + + return s.onSockDestroyed(ptr, pid) +} + +func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error { sock, found := s.socks[ptr] if !found { return nil } + // Enrich with pid if sock.pid == 0 && pid != 0 { sock.pid = pid @@ -689,6 +712,7 @@ func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error { if !sock.closing { sock.closeTime = time.Now() sock.closing = true + s.socketLRU.remove(sock) s.closing.add(sock) } return nil @@ -721,8 +745,8 @@ func (s *state) UpdateFlowWithCondition(ref flow, cond func(*flow) bool) error { s.mutualEnrich(sock, &ref) prev.updateWith(ref, s) s.enrichDNS(prev) - s.lru.remove(prev) - s.lru.add(prev) + s.flowLRU.remove(prev) + s.flowLRU.add(prev) return nil } @@ -770,7 +794,7 @@ func (f *flow) updateWith(ref flow, s *state) { } func (s *state) onFlowTerminated(f *flow) { - s.lru.remove(f) + s.flowLRU.remove(f) // Unbind this flow from its parent if parent, found := s.socks[f.sock]; found { delete(parent.flows, f.remote.addr.String()) diff --git a/x-pack/auditbeat/module/system/socket/state_test.go b/x-pack/auditbeat/module/system/socket/state_test.go index 1a76c26e228..bde87dfed02 100644 --- a/x-pack/auditbeat/module/system/socket/state_test.go +++ b/x-pack/auditbeat/module/system/socket/state_test.go @@ -49,7 +49,7 @@ func TestTCPConnWithProcess(t *testing.T) { remotePort = 443 sock uintptr = 0xff1234 ) - st := makeState(nil, (*logWrapper)(t), time.Second, 0, time.Second) + st := makeState(nil, (*logWrapper)(t), time.Second, time.Second, 0, time.Second) lPort, rPort := be16(localPort), be16(remotePort) lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) evs := []event{ @@ -127,6 +127,120 @@ func TestTCPConnWithProcess(t *testing.T) { } } +func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { + const ( + localIP = "192.168.33.10" + remoteIP = "172.19.12.13" + localPort = 38842 + remotePort = 443 + sock uintptr = 0xff1234 + ) + st := makeState(nil, (*logWrapper)(t), time.Second, 0, 0, time.Second) + lPort, rPort := be16(localPort), be16(remotePort) + lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) + evs := []event{ + callExecve(meta(1234, 1234, 1), []string{"/usr/bin/curl", "https://example.net/", "-o", "/tmp/site.html"}), + &commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20}, + &execveRet{Meta: meta(1234, 1234, 2), Retval: 1234}, + &inetCreate{Meta: meta(1234, 1235, 5), Proto: 0}, + &sockInitData{Meta: meta(1234, 1235, 5), Sock: sock}, + &tcpIPv4ConnectCall{Meta: meta(1234, 1235, 8), Sock: sock, RAddr: rAddr, RPort: rPort}, + &ipLocalOutCall{ + Meta: meta(1234, 1235, 8), + Sock: sock, + Size: 20, + LAddr: lAddr, + LPort: lPort, + RAddr: rAddr, + RPort: rPort, + }, + &tcpConnectResult{Meta: meta(1234, 1235, 9), Retval: 0}, + &tcpV4DoRcv{ + Meta: meta(0, 0, 12), + Sock: sock, + Size: 12, + LAddr: lAddr, + LPort: lPort, + RAddr: rAddr, + RPort: rPort, + }, + } + if err := feedEvents(evs, st, t); err != nil { + t.Fatal(err) + } + st.ExpireOlder() + evs = []event{ + &inetReleaseCall{Meta: meta(0, 0, 15), Sock: sock}, + &tcpV4DoRcv{ + Meta: meta(0, 0, 17), + Sock: sock, + Size: 7, + LAddr: lAddr, + LPort: lPort, + RAddr: rAddr, + RPort: rPort, + }, + &doExit{Meta: meta(1234, 1234, 18)}, + } + if err := feedEvents(evs, st, t); err != nil { + t.Fatal(err) + } + st.ExpireOlder() + flows, err := getFlows(st.DoneFlows(), all) + if err != nil { + t.Fatal(err) + } + assert.Len(t, flows, 2) + flow := flows[0] + t.Log("read flow 0", flow) + for field, expected := range map[string]interface{}{ + "source.ip": localIP, + "source.port": localPort, + "source.packets": uint64(1), + "source.bytes": uint64(20), + "client.ip": localIP, + "client.port": localPort, + "destination.ip": remoteIP, + "destination.port": remotePort, + "destination.packets": uint64(1), + "destination.bytes": uint64(12), + "server.ip": remoteIP, + "server.port": remotePort, + "network.direction": "outbound", + "network.transport": "tcp", + "network.type": "ipv4", + "process.pid": 1234, + "process.name": "curl", + "user.id": "501", + } { + if !assertValue(t, flow, expected, field) { + t.Fatal("expected value not found") + } + } + + // we have a truncated flow with no directionality, + // so just report what we can + flow = flows[1] + t.Log("read flow 1", flow) + for field, expected := range map[string]interface{}{ + "source.ip": localIP, + "source.port": localPort, + "client.ip": localIP, + "client.port": localPort, + "destination.ip": remoteIP, + "destination.port": remotePort, + "server.ip": remoteIP, + "server.port": remotePort, + "network.direction": "unknown", + "network.transport": "tcp", + "network.type": "ipv4", + } { + if !assertValue(t, flow, expected, field) { + t.Fatal("expected value not found") + } + } +} + func TestUDPOutgoingSinglePacketWithProcess(t *testing.T) { const ( localIP = "192.168.33.10" @@ -135,7 +249,7 @@ func TestUDPOutgoingSinglePacketWithProcess(t *testing.T) { remotePort = 53 sock uintptr = 0xff1234 ) - st := makeState(nil, (*logWrapper)(t), time.Second, 0, time.Second) + st := makeState(nil, (*logWrapper)(t), time.Second, time.Second, 0, time.Second) lPort, rPort := be16(localPort), be16(remotePort) lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) evs := []event{ @@ -199,7 +313,7 @@ func TestUDPIncomingSinglePacketWithProcess(t *testing.T) { remotePort = 53 sock uintptr = 0xff1234 ) - st := makeState(nil, (*logWrapper)(t), time.Second, 0, time.Second) + st := makeState(nil, (*logWrapper)(t), time.Second, time.Second, 0, time.Second) lPort, rPort := be16(localPort), be16(remotePort) lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) var packet [256]byte