From e46de91d55f1315feb04f26a65a4bcd0c5bfc562 Mon Sep 17 00:00:00 2001 From: Rick Boyd Date: Tue, 13 Sep 2022 13:45:20 -0400 Subject: [PATCH 1/7] fix deleting big indices --- esrally/driver/runner.py | 12 ++++-------- tests/driver/runner_test.py | 4 ++-- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index c20b8419c..3fee0ce3d 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -1363,18 +1363,14 @@ async def __call__(self, es, params): ops = 0 indices = mandatory(params, "indices", self) - only_if_exists = params.get("only-if-exists", False) request_params = params.get("request-params", {}) + ignore_unavailable = not params.get("only-if-exists", False) + request_params["ignore_unavailable"] = str(ignore_unavailable).lower() prior_destructive_setting = await set_destructive_requires_name(es, False) try: for index_name in indices: - if not only_if_exists: - await es.indices.delete(index=index_name, params=request_params) - ops += 1 - elif only_if_exists and await es.indices.exists(index=index_name): - self.logger.info("Index [%s] already exists. Deleting it.", index_name) - await es.indices.delete(index=index_name, params=request_params) - ops += 1 + await es.indices.delete(index=index_name, params=request_params) + ops += 1 finally: await set_destructive_requires_name(es, prior_destructive_setting) return { diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 0e251f922..4b775114d 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -2723,7 +2723,7 @@ async def test_deletes_existing_indices(self, es): result = await r(es, params) assert result == { - "weight": 1, + "weight": 2, "unit": "ops", "success": True, } @@ -2734,7 +2734,7 @@ async def test_deletes_existing_indices(self, es): mock.call(body={"transient": {"action.destructive_requires_name": True}}), ] ) - es.indices.delete.assert_awaited_once_with(index="indexB", params={}) + es.indices.delete.assert_awaited_with(index="indexB", params={'ignore_unavailable': 'false'}) @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio From dd1567d179b6df6e349cb9b9c829ea0d9e2ab0f3 Mon Sep 17 00:00:00 2001 From: Rick Boyd Date: Tue, 13 Sep 2022 13:48:12 -0400 Subject: [PATCH 2/7] docs and formatting --- docs/track.rst | 5 +---- esrally/driver/runner.py | 10 ++-------- tests/driver/runner_test.py | 2 +- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/docs/track.rst b/docs/track.rst index 2bb1d7e64..c85629bb1 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -1631,13 +1631,11 @@ Properties If you want it to delete all data streams that have been declared in the ``data-streams`` section, you can specify the following properties: -* ``only-if-exists`` (optional, defaults to ``true``): Defines whether a data stream should only be deleted if it exists. * ``request-params`` (optional): A structure containing any request parameters that are allowed by the delete index API. Rally will not attempt to serialize the parameters and pass them as is. Always use "true" / "false" strings for boolean parameters (see example below). If you want it to delete one specific data stream (pattern) instead, you can specify the following properties: * ``data-stream`` (mandatory): One or more names of the data streams that should be deleted. If only one data stream should be deleted, you can use a string otherwise this needs to be a list of strings. -* ``only-if-exists`` (optional, defaults to ``true``): Defines whether a data stream should only be deleted if it exists. * ``request-params`` (optional): A structure containing any request parameters that are allowed by the delete data stream API. Rally will not attempt to serialize the parameters and pass them as is. Always use "true" / "false" strings for boolean parameters (see example below). **Examples** @@ -1654,8 +1652,7 @@ With the following snippet we will delete all ``ds-logs-*`` data streams:: { "name": "delete-data-streams", "operation-type": "delete-data-stream", - "data-stream": "ds-logs-*", - "only-if-exists": false + "data-stream": "ds-logs-*" } This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``. diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 3fee0ce3d..30bd00e94 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -1392,17 +1392,11 @@ async def __call__(self, es, params): ops = 0 data_streams = mandatory(params, "data-streams", self) - only_if_exists = mandatory(params, "only-if-exists", self) request_params = mandatory(params, "request-params", self) for data_stream in data_streams: - if not only_if_exists: - await es.indices.delete_data_stream(name=data_stream, ignore=[404], params=request_params) - ops += 1 - elif only_if_exists and await es.indices.exists(index=data_stream): - self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) - await es.indices.delete_data_stream(name=data_stream, params=request_params) - ops += 1 + await es.indices.delete_data_stream(name=data_stream, ignore=[404], params=request_params) + ops += 1 return { "weight": ops, diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 4b775114d..52d2afceb 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -2734,7 +2734,7 @@ async def test_deletes_existing_indices(self, es): mock.call(body={"transient": {"action.destructive_requires_name": True}}), ] ) - es.indices.delete.assert_awaited_with(index="indexB", params={'ignore_unavailable': 'false'}) + es.indices.delete.assert_awaited_with(index="indexB", params={"ignore_unavailable": "false"}) @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio From 8775206d0cfefe3931800489042416b73ea2e925 Mon Sep 17 00:00:00 2001 From: Rick Boyd Date: Tue, 13 Sep 2022 14:30:20 -0400 Subject: [PATCH 3/7] test fix --- tests/driver/runner_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 52d2afceb..3e6723834 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -2787,12 +2787,12 @@ async def test_deletes_existing_data_streams(self, es): result = await r(es, params) assert result == { - "weight": 1, + "weight": 2, "unit": "ops", "success": True, } - es.indices.delete_data_stream.assert_awaited_once_with(name="data-stream-B", params={}) + es.indices.delete_data_stream.assert_awaited_with(name="data-stream-B", ignore=[404], params={}) @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio From b664ca9e82606cfcafffbfa87ba260e5931f5bdc Mon Sep 17 00:00:00 2001 From: Rick Boyd Date: Tue, 13 Sep 2022 15:38:00 -0400 Subject: [PATCH 4/7] pivot to using .get() --- docs/track.rst | 5 ++++- esrally/driver/runner.py | 26 ++++++++++++++++++++------ tests/driver/runner_test.py | 12 ++++++------ 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/docs/track.rst b/docs/track.rst index c85629bb1..2bb1d7e64 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -1631,11 +1631,13 @@ Properties If you want it to delete all data streams that have been declared in the ``data-streams`` section, you can specify the following properties: +* ``only-if-exists`` (optional, defaults to ``true``): Defines whether a data stream should only be deleted if it exists. * ``request-params`` (optional): A structure containing any request parameters that are allowed by the delete index API. Rally will not attempt to serialize the parameters and pass them as is. Always use "true" / "false" strings for boolean parameters (see example below). If you want it to delete one specific data stream (pattern) instead, you can specify the following properties: * ``data-stream`` (mandatory): One or more names of the data streams that should be deleted. If only one data stream should be deleted, you can use a string otherwise this needs to be a list of strings. +* ``only-if-exists`` (optional, defaults to ``true``): Defines whether a data stream should only be deleted if it exists. * ``request-params`` (optional): A structure containing any request parameters that are allowed by the delete data stream API. Rally will not attempt to serialize the parameters and pass them as is. Always use "true" / "false" strings for boolean parameters (see example below). **Examples** @@ -1652,7 +1654,8 @@ With the following snippet we will delete all ``ds-logs-*`` data streams:: { "name": "delete-data-streams", "operation-type": "delete-data-stream", - "data-stream": "ds-logs-*" + "data-stream": "ds-logs-*", + "only-if-exists": false } This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``. diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 30bd00e94..f77e53a39 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -1363,14 +1363,20 @@ async def __call__(self, es, params): ops = 0 indices = mandatory(params, "indices", self) + only_if_exists = params.get("only-if-exists", False) request_params = params.get("request-params", {}) - ignore_unavailable = not params.get("only-if-exists", False) - request_params["ignore_unavailable"] = str(ignore_unavailable).lower() prior_destructive_setting = await set_destructive_requires_name(es, False) try: for index_name in indices: - await es.indices.delete(index=index_name, params=request_params) - ops += 1 + if not only_if_exists: + await es.indices.delete(index=index_name, params=request_params) + ops += 1 + elif only_if_exists: + get_response = await es.indices.get(index=index_name, ignore=[404]) + if not get_response.get("status") == 404: + self.logger.info("Index [%s] already exists. Deleting it.", index_name) + await es.indices.delete(index=index_name, params=request_params) + ops += 1 finally: await set_destructive_requires_name(es, prior_destructive_setting) return { @@ -1392,11 +1398,19 @@ async def __call__(self, es, params): ops = 0 data_streams = mandatory(params, "data-streams", self) + only_if_exists = mandatory(params, "only-if-exists", self) request_params = mandatory(params, "request-params", self) for data_stream in data_streams: - await es.indices.delete_data_stream(name=data_stream, ignore=[404], params=request_params) - ops += 1 + if not only_if_exists: + await es.indices.delete_data_stream(name=data_stream, ignore=[404], params=request_params) + ops += 1 + elif only_if_exists: + get_response = await es.indices.get(index=data_stream) + if not get_response.get("status") == 404: + self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) + await es.indices.delete_data_stream(name=data_stream, params=request_params) + ops += 1 return { "weight": ops, diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 3e6723834..e056feeb3 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -2712,7 +2712,7 @@ class TestDeleteIndexRunner: @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_existing_indices(self, es): - es.indices.exists = mock.AsyncMock(side_effect=[False, True]) + es.indices.get = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.indices.delete = mock.AsyncMock() es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) es.cluster.put_settings = mock.AsyncMock() @@ -2723,7 +2723,7 @@ async def test_deletes_existing_indices(self, es): result = await r(es, params) assert result == { - "weight": 2, + "weight": 1, "unit": "ops", "success": True, } @@ -2734,7 +2734,7 @@ async def test_deletes_existing_indices(self, es): mock.call(body={"transient": {"action.destructive_requires_name": True}}), ] ) - es.indices.delete.assert_awaited_with(index="indexB", params={"ignore_unavailable": "false"}) + es.indices.delete.assert_awaited_once_with(index="indexB", params={}) @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio @@ -2777,7 +2777,7 @@ class TestDeleteDataStreamRunner: @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_existing_data_streams(self, es): - es.indices.exists = mock.AsyncMock(side_effect=[False, True]) + es.indices.get = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.indices.delete_data_stream = mock.AsyncMock() r = runner.DeleteDataStream() @@ -2787,12 +2787,12 @@ async def test_deletes_existing_data_streams(self, es): result = await r(es, params) assert result == { - "weight": 2, + "weight": 1, "unit": "ops", "success": True, } - es.indices.delete_data_stream.assert_awaited_with(name="data-stream-B", ignore=[404], params={}) + es.indices.delete_data_stream.assert_awaited_once_with(name="data-stream-B", params={}) @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio From cb9936ac3a98c47d2ee911961c6bf67d14bada59 Mon Sep 17 00:00:00 2001 From: Rick Boyd Date: Wed, 14 Sep 2022 11:02:37 -0400 Subject: [PATCH 5/7] allow us to handle missing data streams --- esrally/driver/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index f77e53a39..da55b88b8 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -1406,7 +1406,7 @@ async def __call__(self, es, params): await es.indices.delete_data_stream(name=data_stream, ignore=[404], params=request_params) ops += 1 elif only_if_exists: - get_response = await es.indices.get(index=data_stream) + get_response = await es.indices.get(index=data_stream, ignore=[404]) if not get_response.get("status") == 404: self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) await es.indices.delete_data_stream(name=data_stream, params=request_params) From 8f0acadc752b2e77c2f043395b176f29ccebe435 Mon Sep 17 00:00:00 2001 From: Rick Boyd Date: Wed, 14 Sep 2022 13:34:51 -0400 Subject: [PATCH 6/7] fix up more operations and leave a comment --- esrally/driver/runner.py | 45 +++++++++++++++++++++++++++---------- tests/driver/runner_test.py | 9 +++----- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index da55b88b8..58576d9f1 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -1372,6 +1372,9 @@ async def __call__(self, es, params): await es.indices.delete(index=index_name, params=request_params) ops += 1 elif only_if_exists: + # here we use .get() and check for 404 instead of exists due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 get_response = await es.indices.get(index=index_name, ignore=[404]) if not get_response.get("status") == 404: self.logger.info("Index [%s] already exists. Deleting it.", index_name) @@ -1406,6 +1409,9 @@ async def __call__(self, es, params): await es.indices.delete_data_stream(name=data_stream, ignore=[404], params=request_params) ops += 1 elif only_if_exists: + # here we use .get() and check for 404 instead of exists due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 get_response = await es.indices.get(index=data_stream, ignore=[404]) if not get_response.get("status") == 404: self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) @@ -1459,10 +1465,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.cluster.delete_component_template(name=template_name, params=request_params, ignore=[404]) ops_count += 1 - elif only_if_exists and await es.cluster.exists_component_template(name=template_name): - self.logger.info("Component Index template [%s] already exists. Deleting it.", template_name) - await es.cluster.delete_component_template(name=template_name, params=request_params) - ops_count += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists_component_template due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + component_template_exists = await es.cluster.get_component_template(name=template_name, ignore=[404]) + if not component_template_exists.get("status") == 404: + self.logger.info("Component Index template [%s] already exists. Deleting it.", template_name) + await es.cluster.delete_component_template(name=template_name, params=request_params) + ops_count += 1 return { "weight": ops_count, "unit": "ops", @@ -1509,10 +1520,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_index_template(name=template_name, params=request_params, ignore=[404]) ops_count += 1 - elif only_if_exists and await es.indices.exists_index_template(name=template_name): - self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name) - await es.indices.delete_index_template(name=template_name, params=request_params) - ops_count += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists_index_template due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + index_template_exists = await es.indices.get_index_template(name=template_name, ignore=[404]) + if not index_template_exists.get("status") == 404: + self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name) + await es.indices.delete_index_template(name=template_name, params=request_params) + ops_count += 1 # ensure that we do not provide an empty index pattern by accident if delete_matching_indices and index_pattern: await es.indices.delete(index=index_pattern) @@ -1564,10 +1580,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_template(name=template_name, params=request_params) ops_count += 1 - elif only_if_exists and await es.indices.exists_template(name=template_name): - self.logger.info("Index template [%s] already exists. Deleting it.", template_name) - await es.indices.delete_template(name=template_name, params=request_params) - ops_count += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists_template due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + template_exists = await es.indices.get_template(name=template_name, ignore=[404]) + if not template_exists.get("status") == 404: + self.logger.info("Index template [%s] already exists. Deleting it.", template_name) + await es.indices.delete_template(name=template_name, params=request_params) + ops_count += 1 # ensure that we do not provide an empty index pattern by accident if delete_matching_indices and index_pattern: await es.indices.delete(index=index_pattern) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index e056feeb3..82660d72f 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -2907,7 +2907,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - es.indices.exists_template = mock.AsyncMock(side_effect=[False, True]) + es.indices.get_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.indices.delete_template = mock.AsyncMock() es.indices.delete = mock.AsyncMock() @@ -3038,10 +3038,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - async def _side_effect(name): - return name == "templateB" - - es.cluster.exists_component_template = mock.AsyncMock(side_effect=_side_effect) + es.cluster.get_component_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.cluster.delete_component_template = mock.AsyncMock() r = runner.DeleteComponentTemplate() @@ -3201,7 +3198,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - es.indices.exists_index_template = mock.AsyncMock(side_effect=[False, True]) + es.indices.get_index_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.indices.delete_index_template = mock.AsyncMock() r = runner.DeleteComposableTemplate() From e906071805cbcac04a7c15fd0fe32af74f2ac124 Mon Sep 17 00:00:00 2001 From: Rick Boyd Date: Wed, 14 Sep 2022 15:33:55 -0400 Subject: [PATCH 7/7] properly handle legacy index template api --- esrally/driver/runner.py | 16 +++++++--------- tests/driver/runner_test.py | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 58576d9f1..25b22b3ba 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -1580,15 +1580,13 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_template(name=template_name, params=request_params) ops_count += 1 - elif only_if_exists: - # here we use .get() and check for 404 instead of exists_template due to a bug in some versions - # of elasticsearch-py/elastic-transport with HEAD calls. - # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 - template_exists = await es.indices.get_template(name=template_name, ignore=[404]) - if not template_exists.get("status") == 404: - self.logger.info("Index template [%s] already exists. Deleting it.", template_name) - await es.indices.delete_template(name=template_name, params=request_params) - ops_count += 1 + # here we use .get_template() and check for empty instead of exists_template due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + elif only_if_exists and await es.indices.get_template(name=template_name, ignore=[404]): + self.logger.info("Index template [%s] already exists. Deleting it.", template_name) + await es.indices.delete_template(name=template_name, params=request_params) + ops_count += 1 # ensure that we do not provide an empty index pattern by accident if delete_matching_indices and index_pattern: await es.indices.delete(index=index_pattern) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 82660d72f..a99b5e7ee 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -2907,7 +2907,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - es.indices.get_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) + es.indices.get_template = mock.AsyncMock(side_effect=[False, True]) es.indices.delete_template = mock.AsyncMock() es.indices.delete = mock.AsyncMock()