Skip to content

Commit

Permalink
Publisher pipeline
Browse files Browse the repository at this point in the history
This change marks the beginning of the libbeat event publisher pipeline
refactoring.

- central to the publisher pipeline is the broker:
  - broker implementation can be configured when constructing the pipeline
  - common broker implementation tests in `brokertest` package
- broker features:
  - Fully in control of all published events. In comparison to old publisher
    pipeline with many batches in flight, the broker now configures/controls
    the total number of events stored in the publisher pipeline. Only after
    ACKs from outputs, will new space become available.
  - broker returns ACKS in correct order to publisher
  - broker batches up multiple ACKs
  - producer can only send one event at a time to the broker (push)
  - consumer can only receive batches of events from broker (pull)
  - producer can cancel(remove) active events not yet pulled by a consumer
- broker/output related interfaces defined in `publisher` package
- pipeline/client interfaces for use by beats currently defined in
  `publisher/beat` package
  - event structure has been changed to be more compatible with Logstash (See
    beat.Event): Beats can send metadata to libbeat outputs (e.g. pipeline) and
    logstash by using the `Event.Meta` field. Event fields will be stored on
    `Event.Fields`. Event fields are normalized (for use with processors) and
    serialized using.
- The old publishers publish API is moved to libbeat/publisher/bc/publisher for
  now:
  - move to new sub-package to fight of circular imports
  - package implements old pipeline API on top of new pipeline
- Filters/Processors are still executed before pushing events to the new
  pipeline
- New API:
  - beats client requirements are configured via `beat.ClientConfig`:
    - register async ACK callbacks (currently callbacks will not be triggered
      after `Client.Close`)
    - configurable sending guarantees (must match ACK support)
    - "wait on close", for beats clients to wait for pending events to be ACKed
      (only if ACK is configured)
    - pipeline also supports "wait on close", waiting for pending events
      (independent of ACK configurations). Can be used by any beat, to wait on
      shutdown for published events to be actually send

Event Structure:
----------------
The event structure has been changed a little:

```
type Event struct {
	Timestamp time.Time
	Meta      common.MapStr
	Fields    common.MapStr
}
```

- We always require the timestamps.
- Meta contains additional meta data (hints?) a beat can forward to the
  outputs. For example `pipeline` or `index` settings for the Elasticsearch
  output.
- If output is not Elasticsearch, a `@metadata` field will always be written to
  the json document. This way Logstash can take advantage of all `@metadata`,
  even if the event has been send via kafka or redis.

The new output plugin factory is defined as:

```
type Factory func(beat common.BeatInfo, cfg *common.Config) (Group, error)
```

The package libbeat/output/mode is being removed + all it's functionality is
moved into a single implementation in the publisher pipeline supporting
sync/async clients with failover and load-balancing. In the future dynamic
output discovery might be added as well. This change requires output.Group to
return some common settings for an active output, to configure the pipeline:

```
// Group configures and combines multiple clients into load-balanced group of
// clients being managed by the publisher pipeline.
type Group struct {
	Clients   []Client
	BatchSize int
	Retry     int
}
```

Moving functionality from the outputs to the publisher pipeline restricts beats
from having one output type configured only.

All client instances configured will participate in load-balancing being driven
by the publisher pipeline. This removes some intermediate workers used for
forwarding batches. Future changes to groups include:

outputs always operate in batches by implement only the `Publish` method:

```
// Publish sends events to the clients sink. A client must synchronously or
// asynchronously ACK the given batch, once all events have been processed.
// Using Retry/Cancelled a client can return a batch of unprocessed events to
// the publisher pipeline. The publisher pipeline (if configured by the output
// factory) will take care of retrying/dropping events.
Publish(publisher.Batch) error
```

With:

```
// Batch is used to pass a batch of events to the outputs and asynchronously listening
// for signals from these outpts. After a batch is processed (completed or
// errors), one of the signal methods must be called.
type Batch interface {
	Events() []Event

	// signals
	ACK()
	Drop()
	Retry()
	RetryEvents(events []Event)
	Cancelled()
	CancelledEvents(events []Event)
}
```

The batch interface combines `events + signaling` into one common interface.
The main difference between sync/async clients is, when `batch.ACK` is called.
Batches/Events can be processed out of order. The publisher pipelining doing
the batching and load-balancing guarantees ACKs being returned to the beat in
order + implements upper bound. Once publisher pipeline is 'full', it will
block, waiting for ACKs from outputs.

The logic for dropping events on retry and guaranteed sending is moved to the
publisher pipeline as well. Outputs are concerned with publishing and signaling
ACK or Retry only.
  • Loading branch information
