diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 84abfc0d25d..edad6ec64b4 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -93,6 +93,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] *Filebeat* - Add enabled config option to prospectors. {pull}3157[3157] - Add target option for decoded_json_field. {pull}3169[3169] +- Add the `pipeline` config option at the prospector level, for configuring the Ingest Node pipeline ID. {pull}3433[3433] *Winlogbeat* diff --git a/filebeat/_meta/beat.full.yml b/filebeat/_meta/beat.full.yml index 7de2e652220..fc9c0f19741 100644 --- a/filebeat/_meta/beat.full.yml +++ b/filebeat/_meta/beat.full.yml @@ -143,6 +143,10 @@ filebeat.prospectors: # this can mean that the first entries of a new file are skipped. #tail_files: false + # The Ingest Node pipeline ID associated with this prospector. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + # Experimental: If symlinks is enabled, symlinks are opened and harvested. The harvester is openening the # original for harvesting but will report the symlink name as source. #symlinks: false diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc index 599697728f2..5bb509c49ff 100644 --- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc +++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc @@ -405,6 +405,14 @@ This option applies to files that Filebeat has not already processed. If you ran NOTE: You can use this setting to avoid indexing old log lines when you run Filebeat on a set of log files for the first time. After the first run, we recommend disabling this option, or you risk losing lines during file rotation. +===== pipeline + +The Ingest Node pipeline ID to set for the events generated by this prospector. + +NOTE: The pipeline ID can also be configured in the Elasticsearch output, but this + option usually results in simpler configuration files. If the pipeline is configured both + in the prospector and in the output, the option from the prospector is the one used. + ===== symlinks experimental[] diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 4fec95ecadf..bd2e0ce5e57 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -143,6 +143,10 @@ filebeat.prospectors: # this can mean that the first entries of a new file are skipped. #tail_files: false + # The Ingest Node pipeline ID associated with this prospector. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + # Experimental: If symlinks is enabled, symlinks are opened and harvested. The harvester is openening the # original for harvesting but will report the symlink name as source. #symlinks: false diff --git a/filebeat/harvester/config.go b/filebeat/harvester/config.go index e5fcded5292..d1baaf9a638 100644 --- a/filebeat/harvester/config.go +++ b/filebeat/harvester/config.go @@ -52,6 +52,7 @@ type harvesterConfig struct { MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` Multiline *reader.MultilineConfig `config:"multiline"` JSON *reader.JSONConfig `config:"json"` + Pipeline string `config:"pipeline"` } func (config *harvesterConfig) Validate() error { diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index afbd0a6bd89..a47bf67809c 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -137,6 +137,7 @@ func (h *Harvester) Harvest(r reader.Reader) { event.InputType = h.config.InputType event.DocumentType = h.config.DocumentType event.JSONConfig = h.config.JSON + event.Pipeline = h.config.Pipeline } // Always send event to update state, also if lines was skipped diff --git a/filebeat/input/event.go b/filebeat/input/event.go index 415de417842..3d9a8852600 100644 --- a/filebeat/input/event.go +++ b/filebeat/input/event.go @@ -20,6 +20,7 @@ type Event struct { JSONConfig *reader.JSONConfig State file.State Data common.MapStr // Use in readers to add data to the event + Pipeline string } func NewEvent(state file.State) *Event { @@ -58,6 +59,17 @@ func (e *Event) ToMapStr() common.MapStr { return event } +// Metadata creates a common.MapStr containing the metadata to +// be associated with the event. +func (e *Event) Metadata() common.MapStr { + if e.Pipeline != "" { + return common.MapStr{ + "pipeline": e.Pipeline, + } + } + return nil +} + // HasData returns true if the event itself contains data // Events without data are only state updates func (e *Event) HasData() bool { diff --git a/filebeat/publisher/async.go b/filebeat/publisher/async.go index 93ed5e5ceb1..38ef9af77ba 100644 --- a/filebeat/publisher/async.go +++ b/filebeat/publisher/async.go @@ -84,10 +84,12 @@ func (p *asyncLogPublisher) Start() { flag: 0, events: events, } + dataEvents, meta := getDataEvents(events) p.client.PublishEvents( - getDataEvents(events), + dataEvents, publisher.Signal(batch), - publisher.Guaranteed) + publisher.Guaranteed, + publisher.MetadataBatch(meta)) p.active.append(batch) case <-ticker.C: diff --git a/filebeat/publisher/publisher.go b/filebeat/publisher/publisher.go index 07bbf5fefb9..914208b9679 100644 --- a/filebeat/publisher/publisher.go +++ b/filebeat/publisher/publisher.go @@ -45,12 +45,15 @@ var ( ) // getDataEvents returns all events which contain data (not only state updates) -func getDataEvents(events []*input.Event) []common.MapStr { - dataEvents := make([]common.MapStr, 0, len(events)) +// together with their associated metadata +func getDataEvents(events []*input.Event) (dataEvents []common.MapStr, meta []common.MapStr) { + dataEvents = make([]common.MapStr, 0, len(events)) + meta = make([]common.MapStr, 0, len(events)) for _, event := range events { if event.HasData() { dataEvents = append(dataEvents, event.ToMapStr()) + meta = append(meta, event.Metadata()) } } - return dataEvents + return dataEvents, meta } diff --git a/filebeat/publisher/sync.go b/filebeat/publisher/sync.go index 9aba6f925c1..5b4885959dc 100644 --- a/filebeat/publisher/sync.go +++ b/filebeat/publisher/sync.go @@ -58,7 +58,9 @@ func (p *syncLogPublisher) Publish() error { case events = <-p.in: } - ok := p.client.PublishEvents(getDataEvents(events), publisher.Sync, publisher.Guaranteed) + dataEvents, meta := getDataEvents(events) + ok := p.client.PublishEvents(dataEvents, publisher.Sync, publisher.Guaranteed, + publisher.MetadataBatch(meta)) if !ok { // PublishEvents will only returns false, if p.client has been closed. return sigPublisherStop diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index c3cc5aa2220..6cbe7764192 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -28,6 +28,7 @@ filebeat.prospectors: clean_removed: {{clean_removed}} harvester_limit: {{harvester_limit | default(0) }} symlinks: {{symlinks}} + pipeline: {{pipeline}} {% if fields %} fields: @@ -132,11 +133,18 @@ processors: # Configure what outputs to use when sending the data collected by the beat. # Multiple outputs may be used. -#------------------------------- File output ---------------------------------- -{%- if logstash %} +{%- if elasticsearch %} +#------------------------------- Elasticsearch output ---------------------------- +output.elasticsearch: + hosts: ["{{ elasticsearch.host }}"] + pipeline: {{elasticsearch.pipeline}} + index: {{elasticsearch.index}} +{%- elif logstash %} +#------------------------------- Logstash output --------------------------------- output.logstash: hosts: ["{{ logstash.host }}"] {%- else %} +#------------------------------- File output ---------------------------------- output.file: path: {{ output_file_path|default(beat.working_dir + "/output") }} filename: "{{ output_file_filename|default("filebeat") }}" diff --git a/filebeat/tests/system/test_modules.py b/filebeat/tests/system/test_modules.py index 845b6dff02c..2f439a6418a 100644 --- a/filebeat/tests/system/test_modules.py +++ b/filebeat/tests/system/test_modules.py @@ -110,3 +110,65 @@ def run_on_file(self, module, fileset, test_file, cfgfile): if not found: raise Exception("The following expected object was" + " not found: {}".format(obj)) + + @unittest.skipIf(not INTEGRATION_TESTS or + os.getenv("TESTING_ENVIRONMENT") == "2x", + "integration test not available on 2.x") + def test_prospector_pipeline_config(self): + """ + Tests that the pipeline configured in the prospector overwrites + the one from the output. + """ + self.init() + index_name = "filebeat-test-prospector" + try: + self.es.indices.delete(index=index_name) + except: + pass + self.wait_until(lambda: not self.es.indices.exists(index_name)) + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + elasticsearch=dict( + host=self.elasticsearch_url, + pipeline="estest", + index=index_name), + pipeline="test", + ) + + os.mkdir(self.working_dir + "/log/") + testfile = self.working_dir + "/log/test.log" + with open(testfile, 'a') as file: + file.write("Hello World1\n") + + # put pipeline + self.es.transport.perform_request("PUT", "/_ingest/pipeline/test", + body={ + "processors": [{ + "set": { + "field": "x-pipeline", + "value": "test-pipeline", + } + }]}) + + filebeat = self.start_beat() + + # Wait until the event is in ES + self.wait_until(lambda: self.es.indices.exists(index_name)) + + def search_objects(): + try: + self.es.indices.refresh(index=index_name) + res = self.es.search(index=index_name, + body={"query": {"match_all": {}}}) + return [o["_source"] for o in res["hits"]["hits"]] + except: + return [] + + self.wait_until(lambda: len(search_objects()) > 0, max_timeout=20) + filebeat.check_kill_and_wait() + + objects = search_objects() + assert len(objects) == 1 + o = objects[0] + assert o["x-pipeline"] == "test-pipeline" diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go index 246151a96a1..2001f74b790 100644 --- a/libbeat/publisher/client.go +++ b/libbeat/publisher/client.go @@ -238,5 +238,5 @@ func MakeContext(opts []ClientOption) ([]common.MapStr, Context) { } } } - return nil, ctx + return meta, ctx }