diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index c20b8419c..25b22b3ba 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -1371,10 +1371,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete(index=index_name, params=request_params) ops += 1 - elif only_if_exists and await es.indices.exists(index=index_name): - self.logger.info("Index [%s] already exists. Deleting it.", index_name) - await es.indices.delete(index=index_name, params=request_params) - ops += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + get_response = await es.indices.get(index=index_name, ignore=[404]) + if not get_response.get("status") == 404: + self.logger.info("Index [%s] already exists. Deleting it.", index_name) + await es.indices.delete(index=index_name, params=request_params) + ops += 1 finally: await set_destructive_requires_name(es, prior_destructive_setting) return { @@ -1403,10 +1408,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_data_stream(name=data_stream, ignore=[404], params=request_params) ops += 1 - elif only_if_exists and await es.indices.exists(index=data_stream): - self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) - await es.indices.delete_data_stream(name=data_stream, params=request_params) - ops += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + get_response = await es.indices.get(index=data_stream, ignore=[404]) + if not get_response.get("status") == 404: + self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) + await es.indices.delete_data_stream(name=data_stream, params=request_params) + ops += 1 return { "weight": ops, @@ -1455,10 +1465,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.cluster.delete_component_template(name=template_name, params=request_params, ignore=[404]) ops_count += 1 - elif only_if_exists and await es.cluster.exists_component_template(name=template_name): - self.logger.info("Component Index template [%s] already exists. Deleting it.", template_name) - await es.cluster.delete_component_template(name=template_name, params=request_params) - ops_count += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists_component_template due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + component_template_exists = await es.cluster.get_component_template(name=template_name, ignore=[404]) + if not component_template_exists.get("status") == 404: + self.logger.info("Component Index template [%s] already exists. Deleting it.", template_name) + await es.cluster.delete_component_template(name=template_name, params=request_params) + ops_count += 1 return { "weight": ops_count, "unit": "ops", @@ -1505,10 +1520,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_index_template(name=template_name, params=request_params, ignore=[404]) ops_count += 1 - elif only_if_exists and await es.indices.exists_index_template(name=template_name): - self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name) - await es.indices.delete_index_template(name=template_name, params=request_params) - ops_count += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists_index_template due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + index_template_exists = await es.indices.get_index_template(name=template_name, ignore=[404]) + if not index_template_exists.get("status") == 404: + self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name) + await es.indices.delete_index_template(name=template_name, params=request_params) + ops_count += 1 # ensure that we do not provide an empty index pattern by accident if delete_matching_indices and index_pattern: await es.indices.delete(index=index_pattern) @@ -1560,7 +1580,10 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_template(name=template_name, params=request_params) ops_count += 1 - elif only_if_exists and await es.indices.exists_template(name=template_name): + # here we use .get_template() and check for empty instead of exists_template due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + elif only_if_exists and await es.indices.get_template(name=template_name, ignore=[404]): self.logger.info("Index template [%s] already exists. Deleting it.", template_name) await es.indices.delete_template(name=template_name, params=request_params) ops_count += 1 diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 0e251f922..a99b5e7ee 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -2712,7 +2712,7 @@ class TestDeleteIndexRunner: @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_existing_indices(self, es): - es.indices.exists = mock.AsyncMock(side_effect=[False, True]) + es.indices.get = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.indices.delete = mock.AsyncMock() es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) es.cluster.put_settings = mock.AsyncMock() @@ -2777,7 +2777,7 @@ class TestDeleteDataStreamRunner: @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_existing_data_streams(self, es): - es.indices.exists = mock.AsyncMock(side_effect=[False, True]) + es.indices.get = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.indices.delete_data_stream = mock.AsyncMock() r = runner.DeleteDataStream() @@ -2907,7 +2907,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - es.indices.exists_template = mock.AsyncMock(side_effect=[False, True]) + es.indices.get_template = mock.AsyncMock(side_effect=[False, True]) es.indices.delete_template = mock.AsyncMock() es.indices.delete = mock.AsyncMock() @@ -3038,10 +3038,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - async def _side_effect(name): - return name == "templateB" - - es.cluster.exists_component_template = mock.AsyncMock(side_effect=_side_effect) + es.cluster.get_component_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.cluster.delete_component_template = mock.AsyncMock() r = runner.DeleteComponentTemplate() @@ -3201,7 +3198,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - es.indices.exists_index_template = mock.AsyncMock(side_effect=[False, True]) + es.indices.get_index_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.indices.delete_index_template = mock.AsyncMock() r = runner.DeleteComposableTemplate()