Skip to content

Commit

Permalink
Add refresh/merge/flush totals in summary
Browse files Browse the repository at this point in the history
Due to elastic#608 it's likely we need to benchmark scenarios without using
the node-stats telemetry device. At the same time we want to get a
general idea of how many refreshes/merges/flushes happened (in total)
by accessing the index stats.

Add total count for merges/refresh/flush in summary output; this is
collected from `_all/primaries` in `_stats`.

Also modify summary description to clarify the values are totals from
primary shards.

Finally, fix a bug where index stats whose time/count values were 0 got
skipped in the summary.

Closes elastic#614
  • Loading branch information
dliappis committed Dec 10, 2018
1 parent e33abfa commit ced08a4
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 20 deletions.
23 changes: 22 additions & 1 deletion esrally/mechanic/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,9 @@ def on_benchmark_stop(self):
for t in self.index_times(index_stats):
self.metrics_store.put_doc(doc=t, level=metrics.MetaInfoScope.cluster)

for ct in self.index_counts(index_stats):
self.metrics_store.put_doc(doc=ct, level=metrics.MetaInfoScope.cluster)

self.add_metrics(self.extract_value(p, ["segments", "doc_values_memory_in_bytes"]), "segments_doc_values_memory_in_bytes", "byte")
self.add_metrics(self.extract_value(p, ["segments", "stored_fields_memory_in_bytes"]), "segments_stored_fields_memory_in_bytes", "byte")
self.add_metrics(self.extract_value(p, ["segments", "terms_memory_in_bytes"]), "segments_terms_memory_in_bytes", "byte")
Expand Down Expand Up @@ -1090,7 +1093,7 @@ def index_times(self, stats, per_shard_stats=True):
def index_time(self, values, stats, name, path, per_shard_stats):
primary_total_stats = self.extract_value(stats, ["_all", "primaries"], default_value={})
value = self.extract_value(primary_total_stats, path)
if value:
if value is not None:
doc = {
"name": name,
"value": value,
Expand All @@ -1100,6 +1103,24 @@ def index_time(self, values, stats, name, path, per_shard_stats):
doc["per-shard"] = self.primary_shard_stats(stats, path)
values.append(doc)

def index_counts(self, stats):
    """Collect the total merge/refresh/flush counts from the index stats.

    Returns a list of metric docs (possibly empty if the stats do not
    contain the corresponding sections).
    """
    counts = []
    for metric_name, stats_path in (
        ("merges_total_count", ["merges", "total"]),
        ("refresh_total_count", ["refresh", "total"]),
        ("flush_total_count", ["flush", "total"]),
    ):
        self.index_count(counts, stats, metric_name, stats_path)
    return counts

def index_count(self, values, stats, name, path):
    """Append a total-count metric doc named *name* to *values*.

    The count is read at *path* within the ``_all``/``primaries`` section
    of *stats*. A value of 0 is valid and is still recorded; only a
    missing value is skipped.
    """
    primaries = self.extract_value(stats, ["_all", "primaries"], default_value={})
    count = self.extract_value(primaries, path)
    if count is None:
        return
    values.append({
        "name": name,
        "value": count,
        "unit": "",
    })

def primary_shard_stats(self, stats, path):
shard_stats = []
try:
Expand Down
43 changes: 37 additions & 6 deletions esrally/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,13 @@ def __call__(self):
result.indexing_throttle_time_per_shard = self.shard_stats("indexing_throttle_time")
result.merge_time = self.sum("merges_total_time")
result.merge_time_per_shard = self.shard_stats("merges_total_time")
result.merge_count = self.sum("merges_total_count")
result.refresh_time = self.sum("refresh_total_time")
result.refresh_time_per_shard = self.shard_stats("refresh_total_time")
result.refresh_count = self.sum("refresh_total_count")
result.flush_time = self.sum("flush_total_time")
result.flush_time_per_shard = self.shard_stats("flush_total_time")
result.flush_count = self.sum("flush_total_count")
result.merge_throttle_time = self.sum("merges_total_throttled_time")
result.merge_throttle_time_per_shard = self.shard_stats("merges_total_throttled_time")

Expand Down Expand Up @@ -300,10 +303,13 @@ def __init__(self, d=None):
self.indexing_throttle_time_per_shard = self.v(d, "indexing_throttle_time_per_shard", default={})
self.merge_time = self.v(d, "merge_time")
self.merge_time_per_shard = self.v(d, "merge_time_per_shard", default={})
self.merge_count = self.v(d, "merge_count")
self.refresh_time = self.v(d, "refresh_time")
self.refresh_time_per_shard = self.v(d, "refresh_time_per_shard", default={})
self.refresh_count = self.v(d, "refresh_count")
self.flush_time = self.v(d, "flush_time")
self.flush_time_per_shard = self.v(d, "flush_time_per_shard", default={})
self.flush_count = self.v(d, "flush_count")
self.merge_throttle_time = self.v(d, "merge_throttle_time")
self.merge_throttle_time_per_shard = self.v(d, "merge_throttle_time_per_shard", default={})
self.ml_processing_time = self.v(d, "ml_processing_time", default=[])
Expand Down Expand Up @@ -451,6 +457,7 @@ def report(self):
warnings = []
metrics_table = []
metrics_table.extend(self.report_total_times(stats))
metrics_table.extend(self.report_total_counts(stats))
metrics_table.extend(self.report_merge_part_times(stats))
metrics_table.extend(self.report_ml_processing_times(stats))

Expand Down Expand Up @@ -533,12 +540,24 @@ def report_total_times(self, stats):
def report_total_time(self, name, total_time, total_time_per_shard):
unit = "min"
return self.join(
self.line("Total {}".format(name), "", total_time, unit, convert.ms_to_minutes),
self.line("Total primaries {}".format(name), "", total_time, unit, convert.ms_to_minutes),
self.line("Min {} per shard".format(name), "", total_time_per_shard.get("min"), unit, convert.ms_to_minutes),
self.line("Median {} per shard".format(name), "", total_time_per_shard.get("median"), unit, convert.ms_to_minutes),
self.line("Max {} per shard".format(name), "", total_time_per_shard.get("max"), unit, convert.ms_to_minutes),
)

def report_total_counts(self, stats):
    """Build the summary-table lines for total merge/refresh/flush counts."""
    lines = []
    for label, total in (
        ("merge count", stats.merge_count),
        ("refresh count", stats.refresh_count),
        ("flush count", stats.flush_count),
    ):
        lines.extend(self.report_total_count(label, total))
    return lines

def report_total_count(self, name, total_count):
    """Return a single summary line for a primaries total-count metric."""
    label = "Total primaries {}".format(name)
    row = self.line(label, "", total_count, "")
    return self.join(row)

def report_merge_part_times(self, stats):
# note that these times are not(!) wall clock time results but total times summed up over multiple threads
unit = "min"
Expand Down Expand Up @@ -569,8 +588,8 @@ def report_cpu_usage(self, stats):

def report_gc_times(self, stats):
return self.join(
self.line("Total Young Gen GC", "", stats.young_gc_time, "s", convert.ms_to_seconds),
self.line("Total Old Gen GC", "", stats.old_gc_time, "s", convert.ms_to_seconds)
self.line("Total primaries Young Gen GC", "", stats.young_gc_time, "s", convert.ms_to_seconds),
self.line("Total primaries Old Gen GC", "", stats.old_gc_time, "s", convert.ms_to_seconds)
)

def report_disk_usage(self, stats):
Expand Down Expand Up @@ -771,12 +790,18 @@ def report_total_times(self, baseline_stats, contender_stats):
lines.extend(self.report_total_time("flush time",
baseline_stats.flush_time, baseline_stats.flush_time_per_shard,
contender_stats.flush_time, contender_stats.flush_time_per_shard))
lines.extend(self.report_total_count("merge count",
baseline_stats.merge_count, contender_stats.merge_count))
lines.extend(self.report_total_count("refresh count",
baseline_stats.refresh_count, contender_stats.refresh_count))
lines.extend(self.report_total_count("flush count",
baseline_stats.flush_count, contender_stats.flush_count))
return lines

def report_total_time(self, name, baseline_total, baseline_per_shard, contender_total, contender_per_shard):
unit = "min"
return self.join(
self.line("Total {}".format(name), baseline_total, contender_total, "", unit,
self.line("Total primaries {}".format(name), baseline_total, contender_total, "", unit,
treat_increase_as_improvement=False, formatter=convert.ms_to_minutes),
self.line("Min {} per shard".format(name), baseline_per_shard.get("min"), contender_per_shard.get("min"), "", unit,
treat_increase_as_improvement=False, formatter=convert.ms_to_minutes),
Expand All @@ -786,11 +811,17 @@ def report_total_time(self, name, baseline_total, baseline_per_shard, contender_
treat_increase_as_improvement=False, formatter=convert.ms_to_minutes),
)

def report_total_count(self, name, baseline_total, contender_total):
    """Return a baseline-vs-contender comparison line for a total count.

    An increase is not treated as an improvement (more merges/refreshes/
    flushes is not inherently better).
    """
    label = "Total primaries {}".format(name)
    row = self.line(label, baseline_total, contender_total, "", "",
                    treat_increase_as_improvement=False)
    return self.join(row)

def report_gc_times(self, baseline_stats, contender_stats):
return self.join(
self.line("Total Young Gen GC", baseline_stats.young_gc_time, contender_stats.young_gc_time, "", "s",
self.line("Total primaries Young Gen GC", baseline_stats.young_gc_time, contender_stats.young_gc_time, "", "s",
treat_increase_as_improvement=False, formatter=convert.ms_to_seconds),
self.line("Total Old Gen GC", baseline_stats.old_gc_time, contender_stats.old_gc_time, "", "s",
self.line("Total primaries Old Gen GC", baseline_stats.old_gc_time, contender_stats.old_gc_time, "", "s",
treat_increase_as_improvement=False, formatter=convert.ms_to_seconds)
)

Expand Down
45 changes: 33 additions & 12 deletions tests/mechanic/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1917,16 +1917,19 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics
},
"merges": {
"total_time_in_millis": 0,
"total_throttled_time_in_millis": 0
"total_throttled_time_in_millis": 0,
"total": 0
},
"indexing": {
"index_time_in_millis": 0
},
"refresh": {
"total_time_in_millis": 0
"total_time_in_millis": 0,
"total": 0
},
"flush": {
"total_time_in_millis": 0
"total_time_in_millis": 0,
"total": 0
}
}
}
Expand All @@ -1951,16 +1954,19 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics
},
"merges": {
"total_time_in_millis": 509341,
"total_throttled_time_in_millis": 98925
"total_throttled_time_in_millis": 98925,
"total": 3
},
"indexing": {
"index_time_in_millis": 1065688
},
"refresh": {
"total_time_in_millis": 158465
"total_time_in_millis": 158465,
"total": 10
},
"flush": {
"total_time_in_millis": 19082
"total_time_in_millis": 0,
"total": 0
}
},
"total": {
Expand Down Expand Up @@ -1998,7 +2004,7 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics
"total_time_in_millis": 81004
},
"flush": {
"total_time_in_millis": 9879
"total_time_in_millis": 0
}
}
],
Expand All @@ -2018,7 +2024,7 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics
"total_time_in_millis": 77461,
},
"flush": {
"total_time_in_millis": 9203
"total_time_in_millis": 0
}
}
]
Expand All @@ -2042,7 +2048,7 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics
"total_time_in_millis": 81004,
},
"flush": {
"total_time_in_millis": 9879
"total_time_in_millis": 0
}
}
],
Expand All @@ -2063,7 +2069,7 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics
"total_time_in_millis": 77461,
},
"flush": {
"total_time_in_millis": 9203
"total_time_in_millis": 0
}
}
]
Expand Down Expand Up @@ -2115,11 +2121,26 @@ def test_stores_available_index_stats(self, metrics_store_cluster_count, metrics
}, level=metrics.MetaInfoScope.cluster),
mock.call(doc={
"name": "flush_total_time",
"value": 19082,
"value": 0,
"unit": "ms",
# [9203, 9879]
# [0, 0]
"per-shard": [s["flush"]["total_time_in_millis"] for s in primary_shards]
}, level=metrics.MetaInfoScope.cluster),
mock.call(doc={
"name": "merges_total_count",
"value": 3,
"unit": "",
}, level=metrics.MetaInfoScope.cluster),
mock.call(doc={
"name": "refresh_total_count",
"value": 10,
"unit": ""
}, level=metrics.MetaInfoScope.cluster),
mock.call(doc={
"name": "flush_total_count",
"value": 0,
"unit": ""
}, level=metrics.MetaInfoScope.cluster),
])