urso committed Jun 23, 2017
1 parent b224a3c commit 7f87b55
Show file tree
Hide file tree
Showing 173 changed files with 6,324 additions and 6,793 deletions.
7 changes: 3 additions & 4 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// loadModulesPipelines is called when modules are configured to do the initial
// setup.
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn("Filebeat is unable to load the Ingest Node pipelines for the configured" +
" modules because the Elasticsearch output is not configured/enabled. If you have" +
" already loaded the Ingest Node pipelines or are using Logstash pipelines, you" +
Expand All @@ -120,13 +119,13 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
logp.Debug("machine-learning", "Setting up ML jobs for modules")

esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn("Filebeat is unable to load the Xpack Machine Learning configurations for the" +
" modules because the Elasticsearch output is not configured/enabled.")
return nil
}

esConfig := b.Config.Output.Config()
esClient, err := elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion filebeat/publisher/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
)

type asyncLogPublisher struct {
Expand Down
2 changes: 1 addition & 1 deletion filebeat/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion filebeat/publisher/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
)

type syncLogPublisher struct {
Expand Down
7 changes: 4 additions & 3 deletions filebeat/tests/system/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_shutdown(self):

def test_shutdown_wait_ok(self):
"""
Test stopping filebeat under load and wait for publisher queue to be emptied.
Test stopping filebeat under load: wait for all events being published.
"""

self.nasa_logs()
Expand Down Expand Up @@ -63,9 +63,10 @@ def test_shutdown_wait_ok(self):
assert len(registry) == 1
assert registry[0]["offset"] == output["offset"]

@unittest.skip("Skipping unreliable test")
def test_shutdown_wait_timeout(self):
"""
Test stopping filebeat under load and wait for publisher queue to be emptied.
Test stopping filebeat under load: allow early shutdown.
"""

self.nasa_logs()
Expand All @@ -80,7 +81,7 @@ def test_shutdown_wait_timeout(self):

# Wait until it tries the first time to publish
self.wait_until(
lambda: self.log_contains("ERR Connecting error publishing events"),
lambda: self.log_contains("ERR Failed to connect"),
max_timeout=15)

filebeat.check_kill_and_wait()
Expand Down
2 changes: 1 addition & 1 deletion generator/beat/{beat}/beater/{beat}.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"

"{beat_path}/config"
)
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"

"github.com/elastic/beats/heartbeat/config"
"github.com/elastic/beats/heartbeat/monitors"
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/beater/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"

