From cd45c0af088b13ce992b7977c721dccf60642a64 Mon Sep 17 00:00:00 2001
From: urso
Date: Mon, 10 Jul 2017 23:56:38 +0200
Subject: [PATCH] Update broker setup

- remove setting queue_size
- remove unused setting bulk_queue_size
- introduce settings namespace `queue` with default `mem.events: 4096`
- add settings queue.mem.flush.min_events and queue.mem.flush.timeout
- change default output.logstash.pipelining to 5
- remove spooler settings from filebeat config object
- update winlogbeat config validate check
---
 CHANGELOG.asciidoc                             |  4 +
 auditbeat/auditbeat.reference.yml              | 29 ++++--
 filebeat/_meta/common.reference.p2.yml         | 11 ---
 filebeat/config/config.go                      |  5 -
 filebeat/config/config_test.go                 |  2 -
 filebeat/filebeat.reference.yml                | 40 ++++----
 filebeat/tests/system/config/filebeat.yml.j2   |  3 -
 filebeat/tests/system/test_publisher.py        |  1 -
 heartbeat/heartbeat.reference.yml              | 29 ++++--
 libbeat/_meta/config.reference.yml             | 29 ++++--
 .../report/elasticsearch/elasticsearch.go      |  5 +-
 libbeat/outputs/logstash/config.go             |  1 +
 libbeat/publisher/bc/publisher/pipeline.go     | 33 ++++---
 libbeat/publisher/bc/publisher/publish.go      | 30 +-----
 libbeat/publisher/broker/membroker/broker.go   | 92 ++++++++++++++++---
 .../publisher/broker/membroker/broker_test.go  |  2 +-
 libbeat/publisher/broker/membroker/buf.go      |  8 ++
 libbeat/publisher/broker/membroker/config.go   | 21 ++++-
 metricbeat/metricbeat.reference.yml            | 29 ++++--
 packetbeat/beater/packetbeat.go                |  3 -
 packetbeat/packetbeat.reference.yml            | 29 ++++--
 winlogbeat/config/config.go                    |  6 +-
 winlogbeat/config/config_test.go               |  4 +-
 winlogbeat/winlogbeat.reference.yml            | 29 ++++--
 24 files changed, 292 insertions(+), 153 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 9356d02863b..c9a8738e577 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -19,6 +19,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
 - The `scripts/import_dashboards` is removed from packages. Use the `setup` command instead. {pull}4586[4586]
 - Change format of the saved kibana dashboards to have a single JSON file for each dashboard {pull}4413[4413]
 - Rename `configtest` command to `test config`. {pull}4590[4590]
+- Remove settings `queue_size` and `bulk_queue_size`. {pull}4650[4650]
 
 *Filebeat*
 
@@ -70,6 +71,8 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
 - Add support for analyzers and multifields in fields.yml. {pull}4574[4574]
 - Add support for JSON logging. {pull}4523[4523]
 - Add `test output` command, to test Elasticsearch and Logstash output settings. {pull}4590[4590]
+- Introduce configurable event queue settings: queue.mem.events, queue.mem.flush.min_events, and queue.mem.flush.timeout. {pull}4650[4650]
+- Enable pipelining in the Logstash output by default. {pull}4650[4650]
 
 *Filebeat*
 
@@ -81,6 +84,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
 - Add udp prospector type. {pull}4452[4452]
 - Enabled Cgo which means libc is dynamically compiled. {pull}4546[4546]
 - Add Beta module config reloading mechanism {pull}4566[4566]
+- Remove spooler and publisher components and settings. {pull}4644[4644]
 
 *Heartbeat*
 
diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml
index c236e8d1bf5..69971946bf7 100644
--- a/auditbeat/auditbeat.reference.yml
+++ b/auditbeat/auditbeat.reference.yml
@@ -86,12 +86,25 @@ auditbeat.modules:
 # sub-dictionary. Default is false.
 #fields_under_root: false
 
-# Internal queue size for single events in processing pipeline
-#queue_size: 1000
-
-# The internal queue size for bulk events in the processing pipeline.
-# Do not modify this value.
-#bulk_queue_size: 0
+# Internal queue configuration for buffering events to be published.
+#queue:
  # Queue type by name (default 'mem')
  # The memory queue will present all available events (up to the output's
  # bulk_max_size) to the output, the moment the output is ready to process
  # another batch of events.
  #mem:
    # Max number of events the queue can buffer.
    #events: 4096

    # Hints the minimum number of events stored in the queue
    # before providing a batch of events to the outputs.
    # A value of 0 (the default) ensures events are immediately available
    # to be sent to the outputs.
    #flush.min_events: 0

    # Maximum duration after which events are made available to the outputs,
    # even if the number of events stored in the queue is < flush.min_events.
    #flush.timeout: 0s
 
 # Sets the maximum number of CPUs that can be executing simultaneously. The
 # default is the number of logical CPUs available in the system.
@@ -284,9 +297,9 @@ output.elasticsearch:
   # Optional load balance the events between the Logstash hosts
   #loadbalance: true
 
-  # Number of batches to be send asynchronously to logstash while processing
+  # Number of batches to be sent asynchronously to logstash while processing
   # new batches.
-  #pipelining: 0
+  #pipelining: 5
 
   # Optional index name. The default index name is set to name of the beat
   # in all lowercase.
diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml
index 7d54965f9f0..2c0544669f9 100644
--- a/filebeat/_meta/common.reference.p2.yml
+++ b/filebeat/_meta/common.reference.p2.yml
@@ -240,21 +240,10 @@ filebeat.prospectors:
 
 #========================= Filebeat global options ============================
 
-# Event count spool threshold - forces network flush if exceeded
-#filebeat.spool_size: 2048
-
-# Enable async publisher pipeline in filebeat (Experimental!)
-#filebeat.publish_async: false
-
-# Defines how often the spooler is flushed. After idle_timeout the spooler is
-# Flush even though spool_size is not reached.
-#filebeat.idle_timeout: 5s
-
 # Name of the registry file. If a relative path is used, it is considered relative to the
 # data path.
 #filebeat.registry_file: ${path.data}/registry
 
-#
 # These config files must have the full filebeat config part inside, but only
 # the prospector part is processed. All global options like spool_size are ignored.
 # The config_dir MUST point to a different directory than where the main filebeat config file is in.
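NOTE: for configurations that tuned the removed filebeat spooler options, the new
`queue` namespace introduced by this patch can express a rough equivalent. The
mapping below is a sketch, not an exact behavioral match: `flush.min_events`
takes over the batching role of `spool_size`, and `flush.timeout` the role of
`idle_timeout`:

    # previously (removed by this patch):
    #filebeat.spool_size: 2048
    #filebeat.idle_timeout: 5s

    # roughly equivalent queue settings:
    queue.mem:
      events: 4096
      flush.min_events: 2048
      flush.timeout: 5s
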
diff --git a/filebeat/config/config.go b/filebeat/config/config.go
index eec92518c59..9fcbc265a1a 100644
--- a/filebeat/config/config.go
+++ b/filebeat/config/config.go
@@ -20,9 +20,6 @@ const (
 
 type Config struct {
 	Prospectors     []*common.Config `config:"prospectors"`
-	SpoolSize       uint64           `config:"spool_size" validate:"min=1"`
-	PublishAsync    bool             `config:"publish_async"`
-	IdleTimeout     time.Duration    `config:"idle_timeout" validate:"nonzero,min=0s"`
 	RegistryFile    string           `config:"registry_file"`
 	ConfigDir       string           `config:"config_dir"`
 	ShutdownTimeout time.Duration    `config:"shutdown_timeout"`
@@ -34,8 +31,6 @@ type Config struct {
 var (
 	DefaultConfig = Config{
 		RegistryFile:    "registry",
-		SpoolSize:       2048,
-		IdleTimeout:     5 * time.Second,
 		ShutdownTimeout: 0,
 	}
 )
diff --git a/filebeat/config/config_test.go b/filebeat/config/config_test.go
index 0fe866d19c7..1a1c9501401 100644
--- a/filebeat/config/config_test.go
+++ b/filebeat/config/config_test.go
@@ -22,8 +22,6 @@ func TestReadConfig2(t *testing.T) {
 	// Reads second config file
 	err = cfgfile.Read(config, absPath+"/config2.yml")
 	assert.Nil(t, err)
-
-	assert.Equal(t, uint64(0), config.SpoolSize)
 }
 
 func TestGetConfigFiles_File(t *testing.T) {
diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml
index 20bd1283c9e..bae99cd8f1c 100644
--- a/filebeat/filebeat.reference.yml
+++ b/filebeat/filebeat.reference.yml
@@ -409,21 +409,10 @@ filebeat.prospectors:
 
 #========================= Filebeat global options ============================
 
-# Event count spool threshold - forces network flush if exceeded
-#filebeat.spool_size: 2048
-
-# Enable async publisher pipeline in filebeat (Experimental!)
-#filebeat.publish_async: false
-
-# Defines how often the spooler is flushed. After idle_timeout the spooler is
-# Flush even though spool_size is not reached.
-#filebeat.idle_timeout: 5s
-
 # Name of the registry file. If a relative path is used, it is considered relative to the
 # data path.
 #filebeat.registry_file: ${path.data}/registry
 
-#
 # These config files must have the full filebeat config part inside, but only
 # the prospector part is processed. All global options like spool_size are ignored.
 # The config_dir MUST point to a different directory than where the main filebeat config file is in.
@@ -469,12 +458,25 @@ filebeat.prospectors:
 # sub-dictionary. Default is false.
 #fields_under_root: false
 
-# Internal queue size for single events in processing pipeline
-#queue_size: 1000
-
-# The internal queue size for bulk events in the processing pipeline.
-# Do not modify this value.
-#bulk_queue_size: 0
+# Internal queue configuration for buffering events to be published.
+#queue:
  # Queue type by name (default 'mem')
  # The memory queue will present all available events (up to the output's
  # bulk_max_size) to the output, the moment the output is ready to process
  # another batch of events.
  #mem:
    # Max number of events the queue can buffer.
    #events: 4096

    # Hints the minimum number of events stored in the queue
    # before providing a batch of events to the outputs.
    # A value of 0 (the default) ensures events are immediately available
    # to be sent to the outputs.
    #flush.min_events: 0

    # Maximum duration after which events are made available to the outputs,
    # even if the number of events stored in the queue is < flush.min_events.
    #flush.timeout: 0s
 
 # Sets the maximum number of CPUs that can be executing simultaneously. The
 # default is the number of logical CPUs available in the system.
@@ -667,9 +669,9 @@ output.elasticsearch:
   # Optional load balance the events between the Logstash hosts
   #loadbalance: true
 
-  # Number of batches to be send asynchronously to logstash while processing
+  # Number of batches to be sent asynchronously to logstash while processing
   # new batches.
-  #pipelining: 0
+  #pipelining: 5
 
   # Optional index name. The default index name is set to name of the beat
   # in all lowercase.
diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2
index 6073977562f..bf61fd1c244 100644
--- a/filebeat/tests/system/config/filebeat.yml.j2
+++ b/filebeat/tests/system/config/filebeat.yml.j2
@@ -89,13 +89,10 @@ filebeat.prospectors:
 {{prospector_raw}}
 {% endif %}
 
-filebeat.spool_size:
 filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }}
-filebeat.idle_timeout: 0.1s
 {% if not skip_registry_config %}
 filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}}
 {%endif%}
-filebeat.publish_async: {{publish_async}}
 
 {% if reload or reload_path -%}
 filebeat.config.{{ reload_type|default("prospectors") }}:
diff --git a/filebeat/tests/system/test_publisher.py b/filebeat/tests/system/test_publisher.py
index 679d4ee1be9..227f09d1050 100644
--- a/filebeat/tests/system/test_publisher.py
+++ b/filebeat/tests/system/test_publisher.py
@@ -23,7 +23,6 @@ def test_registrar_file_content(self):
 
         self.render_config_template(
             path=os.path.abspath(self.working_dir) + "/log/*",
-            publish_async=True
         )
         os.mkdir(self.working_dir + "/log/")
diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml
index a2f35fb1a98..300915f405f 100644
--- a/heartbeat/heartbeat.reference.yml
+++ b/heartbeat/heartbeat.reference.yml
@@ -236,12 +236,25 @@ heartbeat.scheduler:
 # sub-dictionary. Default is false.
 #fields_under_root: false
 
-# Internal queue size for single events in processing pipeline
-#queue_size: 1000
-
-# The internal queue size for bulk events in the processing pipeline.
-# Do not modify this value.
-#bulk_queue_size: 0
+# Internal queue configuration for buffering events to be published.
+#queue:
  # Queue type by name (default 'mem')
  # The memory queue will present all available events (up to the output's
  # bulk_max_size) to the output, the moment the output is ready to process
  # another batch of events.
  #mem:
    # Max number of events the queue can buffer.
    #events: 4096

    # Hints the minimum number of events stored in the queue
    # before providing a batch of events to the outputs.
    # A value of 0 (the default) ensures events are immediately available
    # to be sent to the outputs.
    #flush.min_events: 0

    # Maximum duration after which events are made available to the outputs,
    # even if the number of events stored in the queue is < flush.min_events.
    #flush.timeout: 0s
 
 # Sets the maximum number of CPUs that can be executing simultaneously. The
 # default is the number of logical CPUs available in the system.
@@ -434,9 +447,9 @@ output.elasticsearch:
   # Optional load balance the events between the Logstash hosts
   #loadbalance: true
 
-  # Number of batches to be send asynchronously to logstash while processing
+  # Number of batches to be sent asynchronously to logstash while processing
   # new batches.
-  #pipelining: 0
+  #pipelining: 5
 
   # Optional index name. The default index name is set to name of the beat
   # in all lowercase.
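NOTE: the two flush settings work together: the memory queue hands a batch to
the outputs as soon as flush.min_events events are buffered, or once
flush.timeout has elapsed with at least one event pending, whichever happens
first (this is what the membroker changes further down in this patch
implement). An illustrative, throughput-oriented sketch (the values are
examples, not recommendations):

    queue.mem:
      events: 4096           # total buffer capacity
      flush.min_events: 512  # wait for up to 512 events per batch...
      flush.timeout: 1s      # ...but never wait longer than 1s
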
diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml
index ec498839fea..2500655d89c 100644
--- a/libbeat/_meta/config.reference.yml
+++ b/libbeat/_meta/config.reference.yml
@@ -22,12 +22,25 @@
 # sub-dictionary. Default is false.
 #fields_under_root: false
 
-# Internal queue size for single events in processing pipeline
-#queue_size: 1000
-
-# The internal queue size for bulk events in the processing pipeline.
-# Do not modify this value.
-#bulk_queue_size: 0
+# Internal queue configuration for buffering events to be published.
+#queue:
  # Queue type by name (default 'mem')
  # The memory queue will present all available events (up to the output's
  # bulk_max_size) to the output, the moment the output is ready to process
  # another batch of events.
  #mem:
    # Max number of events the queue can buffer.
    #events: 4096

    # Hints the minimum number of events stored in the queue
    # before providing a batch of events to the outputs.
    # A value of 0 (the default) ensures events are immediately available
    # to be sent to the outputs.
    #flush.min_events: 0

    # Maximum duration after which events are made available to the outputs,
    # even if the number of events stored in the queue is < flush.min_events.
    #flush.timeout: 0s
 
 # Sets the maximum number of CPUs that can be executing simultaneously. The
 # default is the number of logical CPUs available in the system.
@@ -220,9 +233,9 @@ output.elasticsearch:
   # Optional load balance the events between the Logstash hosts
   #loadbalance: true
 
-  # Number of batches to be send asynchronously to logstash while processing
+  # Number of batches to be sent asynchronously to logstash while processing
   # new batches.
-  #pipelining: 0
+  #pipelining: 5
 
   # Optional index name. The default index name is set to name of the beat
   # in all lowercase.
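NOTE: the logstash output change below flips the default for `pipelining` from
0 to 5, i.e. the output now keeps up to 5 batches in flight before waiting for
ACKs. Setting it back to 0 should restore the previous synchronous behavior; a
sketch (the host is illustrative):

    output.logstash:
      hosts: ["localhost:5044"]
      #pipelining: 5   # new default: up to 5 batches in flight
      #pipelining: 0   # opt out: send one batch at a time
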
diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go
index 9a50a90787a..0bf872f1b7e 100644
--- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go
+++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go
@@ -105,7 +105,10 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er
 	}
 
 	brokerFactory := func(e broker.Eventer) (broker.Broker, error) {
-		return membroker.NewBroker(e, 20, false), nil
+		return membroker.NewBroker(membroker.Settings{
+			Eventer: e,
+			Events:  20,
+		}), nil
 	}
 	pipeline, err := pipeline.New(brokerFactory, out, pipeline.Settings{
 		WaitClose: 0,
diff --git a/libbeat/outputs/logstash/config.go b/libbeat/outputs/logstash/config.go
index ca8504bf86d..6e8e1feb5e8 100644
--- a/libbeat/outputs/logstash/config.go
+++ b/libbeat/outputs/logstash/config.go
@@ -30,6 +30,7 @@ type Backoff struct {
 var defaultConfig = Config{
 	Port:             5044,
 	LoadBalance:      false,
+	Pipelining:       5,
 	BulkMaxSize:      2048,
 	CompressionLevel: 3,
 	Timeout:          30 * time.Second,
diff --git a/libbeat/publisher/bc/publisher/pipeline.go b/libbeat/publisher/bc/publisher/pipeline.go
index fcc3b78cc8c..9e8e8845fe7 100644
--- a/libbeat/publisher/bc/publisher/pipeline.go
+++ b/libbeat/publisher/bc/publisher/pipeline.go
@@ -2,31 +2,22 @@ package publisher
 
 import (
 	"errors"
+	"fmt"
 
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/outputs"
 	"github.com/elastic/beats/libbeat/processors"
 	"github.com/elastic/beats/libbeat/publisher/broker"
-	"github.com/elastic/beats/libbeat/publisher/broker/membroker"
 	"github.com/elastic/beats/libbeat/publisher/pipeline"
 )
 
-const defaultBrokerSize = 8 * 1024
-
 func createPipeline(
 	beatInfo common.BeatInfo,
 	shipper ShipperConfig,
 	processors *processors.Processors,
 	outcfg common.ConfigNamespace,
 ) (*pipeline.Pipeline, error) {
-	queueSize := defaultBrokerSize
-	if qs := shipper.QueueSize; qs != nil {
-		if sz := *qs; sz > 0 {
-			queueSize = sz
-		}
-	}
-
 	var out outputs.Group
 	if !(*publishDisabled) {
 		var err error
@@ -63,11 +54,27 @@ func createPipeline(
 		},
 	}
 
-	brokerFactory := func(e broker.Eventer) (broker.Broker, error) {
-		return membroker.NewBroker(e, queueSize, false), nil
+	brokerType := "mem"
+	if b := shipper.Queue.Name(); b != "" {
+		brokerType = b
 	}
 
-	p, err := pipeline.New(brokerFactory, out, settings)
+	brokerFactory := broker.FindFactory(brokerType)
+	if brokerFactory == nil {
+		return nil, fmt.Errorf("'%v' is not a valid queue type", brokerType)
+	}
+
+	brokerConfig := shipper.Queue.Config()
+	if brokerConfig == nil {
+		brokerConfig = common.NewConfig()
+	}
+
+	p, err := pipeline.New(
+		func(eventer broker.Eventer) (broker.Broker, error) {
+			return brokerFactory(eventer, brokerConfig)
+		},
+		out, settings,
+	)
 	if err != nil {
 		return nil, err
 	}
diff --git a/libbeat/publisher/bc/publisher/publish.go b/libbeat/publisher/bc/publisher/publish.go
index 3a5b3c3c437..ab3312be2be 100644
--- a/libbeat/publisher/bc/publisher/publish.go
+++ b/libbeat/publisher/bc/publisher/publish.go
@@ -48,20 +48,14 @@ type BeatPublisher struct {
 }
 
 type ShipperConfig struct {
-	common.EventMetadata `config:",inline"` // Fields and tags to add to each event.
-	Name                 string             `config:"name"`
+	common.EventMetadata `config:",inline"` // Fields and tags to add to each event.
+	Name  string                 `config:"name"`
+	Queue common.ConfigNamespace `config:"queue"`
 
 	// internal publisher queue sizes
-	QueueSize     *int `config:"queue_size"`
-	BulkQueueSize *int `config:"bulk_queue_size"`
-	MaxProcs      *int `config:"max_procs"`
+	MaxProcs *int `config:"max_procs"`
 }
 
-const (
-	DefaultQueueSize     = 1000
-	DefaultBulkQueueSize = 0
-)
-
 func init() {
 	publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing")
 }
@@ -93,8 +87,6 @@ func (publisher *BeatPublisher) init(
 		logp.Info("Dry run mode. All output types except the file based one are disabled.")
 	}
 
-	shipper.InitShipperConfig()
-
 	publisher.name = shipper.Name
 	if publisher.name == "" {
 		publisher.name = beat.Hostname
@@ -133,17 +125,3 @@ func (publisher *BeatPublisher) SetACKHandler(h beat.PipelineACKHandler) error {
 func (publisher *BeatPublisher) GetName() string {
 	return publisher.name
 }
-
-func (config *ShipperConfig) InitShipperConfig() {
-
-	// TODO: replace by ucfg
-	if config.QueueSize == nil || *config.QueueSize <= 0 {
-		queueSize := DefaultQueueSize
-		config.QueueSize = &queueSize
-	}
-
-	if config.BulkQueueSize == nil || *config.BulkQueueSize < 0 {
-		bulkQueueSize := DefaultBulkQueueSize
-		config.BulkQueueSize = &bulkQueueSize
-	}
-}
diff --git a/libbeat/publisher/broker/membroker/broker.go b/libbeat/publisher/broker/membroker/broker.go
index 581d2b6ec6a..286336c90f3 100644
--- a/libbeat/publisher/broker/membroker/broker.go
+++ b/libbeat/publisher/broker/membroker/broker.go
@@ -15,7 +15,9 @@ type Broker struct {
 
 	logger logger
 
-	buf brokerBuffer
+	buf         brokerBuffer
+	minEvents   int
+	idleTimeout time.Duration
 
 	// api channels
 	events    chan pushRequest
@@ -35,6 +37,14 @@ type Broker struct {
 	waitOnClose bool
 }
 
+type Settings struct {
+	Eventer        broker.Eventer
+	Events         int
+	FlushMinEvents int
+	FlushTimeout   time.Duration
+	WaitOnClose    bool
+}
+
type ackChan struct {
	next *ackChan
	ch   chan batchAckRequest
@@ -57,18 +67,41 @@ func create(eventer broker.Eventer, cfg *common.Config) (broker.Broker, error) {
 		return nil, err
 	}
 
-	b := NewBroker(eventer, config.Events, false)
-	return b, nil
+	return NewBroker(Settings{
+		Eventer:        eventer,
+		Events:         config.Events,
+		FlushMinEvents: config.FlushMinEvents,
+		FlushTimeout:   config.FlushTimeout,
+	}), nil
 }
 
 // NewBroker creates a new in-memory broker holding up to settings.Events events.
 // If settings.WaitOnClose is set to true, the broker will block on Close, until all
 // internal workers handling incoming messages and ACKs have been shut down.
-func NewBroker(eventer broker.Eventer, sz int, waitOnClose bool) *Broker {
+func NewBroker(
+	settings Settings,
+) *Broker {
 	// define internal channel size for producer/client requests
 	// to the broker
 	chanSize := 20
 
+	var (
+		sz           = settings.Events
+		minEvents    = settings.FlushMinEvents
+		flushTimeout = settings.FlushTimeout
+	)
+
+	if minEvents < 1 {
+		minEvents = 1
+	}
+	if minEvents > 1 && flushTimeout <= 0 {
+		minEvents = 1
+		flushTimeout = 0
+	}
+	if minEvents > sz {
+		minEvents = sz
+	}
+
 	logger := defaultLogger
 	b := &Broker{
 		done: make(chan struct{}),
@@ -83,11 +116,13 @@ func NewBroker(eventer broker.Eventer, sz int, waitOnClose bool) *Broker {
 		acks:          make(chan int),
 		scheduledACKs: make(chan chanList),
 
-		waitOnClose: waitOnClose,
+		waitOnClose: settings.WaitOnClose,
 
-		eventer: eventer,
+		eventer: settings.Eventer,
 	}
 	b.buf.init(logger, sz)
+	b.minEvents = minEvents
+	b.idleTimeout = flushTimeout
 
 	ack := &ackLoop{broker: b}
 
@@ -128,6 +163,10 @@ func (b *Broker) Consumer() broker.Consumer {
 
 func (b *Broker) eventLoop() {
 	var (
+		timer      *time.Timer
+		idleC      <-chan time.Time
+		forceFlush bool
+
 		events = b.events
 		get    chan getRequest
 
@@ -144,6 +183,15 @@
 		schedACKS chan chanList
 	)
 
+	if b.idleTimeout > 0 {
+		// create an initially stopped timer -> Reset will be used
+		// on the timer object once the flush timer becomes active.
+		timer = time.NewTimer(b.idleTimeout)
+		if !timer.Stop() {
+			<-timer.C
+		}
+	}
+
 	for {
 		select {
 		case <-b.done:
 			return
@@ -182,6 +230,10 @@
 				events = b.events
 			}
 
+		case <-idleC:
+			forceFlush = true
+			idleC = nil
+
 		case req := <-get:
 			start, buf := b.buf.reserve(req.sz)
 			count := len(buf)
@@ -203,6 +255,15 @@
 			pendingACKs.append(ackCH)
 			schedACKS = b.scheduledACKs
 
+			// stop flush timer on get
+			forceFlush = false
+			if idleC != nil {
+				idleC = nil
+				if !timer.Stop() {
+					<-timer.C
+				}
+			}
+
 		case schedACKS <- pendingACKs:
 			schedACKS = nil
 			pendingACKs = chanList{}
@@ -219,12 +280,19 @@
 			events = b.events
 		}
 
-		// b.logger.Debug("active events: ", activeEvents)
-		if b.buf.Empty() {
-			// b.logger.Debugf("no event available in active region")
-			get = nil
-		} else {
-			get = b.requests
+		// update the get channel and idle timer after the state machine has run
+
+		get = b.requests
+		if !forceFlush {
+			avail := b.buf.Avail()
+			if avail == 0 || b.buf.TotalAvail() < b.minEvents {
+				get = nil
+
+				if avail > 0 && idleC == nil && timer != nil {
+					timer.Reset(b.idleTimeout)
+					idleC = timer.C
+				}
+			}
 		}
 	}
 }
diff --git a/libbeat/publisher/broker/membroker/broker_test.go b/libbeat/publisher/broker/membroker/broker_test.go
index c8f2524b483..6cbe92dd5f9 100644
--- a/libbeat/publisher/broker/membroker/broker_test.go
+++ b/libbeat/publisher/broker/membroker/broker_test.go
@@ -50,6 +50,6 @@ func TestProducerCancelRemovesEvents(t *testing.T) {
 
 func makeTestBroker(sz int) brokertest.BrokerFactory {
 	return func() broker.Broker {
-		return NewBroker(nil, sz, true)
+		return NewBroker(Settings{Events: sz, WaitOnClose: true})
 	}
 }
diff --git a/libbeat/publisher/broker/membroker/buf.go b/libbeat/publisher/broker/membroker/buf.go
index 25771e16e5c..0e9ad9bf019 100644
--- a/libbeat/publisher/broker/membroker/buf.go
+++ b/libbeat/publisher/broker/membroker/buf.go
@@ -240,6 +240,14 @@ func (b *brokerBuffer) Empty() bool {
 	return (b.regA.size - b.reserved) == 0
 }
 
+func (b *brokerBuffer) Avail() int {
+	return b.regA.size - b.reserved
+}
+
+func (b *brokerBuffer) TotalAvail() int {
+	return b.regA.size + b.regB.size - b.reserved
+}
+
 func (b *brokerBuffer) Full() bool {
 	var avail int
 	if b.regB.size > 0 {
diff --git a/libbeat/publisher/broker/membroker/config.go b/libbeat/publisher/broker/membroker/config.go
index 5414cbe1916..2616b1aeb83 100644
--- a/libbeat/publisher/broker/membroker/config.go
+++ b/libbeat/publisher/broker/membroker/config.go
@@ -1,9 +1,26 @@
 package membroker
 
+import (
+	"errors"
+	"time"
+)
+
 type config struct {
-	Events int `config:"events" validate:"min=32"`
+	Events         int           `config:"events" validate:"min=32"`
+	FlushMinEvents int           `config:"flush.min_events" validate:"min=0"`
+	FlushTimeout   time.Duration `config:"flush.timeout"`
 }
 
 var defaultConfig = config{
-	Events: 4096,
+	Events:         4 * 1024,
+	FlushMinEvents: 0,
+	FlushTimeout:   0,
+}
+
+func (c *config) Validate() error {
+	if c.FlushMinEvents > c.Events {
+		return errors.New("flush.min_events must be less than or equal to events")
+	}
+
+	return nil
 }
diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml
index 3bd1a8ad6e3..17efcdc7a86 100644
--- a/metricbeat/metricbeat.reference.yml
+++ b/metricbeat/metricbeat.reference.yml
@@ -431,12 +431,25 @@ metricbeat.modules:
 # sub-dictionary. Default is false.
 #fields_under_root: false
 
-# Internal queue size for single events in processing pipeline
-#queue_size: 1000
-
-# The internal queue size for bulk events in the processing pipeline.
-# Do not modify this value.
-#bulk_queue_size: 0
+# Internal queue configuration for buffering events to be published.
+#queue:
  # Queue type by name (default 'mem')
  # The memory queue will present all available events (up to the output's
  # bulk_max_size) to the output, the moment the output is ready to process
  # another batch of events.
  #mem:
    # Max number of events the queue can buffer.
    #events: 4096

    # Hints the minimum number of events stored in the queue
    # before providing a batch of events to the outputs.
    # A value of 0 (the default) ensures events are immediately available
    # to be sent to the outputs.
    #flush.min_events: 0

    # Maximum duration after which events are made available to the outputs,
    # even if the number of events stored in the queue is < flush.min_events.
    #flush.timeout: 0s
 
 # Sets the maximum number of CPUs that can be executing simultaneously. The
 # default is the number of logical CPUs available in the system.
@@ -629,9 +642,9 @@ output.elasticsearch:
   # Optional load balance the events between the Logstash hosts
   #loadbalance: true
 
-  # Number of batches to be send asynchronously to logstash while processing
+  # Number of batches to be sent asynchronously to logstash while processing
   # new batches.
-  #pipelining: 0
+  #pipelining: 5
 
   # Optional index name. The default index name is set to name of the beat
   # in all lowercase.
diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go
index 484ecf65f1d..53326b13485 100644
--- a/packetbeat/beater/packetbeat.go
+++ b/packetbeat/beater/packetbeat.go
@@ -103,9 +103,6 @@ func (pb *packetbeat) init(b *beat.Beat) error {
 		return err
 	}
 
-	// This is required as init Beat is called before the beat publisher is initialised
-	b.Config.Shipper.InitShipperConfig()
-
 	pb.pipeline = b.Publisher
 	pb.transPub, err = publish.NewTransactionPublisher(
 		b.Publisher,
diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml
index 1a3a0f0c2f6..466791f8735 100644
--- a/packetbeat/packetbeat.reference.yml
+++ b/packetbeat/packetbeat.reference.yml
@@ -474,12 +474,25 @@ packetbeat.protocols:
 # sub-dictionary. Default is false.
 #fields_under_root: false
 
-# Internal queue size for single events in processing pipeline
-#queue_size: 1000
-
-# The internal queue size for bulk events in the processing pipeline.
-# Do not modify this value.
-#bulk_queue_size: 0
+# Internal queue configuration for buffering events to be published.
+#queue:
  # Queue type by name (default 'mem')
  # The memory queue will present all available events (up to the output's
  # bulk_max_size) to the output, the moment the output is ready to process
  # another batch of events.
  #mem:
    # Max number of events the queue can buffer.
    #events: 4096

    # Hints the minimum number of events stored in the queue
    # before providing a batch of events to the outputs.
    # A value of 0 (the default) ensures events are immediately available
    # to be sent to the outputs.
    #flush.min_events: 0

    # Maximum duration after which events are made available to the outputs,
    # even if the number of events stored in the queue is < flush.min_events.
    #flush.timeout: 0s
 
 # Sets the maximum number of CPUs that can be executing simultaneously. The
 # default is the number of logical CPUs available in the system.
@@ -672,9 +685,9 @@ output.elasticsearch:
   # Optional load balance the events between the Logstash hosts
   #loadbalance: true
 
-  # Number of batches to be send asynchronously to logstash while processing
+  # Number of batches to be sent asynchronously to logstash while processing
   # new batches.
-  #pipelining: 0
+  #pipelining: 5
 
   # Optional index name. The default index name is set to name of the beat
   # in all lowercase.
diff --git a/winlogbeat/config/config.go b/winlogbeat/config/config.go
index 7f4e96c9d2c..a38a077d3e8 100644
--- a/winlogbeat/config/config.go
+++ b/winlogbeat/config/config.go
@@ -43,10 +43,8 @@ func (s Settings) Validate() error {
 
 	// TODO: winlogbeat should not try to validate top-level beats config
 	validKeys := []string{
-		"fields", "fields_under_root", "tags", "name",
-		"queue_size", "bulk_queue_size", "max_procs",
-		"processors", "logging", "output", "path", "winlogbeat",
-		"dashboards",
+		"fields", "fields_under_root", "tags", "name", "queue", "max_procs",
+		"processors", "logging", "output", "path", "winlogbeat", "dashboards",
 	}
 	sort.Strings(validKeys)
 
diff --git a/winlogbeat/config/config_test.go b/winlogbeat/config/config_test.go
index f285ef92125..268f326790a 100644
--- a/winlogbeat/config/config_test.go
+++ b/winlogbeat/config/config_test.go
@@ -46,9 +46,9 @@ func TestConfigValidate(t *testing.T) {
 			},
 			map[string]interface{}{"other": "value"},
 		},
-			"1 error: Invalid top-level key 'other' found. Valid keys are bulk_queue_size, dashboards, " +
-				"fields, fields_under_root, logging, max_procs, " +
-				"name, output, path, processors, queue_size, tags, winlogbeat",
+			"1 error: Invalid top-level key 'other' found. Valid keys are dashboards, " +
+				"fields, fields_under_root, logging, max_procs, " +
+				"name, output, path, processors, queue, tags, winlogbeat",
 		},
 		{
 			WinlogbeatConfig{},
diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml
index 480d8a7fe9e..9bfc710b4b8 100644
--- a/winlogbeat/winlogbeat.reference.yml
+++ b/winlogbeat/winlogbeat.reference.yml
@@ -51,12 +51,25 @@ winlogbeat.event_logs:
 # sub-dictionary. Default is false.
 #fields_under_root: false
 
-# Internal queue size for single events in processing pipeline
-#queue_size: 1000
-
-# The internal queue size for bulk events in the processing pipeline.
-# Do not modify this value.
-#bulk_queue_size: 0
+# Internal queue configuration for buffering events to be published.
+#queue:
  # Queue type by name (default 'mem')
  # The memory queue will present all available events (up to the output's
  # bulk_max_size) to the output, the moment the output is ready to process
  # another batch of events.
  #mem:
    # Max number of events the queue can buffer.
    #events: 4096

    # Hints the minimum number of events stored in the queue
    # before providing a batch of events to the outputs.
    # A value of 0 (the default) ensures events are immediately available
    # to be sent to the outputs.
    #flush.min_events: 0

    # Maximum duration after which events are made available to the outputs,
    # even if the number of events stored in the queue is < flush.min_events.
    #flush.timeout: 0s
 
 # Sets the maximum number of CPUs that can be executing simultaneously. The
 # default is the number of logical CPUs available in the system.
@@ -249,9 +262,9 @@ output.elasticsearch:
   # Optional load balance the events between the Logstash hosts
   #loadbalance: true
 
-  # Number of batches to be send asynchronously to logstash while processing
+  # Number of batches to be sent asynchronously to logstash while processing
   # new batches.
-  #pipelining: 0
+  #pipelining: 5
 
   # Optional index name. The default index name is set to name of the beat
   # in all lowercase.
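NOTE: taken together, the configuration surface introduced by this patch looks
as follows. The queue values shown are the new defaults and the logstash host
is illustrative; for winlogbeat, the top-level `queue` key now passes
validation, while the removed `queue_size` and `bulk_queue_size` keys are
rejected:

    queue:
      mem:
        events: 4096
        flush.min_events: 0
        flush.timeout: 0s

    output.logstash:
      hosts: ["localhost:5044"]
      pipelining: 5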