diff --git a/filebeat/tests/system/test_pipeline.py b/filebeat/tests/system/test_pipeline.py
index 7b5b6c381bd..3ca8ba2fdc8 100644
--- a/filebeat/tests/system/test_pipeline.py
+++ b/filebeat/tests/system/test_pipeline.py
@@ -17,6 +17,12 @@ def init(self):
         logging.getLogger("urllib3").setLevel(logging.WARNING)
         logging.getLogger("elasticsearch").setLevel(logging.ERROR)
 
+        self.existing_pipelines = {}
+        try:
+            self.existing_pipelines = self.es.transport.perform_request("GET", "/_ingest/pipeline/*")
+        except:
+            pass
+
         self.modules_path = os.path.abspath(self.working_dir +
                                             "/../../../../module")
 
@@ -45,14 +51,6 @@ def test_input_pipeline_config(self):
             pass
         self.wait_until(lambda: not self.es.indices.exists(index_name))
 
-        body = {
-            "transient": {
-                "script.max_compilations_rate": "100/1m"
-            }
-        }
-
-        self.es.transport.perform_request('PUT', "/_cluster/settings", body=body)
-
         self.render_config_template(
             path=os.path.abspath(self.working_dir) + "/log/*",
             elasticsearch=dict(
@@ -70,34 +68,42 @@ def test_input_pipeline_config(self):
         with open(testfile, 'a') as file:
             file.write("Hello World1\n")
 
-        # put pipeline
-        self.es.transport.perform_request("PUT", "/_ingest/pipeline/test",
-                                          body={
-                                              "processors": [{
-                                                  "set": {
-                                                      "field": "x-pipeline",
-                                                      "value": "test-pipeline",
-                                                  }
-                                              }]})
-
-        filebeat = self.start_beat()
-
-        # Wait until the event is in ES
-        self.wait_until(lambda: self.es.indices.exists(index_name))
-
-        def search_objects():
-            try:
-                self.es.indices.refresh(index=index_name)
-                res = self.es.search(index=index_name,
-                                     body={"query": {"match_all": {}}})
-                return [o["_source"] for o in res["hits"]["hits"]]
-            except:
-                return []
-
-        self.wait_until(lambda: len(search_objects()) > 0, max_timeout=20)
-        filebeat.check_kill_and_wait()
-
-        objects = search_objects()
-        assert len(objects) == 1
-        o = objects[0]
-        assert o["x-pipeline"] == "test-pipeline"
+        # clean up all pipelines before installing our own
+        self.es.transport.perform_request("DELETE", "/_ingest/pipeline/*")
+
+        try:
+            # put pipeline
+            self.es.transport.perform_request("PUT", "/_ingest/pipeline/test",
+                                              body={
+                                                  "processors": [{
+                                                      "set": {
+                                                          "field": "x-pipeline",
+                                                          "value": "test-pipeline",
+                                                      }
+                                                  }]})
+
+            filebeat = self.start_beat()
+
+            # Wait until the event is in ES
+            self.wait_until(lambda: self.es.indices.exists(index_name))
+
+            def search_objects():
+                try:
+                    self.es.indices.refresh(index=index_name)
+                    res = self.es.search(index=index_name,
+                                         body={"query": {"match_all": {}}})
+                    return [o["_source"] for o in res["hits"]["hits"]]
+                except:
+                    return []
+
+            self.wait_until(lambda: len(search_objects()) > 0, max_timeout=20)
+            filebeat.check_kill_and_wait()
+
+            objects = search_objects()
+            assert len(objects) == 1
+            o = objects[0]
+            assert o["x-pipeline"] == "test-pipeline"
+        finally:
+            for pipeline_id, pipeline in list(self.existing_pipelines.items()):
+                print(pipeline_id, pipeline)
+                self.es.transport.perform_request("PUT", "/_ingest/pipeline/"+pipeline_id, body=pipeline)
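
The hunks above drop the old workaround of bumping script.max_compilations_rate via a transient cluster setting and instead apply a snapshot/restore pattern: init() saves every existing ingest pipeline, the test deletes them all before installing its own, and a finally block puts the saved ones back. A minimal standalone sketch of that pattern, assuming the same elasticsearch-py transport API the test already uses; the clean_pipelines helper name is hypothetical and not part of this patch:

    from contextlib import contextmanager

    from elasticsearch import Elasticsearch


    @contextmanager
    def clean_pipelines(es):
        # Snapshot existing ingest pipelines: GET /_ingest/pipeline/*
        # returns a dict of pipeline id -> definition, and may 404 when
        # no pipelines exist, hence the fallback to an empty dict.
        try:
            existing = es.transport.perform_request("GET", "/_ingest/pipeline/*")
        except Exception:
            existing = {}

        # Start the wrapped code from a known-empty pipeline state.
        es.transport.perform_request("DELETE", "/_ingest/pipeline/*")
        try:
            yield
        finally:
            # Put every snapshotted pipeline back, even if the body raised.
            for pipeline_id, pipeline in existing.items():
                es.transport.perform_request(
                    "PUT", "/_ingest/pipeline/" + pipeline_id, body=pipeline)

Restoring in finally mirrors the patch's try/finally: the saved pipelines are reinstated whether the assertions pass or the test fails midway, so subsequent tests see the cluster as it was before.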