Skip to content

Commit

Permalink
Use 'es'snapshot.get' for 'WaitForCurrentSnapshotsCreate' runner
Browse files Browse the repository at this point in the history
  • Loading branch information
b-deam committed Mar 2, 2023
1 parent 758c95c commit b9808fc
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 27 deletions.
21 changes: 2 additions & 19 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2053,32 +2053,15 @@ async def __call__(self, es, params):
wait_period = params.get("completion-recheck-wait-period", 1)
es_info = await es.info()
es_version = Version.from_string(es_info["version"]["number"])
api = es.snapshot.get
request_args = {"repository": repository, "snapshot": "_current", "verbose": False}

# significantly reduce response size when lots of snapshots have been taken
# only available since ES 8.3.0 (https://github.com/elastic/elasticsearch/pull/86269)
if (es_version.major, es_version.minor) >= (8, 3):
request_params, headers = self._transport_request_params(params)
headers["Content-Type"] = "application/json"

request_params["index_names"] = "false"
request_params["verbose"] = "false"

request_args = {
"method": "GET",
"path": f"_snapshot/{repository}/_current",
"headers": headers,
"params": request_params,
}

# TODO: Switch to native es.snapshot.get once `index_names` becomes supported in
# `es.snapshot.get` of the elasticsearch-py client and we've upgraded the client in Rally, see:
# https://elasticsearch-py.readthedocs.io/en/latest/api.html#elasticsearch.client.SnapshotClient.get
api = es.perform_request
request_args["index_names"] = False

while True:
response = await api(**request_args)
response = await es.snapshot.get(**request_args)

if int(response.get("total")) == 0:
break
Expand Down
11 changes: 3 additions & 8 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4159,7 +4159,7 @@ async def test_wait_for_current_snapshots_create_after_8_3_0(self, es):
"completion-recheck-wait-period": 0,
}

es.perform_request = mock.AsyncMock(
es.snapshot.get = mock.AsyncMock(
side_effect=[
{
"snapshots": [
Expand Down Expand Up @@ -4202,14 +4202,9 @@ async def test_wait_for_current_snapshots_create_after_8_3_0(self, es):
r = runner.WaitForCurrentSnapshotsCreate()
result = await r(es, task_params)

es.perform_request.assert_awaited_with(
method="GET",
path=f"_snapshot/{repository}/_current",
headers={"Content-Type": "application/json"},
params={"index_names": "false", "verbose": "false"},
)
es.snapshot.get.assert_awaited_with(repository=repository, snapshot="_current", verbose=False, index_names=False)

assert es.perform_request.await_count == 2
assert es.snapshot.get.await_count == 2

assert result is None

Expand Down

0 comments on commit b9808fc

Please sign in to comment.