Update broker/pipeline setup #4650

Merged: 1 commit, Jul 12, 2017
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
@@ -19,6 +19,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD diff]
- The `scripts/import_dashboards` is removed from packages. Use the `setup` command instead. {pull}4586[4586]
- Change format of the saved kibana dashboards to have a single JSON file for each dashboard {pull}4413[4413]
- Rename `configtest` command to `test config`. {pull}4590[4590]
- Remove setting `queue_size` and `bulk_queue_size`. {pull}4650[4650]
Review comment (Member): Should we add a message to the 5.6 release that these values will change?
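
To make the migration concrete, here is a hedged sketch of how the removed 5.x settings map onto the new queue namespace; the values are illustrative only, not recommendations:

```yaml
# 5.x (removed by this PR):
#queue_size: 1000
#bulk_queue_size: 0

# 6.0 equivalent under the new queue.mem namespace:
queue.mem:
  events: 4096          # replaces queue_size: upper bound on buffered events
  flush.min_events: 0   # 0 keeps events immediately available to the outputs
  flush.timeout: 0s     # no artificial wait before forwarding a batch
```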


*Filebeat*

@@ -70,6 +71,8 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD diff]
- Add support for analyzers and multifields in fields.yml. {pull}4574[4574]
- Add support for JSON logging. {pull}4523[4523]
- Add `test output` command, to test Elasticsearch and Logstash output settings. {pull}4590[4590]
- Introduce configurable event queue settings: `queue.mem.events`, `queue.mem.flush.min_events` and `queue.mem.flush.timeout`. {pull}4650[4650]
- Enable pipelining in Logstash output by default. {pull}4650[4650]

*Filebeat*

@@ -81,6 +84,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD diff]
- Add udp prospector type. {pull}4452[4452]
- Enabled Cgo which means libc is dynamically compiled. {pull}4546[4546]
- Add Beta module config reloading mechanism {pull}4566[4566]
- Remove spooler and publisher components and settings. {pull}4644[4644]
Review comment (Member): Same here, we should probably have a message in 5.6 or, even better, not let the beat start in 6.0 if these values are set?


*Heartbeat*

29 changes: 21 additions & 8 deletions auditbeat/auditbeat.reference.yml
@@ -86,12 +86,25 @@ auditbeat.modules:
# sub-dictionary. Default is false.
#fields_under_root: false

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# The internal queue size for bulk events in the processing pipeline.
# Do not modify this value.
#bulk_queue_size: 0
# Internal queue configuration for buffering events to be published.
#queue:
# Queue type by name (default 'mem')
# The memory queue will present all available events (up to the output's
# bulk_max_size) to the output the moment the output is ready to serve
# another batch of events.
#mem:
# Max number of events the queue can buffer.
#events: 4096

# Hints the minimum number of events to store in the queue
# before providing a batch of events to the outputs.
# A value of 0 (the default) ensures events are immediately available
# to be sent to the outputs.
#flush.min_events: 0

# Maximum duration after which events are made available to the outputs,
# if the number of events stored in the queue is < flush.min_events.
#flush.timeout: 0s
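
For illustration, a hedged example of what an explicit, uncommented queue section might look like; the numbers are assumptions for demonstration, not tuning advice:

```yaml
queue.mem:
  events: 8192           # buffer at most 8192 events in memory
  flush.min_events: 512  # wait for at least 512 events before handing a batch to the output...
  flush.timeout: 1s      # ...but never wait longer than 1 second
```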

# Sets the maximum number of CPUs that can be executing simultaneously. The
# default is the number of logical CPUs available in the system.
@@ -284,9 +297,9 @@ output.elasticsearch:
# Optionally load balance the events between the Logstash hosts
#loadbalance: true

# Number of batches to be send asynchronously to logstash while processing
# Number of batches to be sent asynchronously to logstash while processing
# new batches.
#pipelining: 0
#pipelining: 5
Review comment (Member): Is there a reason we changed the default to 5? Should probably be mentioned in the changelog?

Reply (Author, @urso, Jul 12, 2017):
  • The pipeline can operate completely async -> take advantage of this fact.
  • With the default settings we have a high chance of publishing very small batches (no flush timeout -> reduced latency) if producers don't have a high enough rate. In the Logstash case we can easily compensate for this by pushing more batches onto the link.

Reply (Author): Updated changelog.
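
Following the reasoning above, a hedged sketch of a Logstash output that relies on the new pipelining default; the host is a placeholder and the values merely restate the defaults from this PR:

```yaml
output.logstash:
  hosts: ["localhost:5044"]  # placeholder Logstash endpoint
  bulk_max_size: 2048        # default batch size (see defaultConfig below)
  pipelining: 5              # new default: up to 5 batches in flight per connection
```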


# Optional index name. The default index name is set to the name of the beat
# in all lowercase.
11 changes: 0 additions & 11 deletions filebeat/_meta/common.reference.p2.yml
@@ -240,21 +240,10 @@ filebeat.prospectors:

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
#filebeat.spool_size: 2048

# Enable async publisher pipeline in filebeat (Experimental!)
#filebeat.publish_async: false

# Defines how often the spooler is flushed. After idle_timeout the spooler is
# flushed even though spool_size is not reached.
#filebeat.idle_timeout: 5s

# Name of the registry file. If a relative path is used, it is considered relative to the
# data path.
#filebeat.registry_file: ${path.data}/registry

#
# These config files must have the full filebeat config part inside, but only
# the prospector part is processed. All global options like spool_size are ignored.
# The config_dir MUST point to a different directory than the one the main filebeat config file is in.
5 changes: 0 additions & 5 deletions filebeat/config/config.go
@@ -20,9 +20,6 @@ const (

type Config struct {
Prospectors []*common.Config `config:"prospectors"`
SpoolSize uint64 `config:"spool_size" validate:"min=1"`
PublishAsync bool `config:"publish_async"`
Review comment (Member): Glad to see this one removed ;-)
IdleTimeout time.Duration `config:"idle_timeout" validate:"nonzero,min=0s"`
RegistryFile string `config:"registry_file"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
@@ -34,8 +31,6 @@ type Config struct {
var (
DefaultConfig = Config{
RegistryFile: "registry",
SpoolSize: 2048,
IdleTimeout: 5 * time.Second,
ShutdownTimeout: 0,
}
)
2 changes: 0 additions & 2 deletions filebeat/config/config_test.go
@@ -22,8 +22,6 @@ func TestReadConfig2(t *testing.T) {
// Reads second config file
err = cfgfile.Read(config, absPath+"/config2.yml")
assert.Nil(t, err)

assert.Equal(t, uint64(0), config.SpoolSize)
}

func TestGetConfigFiles_File(t *testing.T) {
40 changes: 21 additions & 19 deletions filebeat/filebeat.reference.yml
@@ -409,21 +409,10 @@ filebeat.prospectors:

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
#filebeat.spool_size: 2048

# Enable async publisher pipeline in filebeat (Experimental!)
#filebeat.publish_async: false

# Defines how often the spooler is flushed. After idle_timeout the spooler is
# flushed even though spool_size is not reached.
#filebeat.idle_timeout: 5s

# Name of the registry file. If a relative path is used, it is considered relative to the
# data path.
#filebeat.registry_file: ${path.data}/registry

#
# These config files must have the full filebeat config part inside, but only
# the prospector part is processed. All global options like spool_size are ignored.
# The config_dir MUST point to a different directory than the one the main filebeat config file is in.
@@ -469,12 +458,25 @@ filebeat.prospectors:
# sub-dictionary. Default is false.
#fields_under_root: false

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# The internal queue size for bulk events in the processing pipeline.
# Do not modify this value.
#bulk_queue_size: 0
# Internal queue configuration for buffering events to be published.
#queue:
# Queue type by name (default 'mem')
# The memory queue will present all available events (up to the output's
# bulk_max_size) to the output the moment the output is ready to serve
# another batch of events.
#mem:
# Max number of events the queue can buffer.
#events: 4096

# Hints the minimum number of events to store in the queue
# before providing a batch of events to the outputs.
# A value of 0 (the default) ensures events are immediately available
# to be sent to the outputs.
#flush.min_events: 0

# Maximum duration after which events are made available to the outputs,
# if the number of events stored in the queue is < flush.min_events.
#flush.timeout: 0s

# Sets the maximum number of CPUs that can be executing simultaneously. The
# default is the number of logical CPUs available in the system.
@@ -667,9 +669,9 @@ output.elasticsearch:
# Optionally load balance the events between the Logstash hosts
#loadbalance: true

# Number of batches to be send asynchronously to logstash while processing
# Number of batches to be sent asynchronously to logstash while processing
# new batches.
#pipelining: 0
#pipelining: 5

# Optional index name. The default index name is set to the name of the beat
# in all lowercase.
3 changes: 0 additions & 3 deletions filebeat/tests/system/config/filebeat.yml.j2
@@ -89,13 +89,10 @@ filebeat.prospectors:
{{prospector_raw}}
{% endif %}

filebeat.spool_size:
filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }}
filebeat.idle_timeout: 0.1s
{% if not skip_registry_config %}
filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}}
{%endif%}
filebeat.publish_async: {{publish_async}}

{% if reload or reload_path -%}
filebeat.config.{{ reload_type|default("prospectors") }}:
1 change: 0 additions & 1 deletion filebeat/tests/system/test_publisher.py
@@ -23,7 +23,6 @@ def test_registrar_file_content(self):

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
publish_async=True
)
os.mkdir(self.working_dir + "/log/")

29 changes: 21 additions & 8 deletions heartbeat/heartbeat.reference.yml
@@ -236,12 +236,25 @@ heartbeat.scheduler:
# sub-dictionary. Default is false.
#fields_under_root: false

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# The internal queue size for bulk events in the processing pipeline.
# Do not modify this value.
#bulk_queue_size: 0
# Internal queue configuration for buffering events to be published.
#queue:
# Queue type by name (default 'mem')
# The memory queue will present all available events (up to the output's
# bulk_max_size) to the output the moment the output is ready to serve
# another batch of events.
#mem:
# Max number of events the queue can buffer.
#events: 4096

# Hints the minimum number of events to store in the queue
# before providing a batch of events to the outputs.
# A value of 0 (the default) ensures events are immediately available
# to be sent to the outputs.
#flush.min_events: 0

# Maximum duration after which events are made available to the outputs,
# if the number of events stored in the queue is < flush.min_events.
#flush.timeout: 0s

# Sets the maximum number of CPUs that can be executing simultaneously. The
# default is the number of logical CPUs available in the system.
@@ -434,9 +447,9 @@ output.elasticsearch:
# Optionally load balance the events between the Logstash hosts
#loadbalance: true

# Number of batches to be send asynchronously to logstash while processing
# Number of batches to be sent asynchronously to logstash while processing
# new batches.
#pipelining: 0
#pipelining: 5

# Optional index name. The default index name is set to the name of the beat
# in all lowercase.
29 changes: 21 additions & 8 deletions libbeat/_meta/config.reference.yml
@@ -22,12 +22,25 @@
# sub-dictionary. Default is false.
#fields_under_root: false

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# The internal queue size for bulk events in the processing pipeline.
# Do not modify this value.
#bulk_queue_size: 0
# Internal queue configuration for buffering events to be published.
#queue:
# Queue type by name (default 'mem')
# The memory queue will present all available events (up to the output's
# bulk_max_size) to the output the moment the output is ready to serve
# another batch of events.
#mem:
# Max number of events the queue can buffer.
#events: 4096

# Hints the minimum number of events to store in the queue
# before providing a batch of events to the outputs.
# A value of 0 (the default) ensures events are immediately available
# to be sent to the outputs.
#flush.min_events: 0

# Maximum duration after which events are made available to the outputs,
# if the number of events stored in the queue is < flush.min_events.
#flush.timeout: 0s

# Sets the maximum number of CPUs that can be executing simultaneously. The
# default is the number of logical CPUs available in the system.
@@ -220,9 +233,9 @@ output.elasticsearch:
# Optionally load balance the events between the Logstash hosts
#loadbalance: true

# Number of batches to be send asynchronously to logstash while processing
# Number of batches to be sent asynchronously to logstash while processing
# new batches.
#pipelining: 0
#pipelining: 5

# Optional index name. The default index name is set to the name of the beat
# in all lowercase.
5 changes: 4 additions & 1 deletion libbeat/monitoring/report/elasticsearch/elasticsearch.go
@@ -105,7 +105,10 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, error) {
}

brokerFactory := func(e broker.Eventer) (broker.Broker, error) {
return membroker.NewBroker(e, 20, false), nil
return membroker.NewBroker(membroker.Settings{
Eventer: e,
Events: 20,
}), nil
}
pipeline, err := pipeline.New(brokerFactory, out, pipeline.Settings{
WaitClose: 0,
1 change: 1 addition & 0 deletions libbeat/outputs/logstash/config.go
@@ -30,6 +30,7 @@ type Backoff struct {
var defaultConfig = Config{
Port: 5044,
LoadBalance: false,
Pipelining: 5,
BulkMaxSize: 2048,
CompressionLevel: 3,
Timeout: 30 * time.Second,
33 changes: 20 additions & 13 deletions libbeat/publisher/bc/publisher/pipeline.go
@@ -2,31 +2,22 @@ package publisher

import (
"errors"
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/broker"
"github.com/elastic/beats/libbeat/publisher/broker/membroker"
"github.com/elastic/beats/libbeat/publisher/pipeline"
)

const defaultBrokerSize = 8 * 1024

func createPipeline(
beatInfo common.BeatInfo,
shipper ShipperConfig,
processors *processors.Processors,
outcfg common.ConfigNamespace,
) (*pipeline.Pipeline, error) {
queueSize := defaultBrokerSize
if qs := shipper.QueueSize; qs != nil {
if sz := *qs; sz > 0 {
queueSize = sz
}
}

var out outputs.Group
if !(*publishDisabled) {
var err error
@@ -63,11 +54,27 @@
},
}

brokerFactory := func(e broker.Eventer) (broker.Broker, error) {
return membroker.NewBroker(e, queueSize, false), nil
brokerType := "mem"
if b := shipper.Queue.Name(); b != "" {
brokerType = b
}

p, err := pipeline.New(brokerFactory, out, settings)
brokerFactory := broker.FindFactory(brokerType)
Review comment (Member): As quickly discussed, later we should probably rename the broker package to queue to make the relation to the config more obvious.

if brokerFactory == nil {
return nil, fmt.Errorf("'%v' is not a valid queue type", brokerType)
}

brokerConfig := shipper.Queue.Config()
if brokerConfig == nil {
brokerConfig = common.NewConfig()
}

p, err := pipeline.New(
func(eventer broker.Eventer) (broker.Broker, error) {
return brokerFactory(eventer, brokerConfig)
},
out, settings,
)
if err != nil {
return nil, err
}
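
The factory lookup above resolves the queue type from the config namespace, defaulting to "mem" when none is set, and passes the namespace body to the selected factory. A hedged sketch of the corresponding user-facing config this code would consume:

```yaml
# The namespace name ("mem") selects the registered broker factory;
# the settings under it are handed to that factory as its config.
queue.mem:
  events: 4096
```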