Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[x-pack/filebeat/netflow] implement netflow with multiple workers #40122

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
78c69b9
feat: implement netflow with multiple workers
pkoutsovasilis Jul 4, 2024
bf3ec66
feat: lru improvements
pkoutsovasilis Jul 6, 2024
2a33181
feat: add reversed order pcap for testing lru
pkoutsovasilis Jul 6, 2024
a19dfcb
doc: update CHANGELOG.next.asciidoc
pkoutsovasilis Jul 8, 2024
c0dbb72
fix: reduce capacity of LRU slice during Pop
pkoutsovasilis Jul 8, 2024
cffc71d
fix: ensure uniqueness for flows from different exporters in lru
pkoutsovasilis Jul 9, 2024
b4ab4a6
fix: correct godocs
pkoutsovasilis Jul 9, 2024
91d3db6
fix: rename workers_number to number_of_workers
pkoutsovasilis Jul 9, 2024
db0bf5b
doc: add number_of_workers in netflow plugin asciidoc
pkoutsovasilis Jul 9, 2024
2aa4862
fix: unique id for number_of_workers field in netflow asciidoc
pkoutsovasilis Jul 9, 2024
df1ef35
fix: rename number_of_workers to workers
pkoutsovasilis Jul 9, 2024
513c86a
fix: replace github.com/elastic/beats/v7/libbeat/common/atomic with s…
pkoutsovasilis Jul 22, 2024
4be3b9e
fix: improve lru cleanup code flow
pkoutsovasilis Jul 22, 2024
5b64eae
feat: add lru unit-tests
pkoutsovasilis Jul 22, 2024
1b13d48
feat: remove nil checks from lru, replace done chan with context
pkoutsovasilis Jul 22, 2024
3bdee4c
Merge branch 'main' into pkoutsovasilis/scale_netflow
aleksmaus Jul 22, 2024
5da5471
Merge remote-tracking branch 'refs/remotes/beats/main' into pkoutsova…
pkoutsovasilis Jul 24, 2024
75a74a3
Merge branch 'main' into pkoutsovasilis/scale_netflow
aleksmaus Jul 24, 2024
b79f348
fix: replace entryTime.Sub with the more efficient entryTime.Before i…
pkoutsovasilis Jul 24, 2024
159c77e
fix: remove race-condition prone isEmpty atomic
pkoutsovasilis Jul 24, 2024
5c0a7b0
fix: rename stop to wait for lru to capture its functionality
pkoutsovasilis Jul 24, 2024
1aa4b24
Merge branch 'main' into pkoutsovasilis/scale_netflow
pkoutsovasilis Jul 24, 2024
a0d878c
doc: update workers and output preset documentation
pkoutsovasilis Jul 26, 2024
0a3be7d
Merge branch 'main' into pkoutsovasilis/scale_netflow
pkoutsovasilis Jul 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Implement Elastic Agent status and health reporting for Netflow Filebeat input. {pull}40080[40080]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]
- Add scaling up support for Netflow input. {issue}37761[37761] {pull}40122[40122]

*Auditbeat*

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/netflow/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type config struct {
CustomDefinitions []string `config:"custom_definitions"`
DetectSequenceReset bool `config:"detect_sequence_reset"`
ShareTemplates bool `config:"share_templates"`
WorkersNumber uint32 `config:"workers_number"`
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
}

