From f9666bae7c2f67ff921f594e75254ece3620c843 Mon Sep 17 00:00:00 2001
From: Tudor Golubenco
Date: Tue, 24 Jan 2017 10:50:45 +0100
Subject: [PATCH] Per prospector configurable pipeline (#3433)

This adds a new "pipeline" configuration option to the prospector, which
can be used to set the Elasticsearch Ingest Node pipeline from the
prospector config. While this was already possible by using format
strings in the `pipeline` option of the output, setting it per prospector
makes the configuration simpler in many cases, and the mechanism is
needed for the Filebeat modules.

Internally, the pipeline ID travels as per-event metadata: the harvester
copies it onto the event, the Filebeat publisher hands it to the libbeat
client via `publisher.MetadataBatch`, and `MakeContext` now returns the
collected metadata instead of dropping it.

Part of #3159.
---
 CHANGELOG.asciidoc                                 |  1 +
 filebeat/_meta/beat.full.yml                       |  4 ++
 .../configuration/filebeat-options.asciidoc        | 22 ++++++++
 filebeat/filebeat.full.yml                         |  4 ++
 filebeat/harvester/config.go                       |  1 +
 filebeat/harvester/log.go                          |  1 +
 filebeat/input/event.go                            | 14 +++++
 filebeat/publisher/async.go                        |  6 +-
 filebeat/publisher/publisher.go                    |  9 ++-
 filebeat/publisher/sync.go                         |  4 +-
 filebeat/tests/system/config/filebeat.yml.j2       | 12 +++-
 filebeat/tests/system/test_modules.py              | 64 ++++++++++++++++++++
 libbeat/publisher/client.go                        |  2 +-
 13 files changed, 135 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 84abfc0d25d..edad6ec64b4 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -93,6 +93,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
 
 *Filebeat*
 - Add enabled config option to prospectors. {pull}3157[3157]
 - Add target option for decoded_json_field. {pull}3169[3169]
+- Add the `pipeline` config option at the prospector level for configuring the Ingest Node pipeline ID. {pull}3433[3433]
 
 *Winlogbeat*
diff --git a/filebeat/_meta/beat.full.yml b/filebeat/_meta/beat.full.yml
index 7de2e652220..fc9c0f19741 100644
--- a/filebeat/_meta/beat.full.yml
+++ b/filebeat/_meta/beat.full.yml
@@ -143,6 +143,10 @@ filebeat.prospectors:
   # this can mean that the first entries of a new file are skipped.
   #tail_files: false
 
+  # The Ingest Node pipeline ID associated with this prospector. If this is set, it
+  # overrides the pipeline option from the Elasticsearch output.
+  #pipeline:
+
   # Experimental: If symlinks is enabled, symlinks are opened and harvested. The harvester is openening the
   # original for harvesting but will report the symlink name as source.
   #symlinks: false
diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc
index 91bebc207d9..4edcd59468a 100644
--- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc
+++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc
@@ -405,6 +405,28 @@ This option applies to files that Filebeat has not already processed. If you ran
 
 NOTE: You can use this setting to avoid indexing old log lines when you run Filebeat on a set of log files for the first time. After the first run, we recommend disabling this option, or you risk losing lines during file rotation.
 
+===== pipeline
+
+The Ingest Node pipeline ID to set for the events generated by this prospector.
+
+NOTE: The pipeline ID can also be configured in the Elasticsearch output, but setting
+it at the prospector level usually results in simpler configuration files. If the
+pipeline is configured both in the prospector and in the output, the option from the
+prospector is the one used.
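+
+For example, the following prospector sends all of its events through a hypothetical
+pipeline named `nginx-access-pipeline` (the pipeline ID and the log path here are
+only illustrative):
+
+[source,yaml]
+-------------------------------------------------------------------------------------
+filebeat.prospectors:
+- input_type: log
+  paths:
+    - /var/log/nginx/access.log
+  pipeline: nginx-access-pipeline
+-------------------------------------------------------------------------------------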
+
 ===== symlinks
 
 experimental[]
diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml
index 4fec95ecadf..bd2e0ce5e57 100644
--- a/filebeat/filebeat.full.yml
+++ b/filebeat/filebeat.full.yml
@@ -143,6 +143,10 @@ filebeat.prospectors:
   # this can mean that the first entries of a new file are skipped.
   #tail_files: false
 
+  # The Ingest Node pipeline ID associated with this prospector. If this is set, it
+  # overrides the pipeline option from the Elasticsearch output.
+  #pipeline:
+
   # Experimental: If symlinks is enabled, symlinks are opened and harvested. The harvester is openening the
   # original for harvesting but will report the symlink name as source.
   #symlinks: false
diff --git a/filebeat/harvester/config.go b/filebeat/harvester/config.go
index e5fcded5292..d1baaf9a638 100644
--- a/filebeat/harvester/config.go
+++ b/filebeat/harvester/config.go
@@ -52,6 +52,7 @@ type harvesterConfig struct {
 	MaxBytes   int                     `config:"max_bytes" validate:"min=0,nonzero"`
 	Multiline  *reader.MultilineConfig `config:"multiline"`
 	JSON       *reader.JSONConfig      `config:"json"`
+	Pipeline   string                  `config:"pipeline"`
 }
 
 func (config *harvesterConfig) Validate() error {
diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go
index afbd0a6bd89..a47bf67809c 100644
--- a/filebeat/harvester/log.go
+++ b/filebeat/harvester/log.go
@@ -137,6 +137,7 @@ func (h *Harvester) Harvest(r reader.Reader) {
 		event.InputType = h.config.InputType
 		event.DocumentType = h.config.DocumentType
 		event.JSONConfig = h.config.JSON
+		event.Pipeline = h.config.Pipeline
 	}
 
 	// Always send event to update state, also if lines was skipped
diff --git a/filebeat/input/event.go b/filebeat/input/event.go
index 415de417842..3d9a8852600 100644
--- a/filebeat/input/event.go
+++ b/filebeat/input/event.go
@@ -20,6 +20,7 @@ type Event struct {
 	JSONConfig   *reader.JSONConfig
 	State        file.State
 	Data         common.MapStr // Use in readers to add data to the event
+	Pipeline     string
 }
 
 func NewEvent(state file.State) *Event {
@@ -58,6 +59,19 @@ func (e *Event) ToMapStr() common.MapStr {
 	return event
 }
 
+// Metadata creates a common.MapStr containing the metadata to
+// be associated with the event.
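+// For now the only metadata is the Ingest Node pipeline ID; returning nil when
+// no pipeline is set lets the Elasticsearch output apply its own pipeline option.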
+func (e *Event) Metadata() common.MapStr { + if e.Pipeline != "" { + return common.MapStr{ + "pipeline": e.Pipeline, + } + } + return nil +} + // HasData returns true if the event itself contains data // Events without data are only state updates func (e *Event) HasData() bool { diff --git a/filebeat/publisher/async.go b/filebeat/publisher/async.go index 93ed5e5ceb1..38ef9af77ba 100644 --- a/filebeat/publisher/async.go +++ b/filebeat/publisher/async.go @@ -84,10 +84,12 @@ func (p *asyncLogPublisher) Start() { flag: 0, events: events, } + dataEvents, meta := getDataEvents(events) p.client.PublishEvents( - getDataEvents(events), + dataEvents, publisher.Signal(batch), - publisher.Guaranteed) + publisher.Guaranteed, + publisher.MetadataBatch(meta)) p.active.append(batch) case <-ticker.C: diff --git a/filebeat/publisher/publisher.go b/filebeat/publisher/publisher.go index 07bbf5fefb9..914208b9679 100644 --- a/filebeat/publisher/publisher.go +++ b/filebeat/publisher/publisher.go @@ -45,12 +45,15 @@ var ( ) // getDataEvents returns all events which contain data (not only state updates) -func getDataEvents(events []*input.Event) []common.MapStr { - dataEvents := make([]common.MapStr, 0, len(events)) +// together with their associated metadata +func getDataEvents(events []*input.Event) (dataEvents []common.MapStr, meta []common.MapStr) { + dataEvents = make([]common.MapStr, 0, len(events)) + meta = make([]common.MapStr, 0, len(events)) for _, event := range events { if event.HasData() { dataEvents = append(dataEvents, event.ToMapStr()) + meta = append(meta, event.Metadata()) } } - return dataEvents + return dataEvents, meta } diff --git a/filebeat/publisher/sync.go b/filebeat/publisher/sync.go index 9aba6f925c1..5b4885959dc 100644 --- a/filebeat/publisher/sync.go +++ b/filebeat/publisher/sync.go @@ -58,7 +58,9 @@ func (p *syncLogPublisher) Publish() error { case events = <-p.in: } - ok := p.client.PublishEvents(getDataEvents(events), publisher.Sync, publisher.Guaranteed) + dataEvents, meta := getDataEvents(events) + ok := p.client.PublishEvents(dataEvents, publisher.Sync, publisher.Guaranteed, + publisher.MetadataBatch(meta)) if !ok { // PublishEvents will only returns false, if p.client has been closed. return sigPublisherStop diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 5b34221f828..63639b3209d 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -28,6 +28,7 @@ filebeat.prospectors: clean_removed: {{clean_removed}} harvester_limit: {{harvester_limit | default(0) }} symlinks: {{symlinks}} + pipeline: {{pipeline}} {% if fields %} fields: @@ -138,11 +139,18 @@ processors: # Configure what outputs to use when sending the data collected by the beat. # Multiple outputs may be used. 
-#------------------------------- File output ----------------------------------
-{%- if logstash %}
+{%- if elasticsearch %}
+#------------------------------- Elasticsearch output ----------------------------
+output.elasticsearch:
+  hosts: ["{{ elasticsearch.host }}"]
+  pipeline: {{elasticsearch.pipeline}}
+  index: {{elasticsearch.index}}
+{%- elif logstash %}
+#------------------------------- Logstash output ---------------------------------
 output.logstash:
   hosts: ["{{ logstash.host }}"]
 {%- else %}
+#------------------------------- File output ----------------------------------
 output.file:
   path: {{ output_file_path|default(beat.working_dir + "/output") }}
   filename: "{{ output_file_filename|default("filebeat") }}"
diff --git a/filebeat/tests/system/test_modules.py b/filebeat/tests/system/test_modules.py
index 845b6dff02c..2f439a6418a 100644
--- a/filebeat/tests/system/test_modules.py
+++ b/filebeat/tests/system/test_modules.py
@@ -110,3 +110,67 @@ def run_on_file(self, module, fileset, test_file, cfgfile):
         if not found:
             raise Exception("The following expected object was" +
                             " not found: {}".format(obj))
+
+    @unittest.skipIf(not INTEGRATION_TESTS or
+                     os.getenv("TESTING_ENVIRONMENT") == "2x",
+                     "integration test not available on 2.x")
+    def test_prospector_pipeline_config(self):
+        """
+        Tests that the pipeline configured in the prospector overrides
+        the one from the output.
+        """
+        self.init()
+        index_name = "filebeat-test-prospector"
+        try:
+            self.es.indices.delete(index=index_name)
+        except Exception:
+            pass
+        self.wait_until(lambda: not self.es.indices.exists(index_name))
+
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/log/*",
+            elasticsearch=dict(
+                host=self.elasticsearch_url,
+                pipeline="estest",
+                index=index_name),
+            pipeline="test",
+        )
+
+        os.mkdir(self.working_dir + "/log/")
+        testfile = self.working_dir + "/log/test.log"
+        with open(testfile, 'a') as f:
+            f.write("Hello World1\n")
+
+        # Create the "test" ingest pipeline that the prospector refers to; it
+        # marks every document it processes by setting the x-pipeline field.
+        self.es.transport.perform_request("PUT", "/_ingest/pipeline/test",
+                                          body={
+                                              "processors": [{
+                                                  "set": {
+                                                      "field": "x-pipeline",
+                                                      "value": "test-pipeline",
+                                                  }
+                                              }]})
+
+        filebeat = self.start_beat()
+
+        # Wait until the event is in ES
+        self.wait_until(lambda: self.es.indices.exists(index_name))
+
+        def search_objects():
+            try:
+                self.es.indices.refresh(index=index_name)
+                res = self.es.search(index=index_name,
+                                     body={"query": {"match_all": {}}})
+                return [o["_source"] for o in res["hits"]["hits"]]
+            except Exception:
+                return []
+
+        self.wait_until(lambda: len(search_objects()) > 0, max_timeout=20)
+        filebeat.check_kill_and_wait()
+
+        objects = search_objects()
+        assert len(objects) == 1
+        o = objects[0]
+        # The event went through the prospector's pipeline, not "estest".
+        assert o["x-pipeline"] == "test-pipeline"
diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go
index 246151a96a1..2001f74b790 100644
--- a/libbeat/publisher/client.go
+++ b/libbeat/publisher/client.go
@@ -238,5 +238,5 @@ func MakeContext(opts []ClientOption) ([]common.MapStr, Context) {
 			}
 		}
 	}
-	return nil, ctx
+	return meta, ctx
 }
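
For illustration, the end-to-end setup this change enables looks like the
following sketch. The host and log path are assumptions; the pipeline IDs
match the ones used by the system test above, and the prospector-level
setting takes precedence for this prospector's events:

    filebeat.prospectors:
    - input_type: log
      paths:
        - /var/log/test.log
      # Overrides output.elasticsearch.pipeline for these events.
      pipeline: test

    output.elasticsearch:
      hosts: ["localhost:9200"]
      # Applies only to prospectors that set no pipeline of their own.
      pipeline: estest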