From 0c95b2d61d95c5e7c967c133d950f42553cbd017 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 11 Apr 2018 11:15:33 -0400 Subject: [PATCH] Add periodic flush count to flush stats (#29360) Currently, flush stats contain only the total flush count, which is the sum of manual flushes (via the API) and periodic flushes (triggered asynchronously when the uncommitted translog size exceeds the flush threshold). Sometimes it's useful to know these two numbers independently. This commit tracks and returns a periodic flush count in the flush stats. --- .../test/indices.flush/10_basic.yml | 31 +++++++++++++++++ .../elasticsearch/index/flush/FlushStats.java | 25 ++++++++++++-- .../elasticsearch/index/shard/IndexShard.java | 5 ++- .../index/shard/IndexShardIT.java | 33 ++++++++++++++++++- 4 files changed, 89 insertions(+), 5 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.flush/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.flush/10_basic.yml index 659435ae19615..f80f6b1096b27 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.flush/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.flush/10_basic.yml @@ -21,3 +21,34 @@ indices.stats: {level: shards} - is_true: indices.testing.shards.0.0.commit.user_data.sync_id + +--- +"Flush stats": + - skip: + version: " - 6.2.99" + reason: periodic flush stats is introduced in 6.3.0 + - do: + indices.create: + index: test + body: + settings: + number_of_shards: 1 + index.translog.flush_threshold_size: 160b + - do: + indices.flush: + index: test + - do: + indices.stats: { index: test } + - match: { indices.test.primaries.flush.periodic: 0 } + - match: { indices.test.primaries.flush.total: 1 } + - do: + index: + index: test + type: doc + id: 1 + body: { "message": "a long message to make a periodic flush happen after this index operation" } + - do: + indices.stats: { index: test } + # periodic flush is async + - gte: { 
indices.test.primaries.flush.periodic: 0 } + - gte: { indices.test.primaries.flush.total: 1 } diff --git a/server/src/main/java/org/elasticsearch/index/flush/FlushStats.java b/server/src/main/java/org/elasticsearch/index/flush/FlushStats.java index 4b931e47372b7..02e44dac105c4 100644 --- a/server/src/main/java/org/elasticsearch/index/flush/FlushStats.java +++ b/server/src/main/java/org/elasticsearch/index/flush/FlushStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.flush; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -31,20 +32,22 @@ public class FlushStats implements Streamable, ToXContentFragment { private long total; - + private long periodic; private long totalTimeInMillis; public FlushStats() { } - public FlushStats(long total, long totalTimeInMillis) { + public FlushStats(long total, long periodic, long totalTimeInMillis) { this.total = total; + this.periodic = periodic; this.totalTimeInMillis = totalTimeInMillis; } - public void add(long total, long totalTimeInMillis) { + public void add(long total, long periodic, long totalTimeInMillis) { this.total += total; + this.periodic += periodic; this.totalTimeInMillis += totalTimeInMillis; } @@ -57,6 +60,7 @@ public void addTotals(FlushStats flushStats) { return; } this.total += flushStats.total; + this.periodic += flushStats.periodic; this.totalTimeInMillis += flushStats.totalTimeInMillis; } @@ -67,6 +71,13 @@ public long getTotal() { return this.total; } + /** + * The number of flushes that were periodically triggered when translog exceeded the flush threshold. + */ + public long getPeriodic() { + return periodic; + } + /** * The total time merges have been executed (in milliseconds). 
*/ @@ -85,6 +96,7 @@ public TimeValue getTotalTime() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.FLUSH); builder.field(Fields.TOTAL, total); + builder.field(Fields.PERIODIC, periodic); builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime()); builder.endObject(); return builder; @@ -93,6 +105,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws static final class Fields { static final String FLUSH = "flush"; static final String TOTAL = "total"; + static final String PERIODIC = "periodic"; static final String TOTAL_TIME = "total_time"; static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; } @@ -101,11 +114,17 @@ static final class Fields { public void readFrom(StreamInput in) throws IOException { total = in.readVLong(); totalTimeInMillis = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_6_3_0)) { + periodic = in.readVLong(); + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(total); out.writeVLong(totalTimeInMillis); + if (out.getVersion().onOrAfter(Version.V_6_3_0)) { + out.writeVLong(periodic); + } } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f308fd66fdf6a..db0c1bd7d5485 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -59,6 +59,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -210,6 +211,7 @@ Runnable getGlobalCheckpointSyncer() { 
private final RecoveryStats recoveryStats = new RecoveryStats(); private final MeanMetric refreshMetric = new MeanMetric(); private final MeanMetric flushMetric = new MeanMetric(); + private final CounterMetric periodicFlushMetric = new CounterMetric(); private final ShardEventListener shardEventListener = new ShardEventListener(); @@ -862,7 +864,7 @@ public RefreshStats refreshStats() { } public FlushStats flushStats() { - return new FlushStats(flushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum())); + return new FlushStats(flushMetric.count(), periodicFlushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum())); } public DocsStats docStats() { @@ -2379,6 +2381,7 @@ public void onFailure(final Exception e) { @Override protected void doRun() throws IOException { flush(new FlushRequest()); + periodicFlushMetric.inc(); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index e2fe98f1c6145..3e95b5b12c7d7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; @@ -96,6 +97,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static 
org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -341,6 +343,7 @@ public void testMaybeFlush() throws Exception { .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); + assertThat(shard.flushStats().getPeriodic(), greaterThan(0L)); }); assertEquals(0, translog.stats().getUncommittedOperations()); translog.sync(); @@ -438,8 +441,12 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { if (flush) { final FlushStats flushStats = shard.flushStats(); final long total = flushStats.getTotal(); + final long periodic = flushStats.getPeriodic(); client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); - check = () -> assertEquals(total + 1, shard.flushStats().getTotal()); + check = () -> { + assertThat(shard.flushStats().getTotal(), equalTo(total + 1)); + assertThat(shard.flushStats().getPeriodic(), equalTo(periodic + 1)); + }; } else { final long generation = shard.getEngine().getTranslog().currentFileGeneration(); client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); @@ -455,6 +462,30 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { check.run(); } + public void testFlushStats() throws Exception { + final IndexService indexService = createIndex("test"); + ensureGreen(); + Settings settings = Settings.builder().put("index.translog.flush_threshold_size", "" + between(200, 300) + "b").build(); + client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get(); + final int numDocs = between(10, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + } + // Flush stats may include the new total count but the old periodic count - assert eventually. 
+ assertBusy(() -> { + final FlushStats flushStats = client().admin().indices().prepareStats("test").clear().setFlush(true).get().getTotal().flush; + assertThat(flushStats.getPeriodic(), allOf(equalTo(flushStats.getTotal()), greaterThan(0L))); + }); + assertBusy(() -> assertThat(indexService.getShard(0).shouldPeriodicallyFlush(), equalTo(false))); + settings = Settings.builder().put("index.translog.flush_threshold_size", (String) null).build(); + client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get(); + + client().prepareIndex("test", "doc", UUIDs.randomBase64UUID()).setSource("{}", XContentType.JSON).get(); + client().admin().indices().prepareFlush("test").setForce(randomBoolean()).setWaitIfOngoing(true).get(); + final FlushStats flushStats = client().admin().indices().prepareStats("test").clear().setFlush(true).get().getTotal().flush; + assertThat(flushStats.getTotal(), greaterThan(flushStats.getPeriodic())); + } + public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { createIndex("test"); ensureGreen();