var defaultConfig = config{
Expand All @@ -40,4 +41,5 @@ var defaultConfig = config{
PacketQueueSize: 8192,
DetectSequenceReset: true,
ShareTemplates: false,
WorkersNumber: 1,
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
}
13 changes: 13 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Config struct {
detectReset bool
fields fields.FieldDict
sharedTemplates bool
withCache bool
activeSessionsMetric ActiveSessionsMetric
}

Expand All @@ -33,6 +34,7 @@ var defaultCfg = Config{
expiration: time.Hour,
detectReset: true,
sharedTemplates: false,
withCache: false,
}

// Defaults returns a configuration object with defaults settings:
Expand Down Expand Up @@ -63,6 +65,17 @@ func (c *Config) WithExpiration(timeout time.Duration) *Config {
return c
}

// WithCache sets whether the cache option is enabled and returns the same
// Config so that calls can be chained. The stored value is read back through
// Cache.
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
func (c *Config) WithCache(enabled bool) *Config {
c.withCache = enabled
return c
}

// Cache reports whether the cache option has been enabled via WithCache.
func (c *Config) Cache() bool {
return c.withCache
}

// WithSequenceResetEnabled allows to toggle the detection of reset sequences,
// which mean that an Exporter has restarted. This will cause the session to be
// reset (all templates expired). A value of true enables this behavior.
Expand Down
169 changes: 169 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/v9/lru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package v9

import (
"bytes"
"container/heap"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common/atomic"
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
)

// eventWithMissingTemplate is the heap entry recorded every time a buffer is
// parked in a pendingTemplatesCache.
type eventWithMissingTemplate struct {
	// setID is the set ID the parked buffer was received for.
	setID uint16
	// entryTime is the moment the buffer was added to the cache.
	entryTime time.Time
}

// pendingEventsHeap is a min-heap of eventWithMissingTemplate ordered by
// entryTime, oldest entry first. It implements heap.Interface.
type pendingEventsHeap []eventWithMissingTemplate

// Len returns the number of entries in the heap.
func (h pendingEventsHeap) Len() int {
	return len(h)
}

// Less reports whether entry i was recorded before entry j.
func (h pendingEventsHeap) Less(i, j int) bool {
	return h[i].entryTime.Before(h[j].entryTime)
}

// Swap exchanges entries i and j.
func (h pendingEventsHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push appends x to the underlying slice. Values that are not an
// eventWithMissingTemplate are ignored. Use heap.Push rather than calling this
// directly so that the heap ordering is maintained.
func (h *pendingEventsHeap) Push(x any) {
	if v, ok := x.(eventWithMissingTemplate); ok {
		*h = append(*h, v)
	}
}

// Pop removes and returns the last element of the underlying slice. Use
// heap.Pop rather than calling this directly to obtain the oldest entry.
// It must not be called on an empty heap.
func (h *pendingEventsHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	// The three-index slice also trims the capacity, so a later append
	// reallocates based on the current length instead of holding on to the
	// peak-size backing array.
	*h = old[0 : n-1 : n-1]
	return x
}

// pendingTemplatesCache parks buffers, keyed by set ID, until they are claimed
// with GetAndRemove or dropped by the periodic cleanup launched by start.
// All methods are safe to call on a nil receiver, in which case they do
// nothing.
type pendingTemplatesCache struct {
	mtx     sync.RWMutex               // guards hp, started and events
	wg      sync.WaitGroup             // tracks the cleanup goroutine
	isEmpty atomic.Bool                // lock-free hint that events holds no entries
	hp      pendingEventsHeap          // one entry per Add call, oldest first
	started bool                       // set once start has launched the goroutine
	events  map[uint16][]*bytes.Buffer // parked buffers by set ID
}

// newPendingTemplatesCache returns an empty, not yet started cache.
func newPendingTemplatesCache() *pendingTemplatesCache {
	return &pendingTemplatesCache{
		events: make(map[uint16][]*bytes.Buffer),
		hp:     pendingEventsHeap{},
	}
}

// GetAndRemove returns every buffer parked for setID and removes them from the
// cache. It returns nil when nothing is parked for setID. The matching heap
// entries are left in place and are discarded later by the cleanup.
func (h *pendingTemplatesCache) GetAndRemove(setID uint16) []*bytes.Buffer {
	if h == nil {
		return nil
	}

	// Fast path: skip the lock entirely when nothing is parked.
	if h.isEmpty.Load() {
		return nil
	}

	h.mtx.Lock()
	defer h.mtx.Unlock()

	events, ok := h.events[setID]
	if !ok {
		return nil
	}
	delete(h.events, setID)
	h.isEmpty.Store(len(h.events) == 0)
	return events
}

// Add parks events under setID and records the current time as its entry time.
func (h *pendingTemplatesCache) Add(setID uint16, events *bytes.Buffer) {
	if h == nil {
		return
	}

	h.mtx.Lock()
	defer h.mtx.Unlock()

	h.events[setID] = append(h.events[setID], events)
	// Go through heap.Push so the ordering invariant is maintained explicitly
	// rather than relying on entry times always being non-decreasing.
	heap.Push(&h.hp, eventWithMissingTemplate{setID: setID, entryTime: time.Now()})
	h.isEmpty.Store(false)
}

// start launches, at most once, a goroutine that every cleanInterval drops the
// buffers whose heap entry is at least removalThreshold old. The goroutine
// exits when done is closed; use stop to wait for it. cleanInterval must be
// positive because it is handed to time.NewTicker.
func (h *pendingTemplatesCache) start(done <-chan struct{}, cleanInterval time.Duration, removalThreshold time.Duration) {
	if h == nil {
		return
	}

	h.mtx.Lock()
	alreadyStarted := h.started
	h.started = true
	h.mtx.Unlock()
	if alreadyStarted {
		return
	}

	h.wg.Add(1)
	go func() {
		defer h.wg.Done()
		ticker := time.NewTicker(cleanInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				h.removeExpired(time.Now(), removalThreshold)
			case <-done:
				return
			}
		}
	}()
}

// removeExpired drops the buffers of every heap entry that, as of now, is at
// least removalThreshold old.
func (h *pendingTemplatesCache) removeExpired(now time.Time, removalThreshold time.Duration) {
	h.mtx.Lock()
	defer h.mtx.Unlock()

	if len(h.hp) == 0 {
		// nothing has been recorded, so there is nothing to expire
		return
	}
	if len(h.events) == 0 {
		// every parked buffer was already claimed by GetAndRemove; only stale
		// heap entries are left, so drop them all at once
		h.hp = pendingEventsHeap{}
		return
	}

	// The oldest entry sits at index 0. Stop at the first entry that is still
	// too young, or when the heap has been drained: calling heap.Pop on an
	// empty heap would panic.
	// NOTE(review): expiry is tracked per heap entry but removal is per set ID,
	// so a stale entry also evicts newer buffers parked under the same set ID.
	for len(h.hp) > 0 && now.Sub(h.hp[0].entryTime) >= removalThreshold {
		if expired, ok := heap.Pop(&h.hp).(eventWithMissingTemplate); ok {
			delete(h.events, expired.setID)
		}
	}
	h.isEmpty.Store(len(h.events) == 0)
}

// stop blocks until the goroutine launched by start has returned. It does not
// signal the goroutine itself: the done channel given to start must be closed
// first.
func (h *pendingTemplatesCache) stop() {
	if h == nil {
		return
	}

	h.wg.Wait()
}
35 changes: 29 additions & 6 deletions x-pack/filebeat/input/netflow/decoder/v9/v9.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type NetflowV9Protocol struct {
logger *log.Logger
Session SessionMap
timeout time.Duration
cache *pendingTemplatesCache
done chan struct{}
detectReset bool
shareTemplates bool
Expand All @@ -43,13 +44,19 @@ func New(config config.Config) protocol.Protocol {
}

func NewProtocolWithDecoder(decoder Decoder, config config.Config, logger *log.Logger) *NetflowV9Protocol {
return &NetflowV9Protocol{
pd := &NetflowV9Protocol{
decoder: decoder,
Session: NewSessionMap(logger, config.ActiveSessionsMetric()),
logger: logger,
Session: NewSessionMap(logger, config.ActiveSessionsMetric()),
timeout: config.ExpirationTimeout(),
detectReset: config.SequenceResetEnabled(),
}

if config.Cache() {
pd.cache = newPendingTemplatesCache()
}

return pd
}

func (*NetflowV9Protocol) Version() uint16 {
Expand All @@ -61,12 +68,16 @@ func (p *NetflowV9Protocol) Start() error {
if p.timeout != time.Duration(0) {
go p.Session.CleanupLoop(p.timeout, p.done)
}

p.cache.start(p.done, 30*time.Second, 10*time.Second)
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

// Stop shuts the protocol down. When p.done is non-nil it closes the channel
// and then calls p.cache.stop(). It always returns nil.
func (p *NetflowV9Protocol) Stop() error {
if p.done != nil {
close(p.done)
// p.cache is only set when config.Cache() is true; stop() returns
// immediately on a nil receiver, otherwise it waits for the cache goroutine.
p.cache.stop()
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
Expand Down Expand Up @@ -123,11 +134,14 @@ func (p *NetflowV9Protocol) parseSet(
) {
if setID >= 256 {
// Flow of Options record, lookup template and generate flows
if template := session.GetTemplate(setID); template != nil {
return template.Apply(buf, 0)
template := session.GetTemplate(setID)

if template == nil {
p.cache.Add(setID, buf)
return nil, nil
}
p.logger.Printf("No template for ID %d", setID)
return nil, nil

return template.Apply(buf, 0)
}

// Template sets
Expand All @@ -137,6 +151,15 @@ func (p *NetflowV9Protocol) parseSet(
}
for _, template := range templates {
session.AddTemplate(template)
events := p.cache.GetAndRemove(template.ID)
for _, e := range events {
f, err := template.Apply(e, 0)
if err != nil {
continue
}
flows = append(flows, f...)
}
}

return flows, nil
}
Loading
Loading