Skip to content

Commit

Permalink
Add the ability for streams to set interface and process watching inf…
Browse files Browse the repository at this point in the history
…ormation (elastic#22575)
  • Loading branch information
Andrew Stucki authored Nov 16, 2020
1 parent 897dd26 commit 36f28bf
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 0 deletions.
42 changes: 42 additions & 0 deletions packetbeat/config/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/packetbeat/procs"
"github.com/elastic/go-ucfg"
)

Expand Down Expand Up @@ -94,6 +95,25 @@ func (i agentInput) addProcessorsAndIndex(cfg *common.Config) (*common.Config, e
return cfg, nil
}

func mergeProcsConfig(one, two procs.ProcsConfig) procs.ProcsConfig {
maxProcReadFreq := one.MaxProcReadFreq
if two.MaxProcReadFreq > maxProcReadFreq {
maxProcReadFreq = two.MaxProcReadFreq
}

refreshPidsFreq := one.RefreshPidsFreq
if two.RefreshPidsFreq < refreshPidsFreq {
refreshPidsFreq = two.RefreshPidsFreq
}

return procs.ProcsConfig{
Enabled: true,
MaxProcReadFreq: maxProcReadFreq,
RefreshPidsFreq: refreshPidsFreq,
Monitored: append(one.Monitored, two.Monitored...),
}
}

// NewAgentConfig allows the packetbeat configuration to understand
// agent semantics
func NewAgentConfig(cfg *common.Config) (Config, error) {
Expand All @@ -111,6 +131,28 @@ func NewAgentConfig(cfg *common.Config) (Config, error) {

logp.Debug("agent", fmt.Sprintf("Found %d inputs", len(input.Streams)))
for _, stream := range input.Streams {
if interfaceOverride, ok := stream["interface"]; ok {
cfg, err := common.NewConfigFrom(interfaceOverride)
if err != nil {
return config, err
}
if err := cfg.Unpack(&config.Interfaces); err != nil {
return config, err
}
}

if procsOverride, ok := stream["procs"]; ok {
cfg, err := common.NewConfigFrom(procsOverride)
if err != nil {
return config, err
}
var newProcsConfig procs.ProcsConfig
if err := cfg.Unpack(&newProcsConfig); err != nil {
return config, err
}
config.Procs = mergeProcsConfig(config.Procs, newProcsConfig)
}

if rawStreamType, ok := stream["type"]; ok {
streamType, ok := rawStreamType.(string)
if !ok {
Expand Down
22 changes: 22 additions & 0 deletions packetbeat/config/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,30 @@ streams:
timeout: 10s
period: 10s
keep_null: false
interface:
device: thisisignoredfornow
snaplen: 1514
type: af_packet
buffer_size_mb: 100
procs:
enabled: true
monitored:
- process: mysqld
cmdline_grep: mysqld
data_stream:
dataset: packet.flow
type: logs
- type: icmp
interface:
device: en1
snaplen: 1514
type: af_packet
buffer_size_mb: 100
procs:
enabled: true
monitored:
- process: postgresql
cmdline_grep: postgresql
data_stream:
dataset: packet.icmp
type: logs
Expand All @@ -61,4 +81,6 @@ streams:
var protocol map[string]interface{}
require.NoError(t, config.ProtocolsList[0].Unpack(&protocol))
require.Len(t, protocol["processors"].([]interface{}), 3)
require.Equal(t, config.Interfaces.Device, "en1")
require.Len(t, config.Procs.Monitored, 2)
}

0 comments on commit 36f28bf

Please sign in to comment.