Skip to content

Commit

Permalink
add support for queue settings under outputs
Browse files Browse the repository at this point in the history
- add support for `idle_connection_timeout` for ES output
- add support for queue settings under output

Closes elastic#35615
  • Loading branch information
leehinman committed Oct 19, 2023
1 parent deb7d42 commit 95ded73
Show file tree
Hide file tree
Showing 22 changed files with 180 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ is collected by it.
- Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322]
- [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506]
Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor
- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}99999[99999]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}99999[99999]

*Auditbeat*

Expand Down
5 changes: 5 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
18 changes: 18 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,10 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error unpacking config data: %w", err)
}

if err := mergeOutputQueueSettings(&b.Config); err != nil {
return fmt.Errorf("could not merge output queue settings: %w", err)
}

if err := features.UpdateFromConfig(b.RawConfig); err != nil {
return fmt.Errorf("could not parse features: %w", err)
}
Expand Down Expand Up @@ -1480,3 +1484,17 @@ func sanitizeIPs(ips []string) []string {
}
return validIPs
}

func mergeOutputQueueSettings(bc *beatConfig) error {
if bc.Output.IsSet() && bc.Output.Config().Enabled() {
pc := pipeline.Config{}
err := bc.Output.Config().Unpack(&pc)
if err != nil {
return fmt.Errorf("error unpacking output queue settings: %w", err)
}
if pc.Queue.IsSet() {
bc.Pipeline.Queue = pc.Queue
}
}
return nil
}
73 changes: 73 additions & 0 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/go-ucfg/yaml"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -269,3 +271,74 @@ func (r *outputReloaderMock) Reload(
r.cfg = cfg
return nil
}

func TestMergeOutputQueueSettings(t *testing.T) {
tests := map[string]struct {
input []byte
memEvents int
}{
"blank": {input: []byte(""),
memEvents: 4096},
"defaults": {input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
memEvents: 4096},
"topLevelQueue": {input: []byte(`
name: mockbeat
queue:
mem:
events: 8096
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
memEvents: 8096},
"outputLevelQueue": {input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
mem:
events: 8096
`),
memEvents: 8096},
"topAndOutputLevelQueue": {input: []byte(`
name: mockbeat
queue:
mem:
events: 2048
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
mem:
events: 8096
`),
memEvents: 8096},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
cfg, err := yaml.NewConfig(tc.input)
require.NoError(t, err)

config := beatConfig{}
err = cfg.Unpack(&config)
require.NoError(t, err)

err = mergeOutputQueueSettings(&config)
require.NoError(t, err)

ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config())
require.NoError(t, err)
require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config())
})
}
}
9 changes: 6 additions & 3 deletions libbeat/docs/queueconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ queue is responsible for buffering and combining events into batches that can
be consumed by the outputs. The outputs will use bulk operations to send a
batch of events in one transaction.

You can configure the type and behavior of the internal queue by setting
options in the `queue` section of the +{beatname_lc}.yml+ config file. Only one
queue type can be configured.
You can configure the type and behavior of the internal queue by
setting options in the `queue` section of the +{beatname_lc}.yml+
config file or by setting options in the `queue` section of the
output. Only one queue type can be configured. If both the top level
queue section and the output section are specified the output section
takes precedence.


This sample configuration sets the memory queue to buffer up to 4096 events:
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func NewClient(
CompressionLevel: s.CompressionLevel,
EscapeHTML: s.EscapeHTML,
Transport: s.Transport,
IdleConnTimeout: s.IdleConnTimeout,
})
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ default is `1s`.
The maximum number of seconds to wait before attempting to connect to
Elasticsearch after a network error. The default is `60s`.

===== `idle_connection_timeout`

The maximum amount of time an idle connection will remain idle before closing itself. Zero means no limit. The format is a Go language duration (example 60s is 60 seconds). The default is 0.

===== `timeout`

The http request timeout in seconds for the Elasticsearch request. The default is 90.
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func makeES(
Observer: observer,
EscapeHTML: config.EscapeHTML,
Transport: config.Transport,
IdleConnTimeout: config.Transport.IdleConnTimeout,
},
Index: index,
Pipeline: pipeline,
Expand Down
5 changes: 5 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions winlogbeat/winlogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3988,6 +3988,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/functionbeat/functionbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1918,6 +1918,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/osquerybeat/osquerybeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/winlogbeat/winlogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down

0 comments on commit 95ded73

Please sign in to comment.