diff --git a/docs/track.rst b/docs/track.rst index e2ff8518f..708681934 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -341,8 +341,9 @@ Throughput will be reported as number of indexed documents per second. force-merge ~~~~~~~~~~~ -With the operation type ``force-merge`` you can call the `force merge API `_. On older versions of Elasticsearch (prior to 2.1), Rally will use the ``optimize API`` instead. It supports the following parameter: +With the operation type ``force-merge`` you can call the `force merge API `_. On older versions of Elasticsearch (prior to 2.1), Rally will use the ``optimize API`` instead. It supports the following parameters: +* ``index`` (optional, defaults to the indices defined in the ``indices`` section or ``_all`` if no indices are defined there): The name of the index that should be force-merged. * ``max-num-segments`` (optional) The number of segments the index should be merged into. Defaults to simply checking if a merge needs to execute, and if so, executes it. This is an administrative operation. Metrics are not reported by default. If reporting is forced by setting ``include-in-reporting`` to ``true``, then throughput is reported as the number of completed force-merge operations per second. diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 3a0481c32..6fdfe150c 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -576,9 +576,9 @@ def __call__(self, es, params): request_timeout = params.get("request-timeout") try: if max_num_segments: - es.indices.forcemerge(index="_all", max_num_segments=max_num_segments, request_timeout=request_timeout) + es.indices.forcemerge(index=params.get("index"), max_num_segments=max_num_segments, request_timeout=request_timeout) else: - es.indices.forcemerge(index="_all", request_timeout=request_timeout) + es.indices.forcemerge(index=params.get("index"), request_timeout=request_timeout) except elasticsearch.TransportError as e: # this is caused by older versions of Elasticsearch (< 2.1), fall back to optimize if e.status_code == 400: diff --git a/esrally/track/params.py b/esrally/track/params.py index 26a273730..c7c079a14 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -580,6 +580,26 @@ def percent_completed(self): return self.current_bulk / self.total_bulks +class ForceMergeParamSource(ParamSource): + def __init__(self, track, params, **kwargs): + super().__init__(track, params, **kwargs) + if len(track.indices) > 0: + default_index = ','.join(map(str, track.indices)) + else: + default_index = "_all" + + self._index_name = params.get("index", default_index) + self._max_num_segments = params.get("max-num-segments") + self._request_timeout = params.get("request-timeout") + + def params(self): + return { + "index": self._index_name, + "max-num-segments": self._max_num_segments, + "request-timeout": self._request_timeout + } + + def number_of_bulks(corpora, partition_index, total_partitions, bulk_size): """ :return: The number of bulk operations that the given client will issue. @@ -922,6 +942,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): register_param_source_for_operation(track.OperationType.CreateIndexTemplate, CreateIndexTemplateParamSource) register_param_source_for_operation(track.OperationType.DeleteIndexTemplate, DeleteIndexTemplateParamSource) register_param_source_for_operation(track.OperationType.Sleep, SleepParamSource) +register_param_source_for_operation(track.OperationType.ForceMerge, ForceMergeParamSource) # Also register by name, so users can use it too register_param_source_for_name("file-reader", BulkIndexParamSource) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 322282b08..f19775445 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -692,21 +692,21 @@ class ForceMergeRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") def test_force_merge_with_defaults(self, es): force_merge = runner.ForceMerge() - force_merge(es, params={}) + force_merge(es, params={"index" : "_all"}) es.indices.forcemerge.assert_called_once_with(index="_all", request_timeout=None) @mock.patch("elasticsearch.Elasticsearch") def test_force_merge_override_request_timeout(self, es): force_merge = runner.ForceMerge() - force_merge(es, params={"request-timeout": 50000}) + force_merge(es, params={"index" : "_all", "request-timeout": 50000}) es.indices.forcemerge.assert_called_once_with(index="_all", request_timeout=50000) @mock.patch("elasticsearch.Elasticsearch") def test_force_merge_with_params(self, es): force_merge = runner.ForceMerge() - force_merge(es, params={"max-num-segments": 1, "request-timeout": 50000}) + force_merge(es, params={"index" : "_all", "max-num-segments": 1, "request-timeout": 50000}) es.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) diff --git a/tests/track/params_test.py b/tests/track/params_test.py index 27efbd3e4..46d8a0adf 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -1700,3 +1700,41 @@ def test_replaces_body_params(self): second = copy.deepcopy(search.params(choice=lambda d: d[1])) self.assertNotEqual(first, second) + + +class ForceMergeParamSourceTests(TestCase): + def test_force_merge_index_from_track(self): + source = params.ForceMergeParamSource(track.Track(name="unit-test", indices=[ + track.Index(name="index1"), + track.Index(name="index2"), + track.Index(name="index3") + ]), params={}) + + p = source.params() + + self.assertEqual("index1,index2,index3", p["index"]) + + def test_force_merge_index_by_name(self): + source = params.ForceMergeParamSource(track.Track(name="unit-test"), params={"index": "index2"}) + + p = source.params() + + self.assertEqual("index2", p["index"]) + + def test_default_force_merge_index(self): + source = params.ForceMergeParamSource(track.Track(name="unit-test"), params={}) + + p = source.params() + + self.assertEqual("_all", p["index"]) + + def test_force_merge_all_params(self): + source = params.ForceMergeParamSource(track.Track(name="unit-test"), params={"index": "index2", + "request-timeout": 30, + "max-num-segments": 1}) + + p = source.params() + + self.assertEqual("index2", p["index"]) + self.assertEqual(30, p["request-timeout"]) + self.assertEqual(1, p["max-num-segments"])