diff --git a/esrally/mechanic/telemetry.py b/esrally/mechanic/telemetry.py index 95a9c901d..5b92a4115 100644 --- a/esrally/mechanic/telemetry.py +++ b/esrally/mechanic/telemetry.py @@ -1061,6 +1061,9 @@ def on_benchmark_stop(self): for t in self.index_times(index_stats): self.metrics_store.put_doc(doc=t, level=metrics.MetaInfoScope.cluster) + for ct in self.index_counts(index_stats): + self.metrics_store.put_doc(doc=ct, level=metrics.MetaInfoScope.cluster) + self.add_metrics(self.extract_value(p, ["segments", "doc_values_memory_in_bytes"]), "segments_doc_values_memory_in_bytes", "byte") self.add_metrics(self.extract_value(p, ["segments", "stored_fields_memory_in_bytes"]), "segments_stored_fields_memory_in_bytes", "byte") self.add_metrics(self.extract_value(p, ["segments", "terms_memory_in_bytes"]), "segments_terms_memory_in_bytes", "byte") @@ -1090,7 +1093,7 @@ def index_times(self, stats, per_shard_stats=True): def index_time(self, values, stats, name, path, per_shard_stats): primary_total_stats = self.extract_value(stats, ["_all", "primaries"], default_value={}) value = self.extract_value(primary_total_stats, path) - if value: + if value is not None: doc = { "name": name, "value": value, @@ -1100,6 +1103,24 @@ def index_time(self, values, stats, name, path, per_shard_stats): doc["per-shard"] = self.primary_shard_stats(stats, path) values.append(doc) + def index_counts(self, stats): + counts = [] + self.index_count(counts, stats, "merges_total_count", ["merges", "total"]) + self.index_count(counts, stats, "refresh_total_count", ["refresh", "total"]) + self.index_count(counts, stats, "flush_total_count", ["flush", "total"]) + return counts + + def index_count(self, values, stats, name, path): + primary_total_stats = self.extract_value(stats, ["_all", "primaries"], default_value={}) + value = self.extract_value(primary_total_stats, path) + if value is not None: + doc = { + "name": name, + "value": value, + "unit": "", + } + values.append(doc) + def primary_shard_stats(self, stats, path): shard_stats = [] try: diff --git a/esrally/reporter.py b/esrally/reporter.py index 1e2413f8c..3fd3db5b4 100644 --- a/esrally/reporter.py +++ b/esrally/reporter.py @@ -160,10 +160,13 @@ def __call__(self): result.indexing_throttle_time_per_shard = self.shard_stats("indexing_throttle_time") result.merge_time = self.sum("merges_total_time") result.merge_time_per_shard = self.shard_stats("merges_total_time") + result.merge_count = self.sum("merges_total_count") result.refresh_time = self.sum("refresh_total_time") result.refresh_time_per_shard = self.shard_stats("refresh_total_time") + result.refresh_count = self.sum("refresh_total_count") result.flush_time = self.sum("flush_total_time") result.flush_time_per_shard = self.shard_stats("flush_total_time") + result.flush_count = self.sum("flush_total_count") result.merge_throttle_time = self.sum("merges_total_throttled_time") result.merge_throttle_time_per_shard = self.shard_stats("merges_total_throttled_time") @@ -300,10 +303,13 @@ def __init__(self, d=None): self.indexing_throttle_time_per_shard = self.v(d, "indexing_throttle_time_per_shard", default={}) self.merge_time = self.v(d, "merge_time") self.merge_time_per_shard = self.v(d, "merge_time_per_shard", default={}) + self.merge_count = self.v(d, "merge_count") self.refresh_time = self.v(d, "refresh_time") self.refresh_time_per_shard = self.v(d, "refresh_time_per_shard", default={}) + self.refresh_count = self.v(d, "refresh_count") self.flush_time = self.v(d, "flush_time") self.flush_time_per_shard = self.v(d, "flush_time_per_shard", default={}) + self.flush_count = self.v(d, "flush_count") self.merge_throttle_time = self.v(d, "merge_throttle_time") self.merge_throttle_time_per_shard = self.v(d, "merge_throttle_time_per_shard", default={}) self.ml_processing_time = self.v(d, "ml_processing_time", default=[]) @@ -451,6 +457,7 @@ def report(self): warnings = [] metrics_table = [] metrics_table.extend(self.report_total_times(stats)) + metrics_table.extend(self.report_total_counts(stats)) metrics_table.extend(self.report_merge_part_times(stats)) metrics_table.extend(self.report_ml_processing_times(stats)) @@ -533,12 +540,24 @@ def report_total_times(self, stats): def report_total_time(self, name, total_time, total_time_per_shard): unit = "min" return self.join( - self.line("Total {}".format(name), "", total_time, unit, convert.ms_to_minutes), + self.line("Total primaries {}".format(name), "", total_time, unit, convert.ms_to_minutes), self.line("Min {} per shard".format(name), "", total_time_per_shard.get("min"), unit, convert.ms_to_minutes), self.line("Median {} per shard".format(name), "", total_time_per_shard.get("median"), unit, convert.ms_to_minutes), self.line("Max {} per shard".format(name), "", total_time_per_shard.get("max"), unit, convert.ms_to_minutes), ) + def report_total_counts(self, stats): + lines = [] + lines.extend(self.report_total_count("merge count", stats.merge_count)) + lines.extend(self.report_total_count("refresh count", stats.refresh_count)) + lines.extend(self.report_total_count("flush count", stats.flush_count)) + return lines + + def report_total_count(self, name, total_count): + return self.join( + self.line("Total primaries {}".format(name), "", total_count, ""), + ) + def report_merge_part_times(self, stats): # note that these times are not(!) wall clock time results but total times summed up over multiple threads unit = "min" @@ -569,8 +588,8 @@ def report_cpu_usage(self, stats): def report_gc_times(self, stats): return self.join( - self.line("Total Young Gen GC", "", stats.young_gc_time, "s", convert.ms_to_seconds), - self.line("Total Old Gen GC", "", stats.old_gc_time, "s", convert.ms_to_seconds) + self.line("Total primaries Young Gen GC", "", stats.young_gc_time, "s", convert.ms_to_seconds), + self.line("Total primaries Old Gen GC", "", stats.old_gc_time, "s", convert.ms_to_seconds) ) def report_disk_usage(self, stats): @@ -771,12 +790,18 @@ def report_total_times(self, baseline_stats, contender_stats): lines.extend(self.report_total_time("flush time", baseline_stats.flush_time, baseline_stats.flush_time_per_shard, contender_stats.flush_time, contender_stats.flush_time_per_shard)) + lines.extend(self.report_total_count("merge count", + baseline_stats.merge_count, contender_stats.merge_count)) + lines.extend(self.report_total_count("refresh count", + baseline_stats.refresh_count, contender_stats.refresh_count)) + lines.extend(self.report_total_count("flush count", + baseline_stats.flush_count, contender_stats.flush_count)) return lines def report_total_time(self, name, baseline_total, baseline_per_shard, contender_total, contender_per_shard): unit = "min" return self.join( - self.line("Total {}".format(name), baseline_total, contender_total, "", unit, + self.line("Total primaries {}".format(name), baseline_total, contender_total, "", unit, treat_increase_as_improvement=False, formatter=convert.ms_to_minutes), self.line("Min {} per shard".format(name), baseline_per_shard.get("min"), contender_per_shard.get("min"), "", unit, treat_increase_as_improvement=False, formatter=convert.ms_to_minutes), @@ -786,11 +811,17 @@ def report_total_time(self, name, baseline_total, baseline_per_shard, contender_ treat_increase_as_improvement=False, formatter=convert.ms_to_minutes), ) + def report_total_count(self, name, baseline_total, contender_total): + return self.join( + self.line("Total primaries {}".format(name), baseline_total, contender_total, "", "", + treat_increase_as_improvement=False) + ) + def report_gc_times(self, baseline_stats, contender_stats): return self.join( - self.line("Total Young Gen GC", baseline_stats.young_gc_time, contender_stats.young_gc_time, "", "s", + self.line("Total primaries Young Gen GC", baseline_stats.young_gc_time, contender_stats.young_gc_time, "", "s", treat_increase_as_improvement=False, formatter=convert.ms_to_seconds), - self.line("Total Old Gen GC", baseline_stats.old_gc_time, contender_stats.old_gc_time, "", "s", + self.line("Total primaries Old Gen GC", baseline_stats.old_gc_time, contender_stats.old_gc_time, "", "s", treat_increase_as_improvement=False, formatter=convert.ms_to_seconds) ) diff --git a/tests/mechanic/telemetry_test.py b/tests/mechanic/telemetry_test.py index 5c94d0f11..bf21650c9 100644 --- a/tests/mechanic/telemetry_test.py +++ b/tests/mechanic/telemetry_test.py @@ -1917,16 +1917,19 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics }, "merges": { "total_time_in_millis": 0, - "total_throttled_time_in_millis": 0 + "total_throttled_time_in_millis": 0, + "total": 0 }, "indexing": { "index_time_in_millis": 0 }, "refresh": { - "total_time_in_millis": 0 + "total_time_in_millis": 0, + "total": 0 }, "flush": { - "total_time_in_millis": 0 + "total_time_in_millis": 0, + "total": 0 } } } @@ -1951,16 +1954,19 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics }, "merges": { "total_time_in_millis": 509341, - "total_throttled_time_in_millis": 98925 + "total_throttled_time_in_millis": 98925, + "total": 3 }, "indexing": { "index_time_in_millis": 1065688 }, "refresh": { - "total_time_in_millis": 158465 + "total_time_in_millis": 158465, + "total": 10 }, "flush": { - "total_time_in_millis": 19082 + "total_time_in_millis": 0, + "total": 0 } }, "total": { @@ -1998,7 +2004,7 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics "total_time_in_millis": 81004 }, "flush": { - "total_time_in_millis": 9879 + "total_time_in_millis": 0 } } ], @@ -2018,7 +2024,7 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics "total_time_in_millis": 77461, }, "flush": { - "total_time_in_millis": 9203 + "total_time_in_millis": 0 } } ] @@ -2042,7 +2048,7 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics "total_time_in_millis": 81004, }, "flush": { - "total_time_in_millis": 9879 + "total_time_in_millis": 0 } } ], @@ -2063,7 +2069,7 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics "total_time_in_millis": 77461, }, "flush": { - "total_time_in_millis": 9203 + "total_time_in_millis": 0 } } ] @@ -2115,11 +2121,26 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics }, level=metrics.MetaInfoScope.cluster), mock.call(doc={ "name": "flush_total_time", - "value": 19082, + "value": 0, "unit": "ms", - # [9203, 9879] + # [0, 0] "per-shard": [s["flush"]["total_time_in_millis"] for s in primary_shards] }, level=metrics.MetaInfoScope.cluster), + mock.call(doc={ + "name": "merges_total_count", + "value": 3, + "unit": "", + }, level=metrics.MetaInfoScope.cluster), + mock.call(doc={ + "name": "refresh_total_count", + "value": 10, + "unit": "" + }, level=metrics.MetaInfoScope.cluster), + mock.call(doc={ + "name": "flush_total_count", + "value": 0, + "unit": "" + }, level=metrics.MetaInfoScope.cluster), ]) metrics_store_cluster_count.assert_has_calls([ diff --git a/tests/reporter_test.py b/tests/reporter_test.py index 4739949e4..9022951c0 100644 --- a/tests/reporter_test.py +++ b/tests/reporter_test.py @@ -136,6 +136,14 @@ def test_as_flat_list(self): ], "young_gc_time": 68, "old_gc_time": 0, + "merge_time": 3702, + "merge_time_per_shard": { + "min": 40, + "median": 3702, + "max": 3900, + "unit": "ms" + }, + "merge_count": 2, "refresh_time": 596, "refresh_time_per_shard": { "min": 48, @@ -143,8 +151,10 @@ def test_as_flat_list(self): "max": 204, "unit": "ms" }, + "refresh_count": 10, "flush_time": None, - "flush_time_per_shard": {} + "flush_time_per_shard": {}, + "flush_count": 0 } s = reporter.Stats(d) @@ -242,6 +252,30 @@ def test_as_flat_list(self): } }, select(metric_list, "old_gc_time")) + self.assertEqual({ + "name": "merge_time", + "value": { + "single": 3702 + } + }, select(metric_list, "merge_time")) + + self.assertEqual({ + "name": "merge_time_per_shard", + "value": { + "min": 40, + "median": 3702, + "max": 3900, + "unit": "ms" + } + }, select(metric_list, "merge_time_per_shard")) + + self.assertEqual({ + "name": "merge_count", + "value": { + "single": 2 + } + }, select(metric_list, "merge_count")) + self.assertEqual({ "name": "refresh_time", "value": { @@ -259,8 +293,21 @@ def test_as_flat_list(self): } }, select(metric_list, "refresh_time_per_shard")) + self.assertEqual({ + "name": "refresh_count", + "value": { + "single": 10 + } + }, select(metric_list, "refresh_count")) + self.assertIsNone(select(metric_list, "flush_time")) self.assertIsNone(select(metric_list, "flush_time_per_shard")) + self.assertEqual({ + "name": "flush_count", + "value": { + "single": 0 + } + }, select(metric_list, "flush_count")) class FormatterTests(TestCase):