"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/heartbeat/scheduler"
Expand Down
34 changes: 18 additions & 16 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ import (
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/template"
"github.com/elastic/beats/libbeat/version"

// Register publisher pipeline modules
_ "github.com/elastic/beats/libbeat/publisher/includes"

// Register default processors.
_ "github.com/elastic/beats/libbeat/processors/actions"
_ "github.com/elastic/beats/libbeat/processors/add_cloud_metadata"
Expand Down Expand Up @@ -115,15 +118,15 @@ type Beat struct {

// BeatConfig struct contains the basic configuration of every beat
type BeatConfig struct {
Shipper publisher.ShipperConfig `config:",inline"`
Output map[string]*common.Config `config:"output"`
Monitoring *common.Config `config:"xpack.monitoring"`
Logging logp.Logging `config:"logging"`
Processors processors.PluginConfig `config:"processors"`
Path paths.Path `config:"path"`
Dashboards *common.Config `config:"setup.dashboards"`
Template *common.Config `config:"setup.template"`
Http *common.Config `config:"http"`
Shipper publisher.ShipperConfig `config:",inline"`
Output common.ConfigNamespace `config:"output"`
Monitoring *common.Config `config:"xpack.monitoring"`
Logging logp.Logging `config:"logging"`
Processors processors.PluginConfig `config:"processors"`
Path paths.Path `config:"path"`
Dashboards *common.Config `config:"setup.dashboards"`
Template *common.Config `config:"setup.template"`
Http *common.Config `config:"http"`
}

var (
Expand Down Expand Up @@ -352,11 +355,11 @@ func (b *Beat) Setup(bt Creator, template, dashboards, machineLearning bool) err
}

if template {
esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
if b.Config.Output.Name() != "elasticsearch" {
return fmt.Errorf("Template loading requested but the Elasticsearch output is not configured/enabled")
}

esConfig := b.Config.Output.Config()
if b.Config.Template == nil || (b.Config.Template != nil && b.Config.Template.Enabled()) {
loadCallback, err := b.templateLoadingCallback()
if err != nil {
Expand Down Expand Up @@ -560,8 +563,8 @@ func (b *Beat) loadDashboards(force bool) error {
}
}

if b.Config.Dashboards != nil && b.Config.Dashboards.Enabled() {
esConfig := b.Config.Output["elasticsearch"]
if b.Config.Dashboards != nil && b.Config.Dashboards.Enabled() && b.Config.Output.Name() == "elasticsearch" {
esConfig := b.Config.Output.Config()
if esConfig == nil || !esConfig.Enabled() {
return fmt.Errorf("Dashboard loading requested but the Elasticsearch output is not configured/enabled")
}
Expand Down Expand Up @@ -608,9 +611,8 @@ func (b *Beat) registerTemplateLoading() error {
}
}

esConfig := b.Config.Output["elasticsearch"]
// Loads template by default if esOutput is enabled
if esConfig != nil && esConfig.Enabled() {
if b.Config.Output.Name() == "elasticsearch" {
if b.Config.Template == nil || (b.Config.Template != nil && b.Config.Template.Enabled()) {
// load template through callback to make sure it is also loaded
// on reconnecting
Expand Down
13 changes: 12 additions & 1 deletion libbeat/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,15 @@ func (ns *ConfigNamespace) Unpack(cfg *Config) error {
return nil
}

var (
err error
found bool
)

for _, name := range fields {
sub, err := cfg.Child(name, -1)
var sub *Config

sub, err = cfg.Child(name, -1)
if err != nil {
// element is no configuration object -> continue so a namespace
// Config unpacked as a namespace can have other configuration
Expand All @@ -356,8 +363,12 @@ func (ns *ConfigNamespace) Unpack(cfg *Config) error {

ns.name = name
ns.config = sub
found = true
}

if !found {
return err
}
return nil
}

Expand Down
29 changes: 10 additions & 19 deletions libbeat/common/fmtstr/formatevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/dtfmt"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/beat"
)

// EventFormatString implements format string support on events
// of type common.MapStr.
// of type beat.Event.
//
// The concrete event expansion requires the field name enclosed by brackets.
// For example: '%{[field.name]}'. Field names can be separated by points or
Expand Down Expand Up @@ -170,7 +171,7 @@ func (fs *EventFormatString) Fields() []string {

// Run executes the format string returning a new expanded string or an error
// if execution or event field expansion fails.
func (fs *EventFormatString) Run(event common.MapStr) (string, error) {
func (fs *EventFormatString) Run(event *beat.Event) (string, error) {
ctx := newEventCtx(len(fs.fields))
defer releaseCtx(ctx)

Expand All @@ -192,7 +193,7 @@ func (fs *EventFormatString) Run(event common.MapStr) (string, error) {

// RunBytes executes the format string returning a new expanded string of type
// `[]byte` or an error if execution or event field expansion fails.
func (fs *EventFormatString) RunBytes(event common.MapStr) ([]byte, error) {
func (fs *EventFormatString) RunBytes(event *beat.Event) ([]byte, error) {
ctx := newEventCtx(len(fs.fields))
defer releaseCtx(ctx)

Expand All @@ -208,7 +209,7 @@ func (fs *EventFormatString) RunBytes(event common.MapStr) ([]byte, error) {
}

// Eval executes the format string, writing the resulting string into the provided output buffer. Returns error if execution or event field expansion fails.
func (fs *EventFormatString) Eval(out *bytes.Buffer, event common.MapStr) error {
func (fs *EventFormatString) Eval(out *bytes.Buffer, event *beat.Event) error {
ctx := newEventCtx(len(fs.fields))
defer releaseCtx(ctx)

Expand All @@ -227,7 +228,7 @@ func (fs *EventFormatString) IsConst() bool {
// of strings.
func (fs *EventFormatString) collectFields(
ctx *eventEvalContext,
event common.MapStr,
event *beat.Event,
) error {
for _, fi := range fs.fields {
s, err := fieldString(event, fi.path)
Expand All @@ -242,19 +243,7 @@ func (fs *EventFormatString) collectFields(
}

if fs.timestamp {
timestamp, found := event["@timestamp"]
if !found {
return errors.New("missing timestamp")
}

switch t := timestamp.(type) {
case common.Time:
ctx.ts = time.Time(t)
case time.Time:
ctx.ts = t
default:
return errors.New("unknown timestamp type")
}
ctx.ts = event.Timestamp
}

return nil
Expand Down Expand Up @@ -398,7 +387,7 @@ func parseEventPath(field string) (string, error) {
}

// TODO: move to libbeat/common?
func fieldString(event common.MapStr, field string) (string, error) {
func fieldString(event *beat.Event, field string) (string, error) {
v, err := event.GetValue(field)
if err != nil {
return "", err
Expand All @@ -422,6 +411,8 @@ func tryConvString(v interface{}) (string, error) {
return s, nil
case common.Time:
return s.String(), nil
case time.Time:
return common.Time(s).String(), nil
case []byte:
return string(s), nil
case stringer:
Expand Down
Loading

0 comments on commit 7f87b55

Please sign in to comment.