Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[x-pack/filebeat/netflow] implement netflow with multiple workers #40122

Merged
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
78c69b9
feat: implement netflow with multiple workers
pkoutsovasilis Jul 4, 2024
bf3ec66
feat: lru improvements
pkoutsovasilis Jul 6, 2024
2a33181
feat: add reversed order pcap for testing lru
pkoutsovasilis Jul 6, 2024
a19dfcb
doc: update CHANGELOG.next.asciidoc
pkoutsovasilis Jul 8, 2024
c0dbb72
fix: reduce capacity of LRU slice during Pop
pkoutsovasilis Jul 8, 2024
cffc71d
fix: ensure uniqueness for flows from different exporters in lru
pkoutsovasilis Jul 9, 2024
b4ab4a6
fix: correct godocs
pkoutsovasilis Jul 9, 2024
91d3db6
fix: rename workers_number to number_of_workers
pkoutsovasilis Jul 9, 2024
db0bf5b
doc: add number_of_workers in netflow plugin asciidoc
pkoutsovasilis Jul 9, 2024
2aa4862
fix: unique id for number_of_workers field in netflow asciidoc
pkoutsovasilis Jul 9, 2024
df1ef35
fix: rename number_of_workers to workers
pkoutsovasilis Jul 9, 2024
513c86a
fix: replace github.com/elastic/beats/v7/libbeat/common/atomic with s…
pkoutsovasilis Jul 22, 2024
4be3b9e
fix: improve lru cleanup code flow
pkoutsovasilis Jul 22, 2024
5b64eae
feat: add lru unit-tests
pkoutsovasilis Jul 22, 2024
1b13d48
feat: remove nil checks from lru, replace done chan with context
pkoutsovasilis Jul 22, 2024
3bdee4c
Merge branch 'main' into pkoutsovasilis/scale_netflow
aleksmaus Jul 22, 2024
5da5471
Merge remote-tracking branch 'refs/remotes/beats/main' into pkoutsova…
pkoutsovasilis Jul 24, 2024
75a74a3
Merge branch 'main' into pkoutsovasilis/scale_netflow
aleksmaus Jul 24, 2024
b79f348
fix: replace entryTime.Sub with the more efficient entryTime.Before i…
pkoutsovasilis Jul 24, 2024
159c77e
fix: remove race-condition prone isEmpty atomic
pkoutsovasilis Jul 24, 2024
5c0a7b0
fix: rename stop to wait for lru to capture its functionality
pkoutsovasilis Jul 24, 2024
1aa4b24
Merge branch 'main' into pkoutsovasilis/scale_netflow
pkoutsovasilis Jul 24, 2024
a0d878c
doc: update workers and output preset documentation
pkoutsovasilis Jul 26, 2024
0a3be7d
Merge branch 'main' into pkoutsovasilis/scale_netflow
pkoutsovasilis Jul 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Implement Elastic Agent status and health reporting for Netflow Filebeat input. {pull}40080[40080]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]
- Add scaling up support for Netflow input. {issue}37761[37761] {pull}40122[40122]
- Update CEL mito extensions to v1.15.0. {pull}40294[40294]

*Auditbeat*
Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/docs/inputs/input-netflow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ The maximum number of packets that can be queued for processing.
Use this setting to avoid packet-loss when dealing with occasional bursts
of traffic.


[float]
[[workers]]
==== `workers`

The number of workers that concurrently read and decode netflow packets.
Default is `1`. Note that to maximize the performance gains of multiple
workers you must also increase the number of output workers.
andrewkroh marked this conversation as resolved.
Show resolved Hide resolved

[float]
[[custom_definitions]]
==== `custom_definitions`
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/netflow/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type config struct {
CustomDefinitions []string `config:"custom_definitions"`
DetectSequenceReset bool `config:"detect_sequence_reset"`
ShareTemplates bool `config:"share_templates"`
NumberOfWorkers uint32 `config:"workers"`
}

