-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update libbeat publisher pipeline #4492
Conversation
aa3995b
to
a6868b0
Compare
libbeat/outputs/codec/json/event.go
Outdated
"github.com/elastic/beats/libbeat/publisher/beat" | ||
) | ||
|
||
// Event describes the event strucutre for events |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/strucutre/structure/
88789e6
to
cfe657d
Compare
@@ -63,9 +63,10 @@ def test_shutdown_wait_ok(self): | |||
assert len(registry) == 1 | |||
assert registry[0]["offset"] == output["offset"] | |||
|
|||
@unittest.skip("Skipping unreliable test") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have had to skip the test, as it has been very unreliable and I didn't manage to reproduce it yet for investigation. Plus, this feature as is will be replaced with wait shutdown support directly provided by the new publisher pipeline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you perhaps open a follow up github issue to track these things?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the only test I have had to skip. But will create meta-ticket for further pipeline work.
libbeat/publisher/beat/bc.go
Outdated
@@ -0,0 +1 @@ | |||
package beat |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Empty file, is it needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some left-over. Will remove it.
For Reviewer notes:
|
@urso I hit an error when using an output config section like this:
The error is:
Expected? |
@tsg this is to be excepted. From notes:
With this PR one can have only one output configured. The error message is generated by beats config using The fix for using |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I skimmed through and left some remarks. In general LGTM and we can move forward on this.
@@ -63,9 +63,10 @@ def test_shutdown_wait_ok(self): | |||
assert len(registry) == 1 | |||
assert registry[0]["offset"] == output["offset"] | |||
|
|||
@unittest.skip("Skipping unreliable test") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you perhaps open a follow up github issue to track these things?
@@ -0,0 +1,31 @@ | |||
package logp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why did you touch the logger in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some parts of the pipeline allow for a logger to be passed by interface/type. This allows the more 'expensive' tests to capture the log output in the test context -> correctly group log messages with test output.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The old internal Logger
type is renamed to logger
. We now export a *Logger
type with configurable selector.
|
||
// backoff parameters |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume I will find all the defaults below somewhere else :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the monitoring reporter, not the actual Elasticsearch event output.
The backoff parameters have become configurable, as one has(will have to) configure the backoff strategy when creating the outputs.Group
. This is due this PR shifting some responsibilities from the outputs to the publisher pipeline itself (e.g. error handling, backoff, retry). Some of the shifting will be necessary, when introducing dynamic output reloading, as the pipeline must transfer active events from old to new outputs.
if found != "" { | ||
err := fmt.Errorf("multiple potential monitoring reporters found (for example %v and %v)", found, name) | ||
return "", nil, err | ||
if outputs.IsSet() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. I see the possibility coming of not having an output initially ;-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No promisses ;)
} | ||
|
||
// WithBackoff wraps a NetworkClient, adding exponential backoff support to a network client if connection/publishing failed. | ||
func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reminds me of the filebeat readers implementation ;-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah. It's kind of a wrapper for NetworkClient. This will be moved closer to the libbeat pipeline itself. Backoff strategy + parameters will become config parameters in outputs.Group
.
|
||
// BulkPublish implements the BulkOutputer interface pushing a bulk of events | ||
// via lumberjack. | ||
func (lj *logstash) BulkPublish( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did all this stuff get replaced by new publisher?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. I modified the pipeline and output interfaces to be of one kind only. Outputs always get batches and on input it's of pipeline it's always by one event being pushed. This simplifies/removes some extra logic in outputs and pipeline itself for dealing with bulk and non-bulk event handling.
func makeEvent(fields common.MapStr, meta common.MapStr) beat.Event { | ||
var ts time.Time | ||
switch value := fields["@timestamp"].(type) { | ||
case time.Time: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems we have this code in different places
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Different places? The bc client is adapting the event, before pushing to the new pipeline. Maybe in some unit tests you mean?
return len(p), nil | ||
} | ||
|
||
func withLogOutput(fn func(*testing.T)) func(*testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
own logger?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
originally I have had my own logger interface, to correctly capture log output within a test context (so log output and test name do make some more sense if something goes wrong). But when adopting pipeline to libbeat I opted for using logp. This enforced the introduction of withLogOutput, capturing stderr into t.Log
, so I can still make sense of error logs mixed with pipeline processing logs.
package includes | ||
|
||
import ( | ||
// load supported output plugins |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could generate this in the future ... but as we are not going to add outputs ...
@@ -239,7 +239,9 @@ def read_output_json(self, output_file=None): | |||
# hit EOF | |||
break | |||
|
|||
jsons.append(json.loads(line)) | |||
event = json.loads(line) | |||
del event['@metadata'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
id @metadata in the output event?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
I'm removing @metadata
from events. All outputs but Elasticsearch print the events in JSON-format, as expected by logstash. including @metadata
. So our beats->LS->ES configs are valid, even if kafka or redis is used. With @metadata
being somewhat private, we never documented them. That is, I have to remove the fields, as a number of system tests do check all fields in an event are documented.
3da60a6
to
7e6cbc8
Compare
@urso |
@tsg looking into it. Interestingly test is completely unrelated to any changes in this PR + passes for me. Did restart travis job. |
7e6cbc8
to
80f5369
Compare
@tsg did fix test, let's wait for travis. |
fa87690
to
76176d7
Compare
This change marks the beginning of the libbeat event publisher pipeline refactoring. - central to the publisher pipeline is the broker: - broker implementation can be configured when constructing the pipeline - common broker implementation tests in `brokertest` package - broker features: - Fully in control of all published events. In comparison to old publisher pipeline with many batches in flight, the broker now configures/controls the total number of events stored in the publisher pipeline. Only after ACKs from outputs, will new space become available. - broker returns ACKS in correct order to publisher - broker batches up multiple ACKs - producer can only send one event at a time to the broker (push) - consumer can only receive batches of events from broker (pull) - producer can cancel(remove) active events not yet pulled by a consumer - broker/output related interfaces defined in `publisher` package - pipeline/client interfaces for use by beats currently defined in `publisher/beat` package - event structure has been changed to be more compatible with Logstash (See beat.Event): Beats can send metadata to libbeat outputs (e.g. pipeline) and logstash by using the `Event.Meta` field. Event fields will be stored on `Event.Fields`. Event fields are normalized (for use with processors) and serialized using. - The old publishers publish API is moved to libbeat/publisher/bc/publisher for now: - move to new sub-package to fight of circular imports - package implements old pipeline API on top of new pipeline - Filters/Processors are still executed before pushing events to the new pipeline - New API: - beats client requirements are configured via `beat.ClientConfig`: - register async ACK callbacks (currently callbacks will not be triggered after `Client.Close`) - configurable sending guarantees (must match ACK support) - "wait on close", for beats clients to wait for pending events to be ACKed (only if ACK is configured) - pipeline also supports "wait on close", waiting for pending events (independent of ACK configurations). Can be used by any beat, to wait on shutdown for published events to be actually send Event Structure: ---------------- The event structure has been changed a little: ``` type Event struct { Timestamp time.Time Meta common.MapStr Fields common.MapStr } ``` - We always require the timestamps. - Meta contains additional meta data (hints?) a beat can forward to the outputs. For example `pipeline` or `index` settings for the Elasticsearch output. - If output is not Elasticsearch, a `@metadata` field will always be written to the json document. This way Logstash can take advantage of all `@metadata`, even if the event has been send via kafka or redis. The new output plugin factory is defined as: ``` type Factory func(beat common.BeatInfo, cfg *common.Config) (Group, error) ``` The package libbeat/output/mode is being removed + all it's functionality is moved into a single implementation in the publisher pipeline supporting sync/async clients with failover and load-balancing. In the future dynamic output discovery might be added as well. This change requires output.Group to return some common settings for an active output, to configure the pipeline: ``` // Group configures and combines multiple clients into load-balanced group of // clients being managed by the publisher pipeline. type Group struct { Clients []Client BatchSize int Retry int } ``` Moving functionality from the outputs to the publisher pipeline restricts beats from having one output type configured only. All client instances configured will participate in load-balancing being driven by the publisher pipeline. This removes some intermediate workers used for forwarding batches. Future changes to groups include: outputs always operate in batches by implement only the `Publish` method: ``` // Publish sends events to the clients sink. A client must synchronously or // asynchronously ACK the given batch, once all events have been processed. // Using Retry/Cancelled a client can return a batch of unprocessed events to // the publisher pipeline. The publisher pipeline (if configured by the output // factory) will take care of retrying/dropping events. Publish(publisher.Batch) error ``` With: ``` // Batch is used to pass a batch of events to the outputs and asynchronously listening // for signals from these outpts. After a batch is processed (completed or // errors), one of the signal methods must be called. type Batch interface { Events() []Event // signals ACK() Drop() Retry() RetryEvents(events []Event) Cancelled() CancelledEvents(events []Event) } ``` The batch interface combines `events + signaling` into one common interface. The main difference between sync/async clients is, when `batch.ACK` is called. Batches/Events can be processed out of order. The publisher pipelining doing the batching and load-balancing guarantees ACKs being returned to the beat in order + implements upper bound. Once publisher pipeline is 'full', it will block, waiting for ACKs from outputs. The logic for dropping events on retry and guaranteed sending is moved to the publisher pipeline as well. Outputs are concerned with publishing and signaling ACK or Retry only.
Write the packetbeat log output to the test output, in case of packetbeat exit code does not match the expected exit code
79d35f0
to
f9482d7
Compare
"Producer cancel" is a feature that allows closing queue producers to also cancel any pending events created by that producer that have not yet been sent to a queue reader. It was introduced as a small part of a [very large refactor](#4492) in 2017, but current code doesn't depend on it for anything. Since this feature adds considerable complexity to the queue API and implementation, this PR removes the feature and associated helpers. This PR should cause no user-visible behavior change.
This PR marks the beginning of the libbeat event publisher pipeline refactoring.
brokertest
packagepublisher
packagepublisher/beat
packageEvent.Meta
field. Event fields will be stored onEvent.Fields
. Event fields are normalized (for use with processors) and serialized using.beat.ClientConfig
:Client.Close
)Event Structure:
The event structure has been changed a little:
pipeline
orindex
settings for the Elasticsearch output.@metadata
field will always be written to the json document. This way Logstash can take advantage of all@metadata
, even if the event has been send via kafka or redis.Output changes
The new output plugin factory is defined as:
The package libbeat/output/mode is being removed + all it's functionality is moved into a single implementation in the publisher pipeline supporting sync/async clients with failover and load-balancing. In the future dynamic output discovery might be added as well. This change requires output.Group to return some common settings for an active output, to configure the pipeline:
Moving functionality from the outputs to the publisher pipeline restricts beats from having one output type configured only.
All client instances configured will participate in load-balancing being driven by the publisher pipeline. This removes some intermediate workers used for forwarding batches. Future changes to groups include:
outputs always operate in batches by implement only the
Publish
method:With:
The batch interface combines
events + signaling
into one common interface.The main difference between sync/async clients is, when
batch.ACK
is called. Batches/Events can be processed out of order. The publisher pipelining doing the batching and load-balancing guarantees ACKs being returned to the beat in order + implements upper bound. Once publisher pipeline is 'full', it will block, waiting for ACKs from outputs.The logic for dropping events on retry and guaranteed sending is moved to the publisher pipeline as well. Outputs are concerned with publishing and signaling ACK or Retry only.
Upcoming
queue_size
andbulk_queue_size
in favor of configurable broker)beat.Event
instead ofcommon.MapStr
@metadata
accessible from processors?bc/publisher
packagebc/publisher
packageoutputs.Group
, so codecs can be applied earlier in pipeline (=> limit events per batch/queue by memory usage)