metrics_store_cluster_count.assert_has_calls([
Expand Down
49 changes: 48 additions & 1 deletion tests/reporter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,25 @@ def test_as_flat_list(self):
],
"young_gc_time": 68,
"old_gc_time": 0,
"merge_time": 3702,
"merge_time_per_shard": {
"min": 40,
"median": 3702,
"max": 3900,
"unit": "ms"
},
"merge_count": 2,
"refresh_time": 596,
"refresh_time_per_shard": {
"min": 48,
"median": 89,
"max": 204,
"unit": "ms"
},
"refresh_count": 10,
"flush_time": None,
"flush_time_per_shard": {}
"flush_time_per_shard": {},
"flush_count": 0
}

s = reporter.Stats(d)
Expand Down Expand Up @@ -242,6 +252,30 @@ def test_as_flat_list(self):
}
}, select(metric_list, "old_gc_time"))

self.assertEqual({
"name": "merge_time",
"value": {
"single": 3702
}
}, select(metric_list, "merge_time"))

self.assertEqual({
"name": "merge_time_per_shard",
"value": {
"min": 40,
"median": 3702,
"max": 3900,
"unit": "ms"
}
}, select(metric_list, "merge_time_per_shard"))

self.assertEqual({
"name": "merge_count",
"value": {
"single": 2
}
}, select(metric_list, "merge_count"))

self.assertEqual({
"name": "refresh_time",
"value": {
Expand All @@ -259,8 +293,21 @@ def test_as_flat_list(self):
}
}, select(metric_list, "refresh_time_per_shard"))

self.assertEqual({
"name": "refresh_count",
"value": {
"single": 10
}
}, select(metric_list, "refresh_count"))

self.assertIsNone(select(metric_list, "flush_time"))
self.assertIsNone(select(metric_list, "flush_time_per_shard"))
self.assertEqual({
"name": "flush_count",
"value": {
"single": 0
}
}, select(metric_list, "flush_count"))


class FormatterTests(TestCase):
Expand Down

0 comments on commit ced08a4

Please sign in to comment.