var defaultConfig = config{
Expand All @@ -40,4 +41,5 @@ var defaultConfig = config{
PacketQueueSize: 8192,
DetectSequenceReset: true,
ShareTemplates: false,
NumberOfWorkers: 1,
}
13 changes: 13 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Config struct {
detectReset bool
fields fields.FieldDict
sharedTemplates bool
withCache bool
activeSessionsMetric ActiveSessionsMetric
}

Expand All @@ -33,6 +34,7 @@ var defaultCfg = Config{
expiration: time.Hour,
detectReset: true,
sharedTemplates: false,
withCache: false,
}

// Defaults returns a configuration object with defaults settings:
Expand Down Expand Up @@ -63,6 +65,17 @@ func (c *Config) WithExpiration(timeout time.Duration) *Config {
return c
}

// WithCache toggles buffering of packets that arrive before their
// template (the packet cache). It returns c to allow call chaining.
func (c *Config) WithCache(enabled bool) *Config {
	c.withCache = enabled
	return c
}

// Cache reports whether the packet cache is enabled.
func (c *Config) Cache() bool {
	return c.withCache
}

// WithSequenceResetEnabled allows to toggle the detection of reset sequences,
// which mean that an Exporter has restarted. This will cause the session to be
// reset (all templates expired). A value of true enables this behavior.
Expand Down
164 changes: 164 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/v9/lru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package v9

import (
"bytes"
"container/heap"
"sync"
"sync/atomic"
"time"
)

// eventWithMissingTemplate is a heap entry recording that packets for
// session key are buffered awaiting their template, and when the entry
// was added (used by the cleaner to expire old entries).
type eventWithMissingTemplate struct {
	key       SessionKey
	entryTime time.Time
}

// pendingEventsHeap is a min-heap of events awaiting their template,
// ordered by entry time (oldest first). Together with Push and Pop it
// implements heap.Interface.
type pendingEventsHeap []eventWithMissingTemplate

// Len returns the number of pending entries in the heap.
func (h pendingEventsHeap) Len() int {
	return len(h)
}

// Less reports whether the entry at index i was added before the entry
// at index j, making this a min-heap on entryTime.
func (h pendingEventsHeap) Less(i, j int) bool {
	// time.Time.Before is the idiomatic and cheaper comparison; it
	// avoids materializing a Duration the way Sub(...) < 0 does.
	return h[i].entryTime.Before(h[j].entryTime)
}

// Swap exchanges the entries at indexes i and j.
func (h pendingEventsHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push appends x to the heap's backing slice; values of any other type
// than eventWithMissingTemplate are silently ignored.
// Callers should go through heap.Push to keep the heap invariant.
func (h *pendingEventsHeap) Push(x any) {
	if ev, ok := x.(eventWithMissingTemplate); ok {
		*h = append(*h, ev)
	}
}

// Pop removes and returns the last element of the backing slice.
// Callers should go through heap.Pop to keep the heap invariant.
func (h *pendingEventsHeap) Pop() any {
	last := len(*h) - 1
	v := (*h)[last]
	// Three-index slicing drops capacity along with length so the
	// freed tail does not stay reachable through this slice.
	*h = (*h)[:last:last]
	return v
}

// pendingTemplatesCache buffers NetFlow v9 packets that referenced a
// template not yet received, keyed by session, until the template
// arrives (GetAndRemove) or the entries expire (cleanup).
type pendingTemplatesCache struct {
	mtx sync.RWMutex   // guards hp, started and events
	wg  sync.WaitGroup // tracks the cleaner goroutine; see start/stop
	// isEmpty mirrors len(events) == 0 for a lock-free emptiness check.
	// NOTE(review): reading it outside mtx can race with a concurrent
	// Add — confirm callers tolerate a stale "empty" answer.
	isEmpty atomic.Bool
	hp      pendingEventsHeap // min-heap of entries ordered by insertion time
	started bool              // set once by start; prevents a second cleaner goroutine
	events  map[SessionKey][]*bytes.Buffer // pending packets per session
}

// newPendingTemplatesCache builds an empty cache ready for Add and
// GetAndRemove; call start to launch its background cleaner.
func newPendingTemplatesCache() *pendingTemplatesCache {
	return &pendingTemplatesCache{
		events: make(map[SessionKey][]*bytes.Buffer),
		hp:     pendingEventsHeap{},
	}
}

// GetAndRemove returns all buffered events for the given session key and
// removes them from the cache, or nil when the key has nothing pending.
//
// The lock-free fast path on the isEmpty atomic was removed: it could
// race with a concurrent Add and report the cache empty while an insert
// was in flight, so the emptiness check now happens under the lock.
func (h *pendingTemplatesCache) GetAndRemove(key SessionKey) []*bytes.Buffer {
	h.mtx.Lock()
	defer h.mtx.Unlock()

	events, ok := h.events[key]
	if !ok {
		return nil
	}
	delete(h.events, key)
	// Keep the emptiness flag coherent for any remaining readers of it.
	h.isEmpty.Store(len(h.events) == 0)
	return events
}

// Add adds an event to the pending templates cache
func (h *pendingTemplatesCache) Add(key SessionKey, events *bytes.Buffer) {
h.mtx.Lock()
defer h.mtx.Unlock()

h.events[key] = append(h.events[key], events)
h.hp.Push(eventWithMissingTemplate{key: key, entryTime: time.Now()})
h.isEmpty.Store(false)
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
}

// start launches the background cleaner goroutine, which removes
// pending events older than removalThreshold every cleanInterval.
// It is idempotent: only the first call starts a goroutine. The
// goroutine exits when done is closed; call stop to wait for it.
func (h *pendingTemplatesCache) start(done <-chan struct{}, cleanInterval time.Duration, removalThreshold time.Duration) {
	h.mtx.Lock()
	if h.started {
		h.mtx.Unlock()
		return
	}
	h.started = true
	h.mtx.Unlock()

	h.wg.Add(1)
	// Capture h via the closure; the original passed it as a parameter n
	// but then used h and n interchangeably inside the body.
	go func() {
		defer h.wg.Done()
		// Reuse a single timer instead of time.After so no new timer is
		// allocated on every iteration.
		timer := time.NewTimer(cleanInterval)
		defer timer.Stop()

		for {
			select {
			case <-timer.C:
				h.cleanup(removalThreshold)
				timer.Reset(cleanInterval)
			case <-done:
				return
			}
		}
	}()
}

func (h *pendingTemplatesCache) cleanup(removalThreshold time.Duration) {
h.mtx.Lock()
defer h.mtx.Unlock()

if len(h.hp) == 0 {
// lru is empty do not proceed further
return
} else if len(h.events) == 0 {
// all pending events have been cleaned by GetAndRemove
// thus reset lru since it is not empty (look above) and continue
h.hp = pendingEventsHeap{}
return
}

hp := &h.hp
now := time.Now()
for {
v := heap.Pop(hp)
c, ok := v.(eventWithMissingTemplate)
if !ok {
// weirdly enough we should never get here
continue
}
if now.Sub(c.entryTime) < removalThreshold {
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
// we have events that are not old enough
// to be removed thus stop looping
heap.Push(hp, c)
break
}
// we can remove the pending events
delete(h.events, c.key)

if len(h.hp) == 0 {
break
}
}

h.isEmpty.Store(len(h.events) == 0)
}

// stop stops the pending templates cache cleaner
func (h *pendingTemplatesCache) stop() {
h.wg.Wait()
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
}
112 changes: 112 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/v9/lru_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package v9

import (
"bytes"
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestPendingTemplatesCache(t *testing.T) {

type testEvent struct {
key SessionKey
buf *bytes.Buffer
}

tests := []struct {
name string
eventsToAdd []testEvent
eventsToGet []SessionKey
eventsExpected []*bytes.Buffer
getDelay time.Duration
cleanInterval time.Duration
removalThreshold time.Duration
}{
{
name: "Add and GetAndRemove different sessions with cache hit",
eventsToAdd: []testEvent{
{SessionKey{"127.0.0.1", 0}, bytes.NewBufferString("test-event-1")},
{SessionKey{"127.0.0.2", 0}, bytes.NewBufferString("test-event-1")},
},
eventsToGet: []SessionKey{
{"127.0.0.1", 0},
{"127.0.0.2", 0},
},
eventsExpected: []*bytes.Buffer{
bytes.NewBufferString("test-event-1"),
bytes.NewBufferString("test-event-1"),
},
getDelay: 1 * time.Second,
cleanInterval: 2 * time.Second,
removalThreshold: 2 * time.Second,
},
{
name: "Add and GetAndRemove same sessions with cache hit",
eventsToAdd: []testEvent{
{SessionKey{"127.0.0.1", 0}, bytes.NewBufferString("test-event-1")},
{SessionKey{"127.0.0.1", 0}, bytes.NewBufferString("test-event-1")},
},
eventsToGet: []SessionKey{
{"127.0.0.1", 0},
},
eventsExpected: []*bytes.Buffer{
bytes.NewBufferString("test-event-1"),
bytes.NewBufferString("test-event-1"),
},
getDelay: 1 * time.Second,
cleanInterval: 2 * time.Second,
removalThreshold: 2 * time.Second,
},
{
name: "Add and GetAndRemove with cache miss",
eventsToAdd: []testEvent{
{SessionKey{"127.0.0.1", 0}, bytes.NewBufferString("test-event-1")},
{SessionKey{"127.0.0.2", 0}, bytes.NewBufferString("test-event-1")},
},
eventsToGet: []SessionKey{
{"127.0.0.1", 0},
{"127.0.0.2", 0},
},
eventsExpected: []*bytes.Buffer(nil),
getDelay: 2 * time.Second,
cleanInterval: 1 * time.Second,
removalThreshold: 1 * time.Second,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
cache := newPendingTemplatesCache()
cache.start(ctx.Done(), tt.cleanInterval, tt.removalThreshold)
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
for _, event := range tt.eventsToAdd {
cache.Add(event.key, event.buf)
}
time.Sleep(tt.getDelay)
var readEvents []*bytes.Buffer
for _, key := range tt.eventsToGet {
if events := cache.GetAndRemove(key); events != nil {
readEvents = append(readEvents, events...)
}
}
require.EqualValues(t, tt.eventsExpected, readEvents)

time.Sleep(2 * tt.cleanInterval)

cache.mtx.Lock()
lruLen := len(cache.hp)
lruCap := cap(cache.hp)
cache.mtx.Unlock()
require.Zero(t, lruLen)
require.Zero(t, lruCap)
})
}
}
Loading
Loading