Skip to content

Commit

Permalink
Force merge only track indices by default (#865)
Browse files Browse the repository at this point in the history
With this commit, only indices that are defined by the track are force-merged.
Users can override this behavior with a new `index` parameter that allows them to
specify different indices (or the special value `_all` to force-merge all
indices).

Closes #835
  • Loading branch information
rohitnair authored and danielmitterdorfer committed Jan 23, 2020
1 parent 9edb2fd commit 21347e1
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 6 deletions.
3 changes: 2 additions & 1 deletion docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,9 @@ Throughput will be reported as number of indexed documents per second.
force-merge
~~~~~~~~~~~

With the operation type ``force-merge`` you can call the `force merge API <http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html>`_. On older versions of Elasticsearch (prior to 2.1), Rally will use the ``optimize API`` instead. It supports the following parameter:
With the operation type ``force-merge`` you can call the `force merge API <http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html>`_. On older versions of Elasticsearch (prior to 2.1), Rally will use the ``optimize API`` instead. It supports the following parameters:

* ``index`` (optional, defaults to the indices defined in the ``indices`` section or ``_all`` if no indices are defined there): The name of the index that should be force-merged.
* ``max-num-segments`` (optional): The number of segments the index should be merged into. Defaults to simply checking if a merge needs to execute, and if so, executes it.

This is an administrative operation. Metrics are not reported by default. If reporting is forced by setting ``include-in-reporting`` to ``true``, then throughput is reported as the number of completed force-merge operations per second.
Expand Down
4 changes: 2 additions & 2 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,9 +576,9 @@ def __call__(self, es, params):
request_timeout = params.get("request-timeout")
try:
if max_num_segments:
es.indices.forcemerge(index="_all", max_num_segments=max_num_segments, request_timeout=request_timeout)
es.indices.forcemerge(index=params.get("index"), max_num_segments=max_num_segments, request_timeout=request_timeout)
else:
es.indices.forcemerge(index="_all", request_timeout=request_timeout)
es.indices.forcemerge(index=params.get("index"), request_timeout=request_timeout)
except elasticsearch.TransportError as e:
# this is caused by older versions of Elasticsearch (< 2.1), fall back to optimize
if e.status_code == 400:
Expand Down
21 changes: 21 additions & 0 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,26 @@ def percent_completed(self):
return self.current_bulk / self.total_bulks


class ForceMergeParamSource(ParamSource):
    """
    Parameter source for the ``force-merge`` operation.

    The target index expression is resolved once, at construction time:

    * an explicit ``index`` entry in ``params`` takes precedence,
    * otherwise the string forms of the track's indices are joined with commas,
    * and ``_all`` is used when the track defines no indices.

    ``max-num-segments`` and ``request-timeout`` are passed through unchanged
    (``None`` when they are not provided).
    """

    def __init__(self, track, params, **kwargs):
        super().__init__(track, params, **kwargs)
        track_indices = track.indices
        # Fall back to "_all" only if the track itself does not define any indices.
        fallback_index = ",".join(str(idx) for idx in track_indices) if len(track_indices) > 0 else "_all"

        self._index_name = params.get("index", fallback_index)
        self._max_num_segments = params.get("max-num-segments")
        self._request_timeout = params.get("request-timeout")

    def params(self):
        """
        :return: A dict with the keys ``index``, ``max-num-segments`` and ``request-timeout`` holding the values
                 resolved in the constructor.
        """
        resolved = {"index": self._index_name}
        resolved["max-num-segments"] = self._max_num_segments
        resolved["request-timeout"] = self._request_timeout
        return resolved


def number_of_bulks(corpora, partition_index, total_partitions, bulk_size):
"""
:return: The number of bulk operations that the given client will issue.
Expand Down Expand Up @@ -922,6 +942,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
register_param_source_for_operation(track.OperationType.CreateIndexTemplate, CreateIndexTemplateParamSource)
register_param_source_for_operation(track.OperationType.DeleteIndexTemplate, DeleteIndexTemplateParamSource)
register_param_source_for_operation(track.OperationType.Sleep, SleepParamSource)
register_param_source_for_operation(track.OperationType.ForceMerge, ForceMergeParamSource)

# Also register by name, so users can use it too
register_param_source_for_name("file-reader", BulkIndexParamSource)
6 changes: 3 additions & 3 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,21 +692,21 @@ class ForceMergeRunnerTests(TestCase):
# Asserts that the mocked ``es.indices.forcemerge`` is called exactly once with ``index="_all"`` and
# ``request_timeout=None``.
# NOTE(review): this span comes from a rendered diff without +/- markers; the two consecutive
# ``force_merge(...)`` calls below look like the removed and the added version of the same line - confirm
# against the repository before treating this as runnable code.
@mock.patch("elasticsearch.Elasticsearch")
def test_force_merge_with_defaults(self, es):
force_merge = runner.ForceMerge()
force_merge(es, params={})
force_merge(es, params={"index" : "_all"})

es.indices.forcemerge.assert_called_once_with(index="_all", request_timeout=None)

# Asserts that the mocked ``es.indices.forcemerge`` is called exactly once with ``index="_all"`` and the
# ``request-timeout`` value (50000) passed as ``request_timeout``.
# NOTE(review): this span comes from a rendered diff without +/- markers; the two consecutive
# ``force_merge(...)`` calls below look like the removed and the added version of the same line - confirm
# against the repository before treating this as runnable code.
@mock.patch("elasticsearch.Elasticsearch")
def test_force_merge_override_request_timeout(self, es):
force_merge = runner.ForceMerge()
force_merge(es, params={"request-timeout": 50000})
force_merge(es, params={"index" : "_all", "request-timeout": 50000})

es.indices.forcemerge.assert_called_once_with(index="_all", request_timeout=50000)

# Asserts that the mocked ``es.indices.forcemerge`` is called exactly once with ``index="_all"``,
# ``max_num_segments=1`` and ``request_timeout=50000``.
# NOTE(review): this span comes from a rendered diff without +/- markers; the two consecutive
# ``force_merge(...)`` calls below look like the removed and the added version of the same line - confirm
# against the repository before treating this as runnable code.
@mock.patch("elasticsearch.Elasticsearch")
def test_force_merge_with_params(self, es):
force_merge = runner.ForceMerge()
force_merge(es, params={"max-num-segments": 1, "request-timeout": 50000})
force_merge(es, params={"index" : "_all", "max-num-segments": 1, "request-timeout": 50000})

es.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000)

Expand Down
38 changes: 38 additions & 0 deletions tests/track/params_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1700,3 +1700,41 @@ def test_replaces_body_params(self):
second = copy.deepcopy(search.params(choice=lambda d: d[1]))

self.assertNotEqual(first, second)


class ForceMergeParamSourceTests(TestCase):
    """Checks how ``ForceMergeParamSource`` resolves the ``index`` and the pass-through parameters."""

    def test_force_merge_index_from_track(self):
        # Without an explicit "index" parameter the track's indices are joined with commas.
        track_indices = [track.Index(name=index_name) for index_name in ("index1", "index2", "index3")]
        unit_test_track = track.Track(name="unit-test", indices=track_indices)
        param_source = params.ForceMergeParamSource(unit_test_track, params={})

        resolved = param_source.params()

        self.assertEqual("index1,index2,index3", resolved["index"])

    def test_force_merge_index_by_name(self):
        # An explicit "index" parameter is used as is.
        unit_test_track = track.Track(name="unit-test")
        param_source = params.ForceMergeParamSource(unit_test_track, params={"index": "index2"})

        resolved = param_source.params()

        self.assertEqual("index2", resolved["index"])

    def test_default_force_merge_index(self):
        # Neither the track nor the parameters name an index, so "_all" is expected.
        unit_test_track = track.Track(name="unit-test")
        param_source = params.ForceMergeParamSource(unit_test_track, params={})

        resolved = param_source.params()

        self.assertEqual("_all", resolved["index"])

    def test_force_merge_all_params(self):
        # All three supported parameters are expected to be handed back unchanged.
        operation_params = {
            "index": "index2",
            "request-timeout": 30,
            "max-num-segments": 1,
        }
        param_source = params.ForceMergeParamSource(track.Track(name="unit-test"), params=operation_params)

        resolved = param_source.params()

        self.assertEqual("index2", resolved["index"])
        self.assertEqual(30, resolved["request-timeout"])
        self.assertEqual(1, resolved["max-num-segments"])

0 comments on commit 21347e1

Please sign in to comment.