diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.flush/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.flush/10_basic.yml index 659435ae19615..ab34484b8cf9d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.flush/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.flush/10_basic.yml @@ -21,3 +21,34 @@ indices.stats: {level: shards} - is_true: indices.testing.shards.0.0.commit.user_data.sync_id + +--- +"Flush stats": + - skip: + version: " - 6.99.99" + reason: periodic flush stats is introduced in 7.0 + - do: + indices.create: + index: test + body: + settings: + number_of_shards: 1 + index.translog.flush_threshold_size: 160b + - do: + indices.flush: + index: test + - do: + indices.stats: { index: test } + - match: { indices.test.primaries.flush.periodic: 0 } + - match: { indices.test.primaries.flush.total: 1 } + - do: + index: + index: test + type: doc + id: 1 + body: { "message": "a long message to make a periodic flush happen after this index operation" } + - do: + indices.stats: { index: test } + # periodic flush is async + - gte: { indices.test.primaries.flush.periodic: 0 } + - gte: { indices.test.primaries.flush.total: 1 } diff --git a/server/src/main/java/org/elasticsearch/index/flush/FlushStats.java b/server/src/main/java/org/elasticsearch/index/flush/FlushStats.java index 4b931e47372b7..8ce5adca1575c 100644 --- a/server/src/main/java/org/elasticsearch/index/flush/FlushStats.java +++ b/server/src/main/java/org/elasticsearch/index/flush/FlushStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.flush; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -31,20 +32,22 @@ public class FlushStats implements Streamable, ToXContentFragment { private long total; - + private long periodic; private long totalTimeInMillis; public FlushStats() { } - public FlushStats(long total, long totalTimeInMillis) { + public FlushStats(long total, long periodic, long totalTimeInMillis) { this.total = total; + this.periodic = periodic; this.totalTimeInMillis = totalTimeInMillis; } - public void add(long total, long totalTimeInMillis) { + public void add(long total, long periodic, long totalTimeInMillis) { this.total += total; + this.periodic += periodic; this.totalTimeInMillis += totalTimeInMillis; } @@ -57,6 +60,7 @@ public void addTotals(FlushStats flushStats) { return; } this.total += flushStats.total; + this.periodic += flushStats.periodic; this.totalTimeInMillis += flushStats.totalTimeInMillis; } @@ -67,6 +71,13 @@ public long getTotal() { return this.total; } + /** + * The number of flushes that were periodically triggered when translog exceeded the flush threshold. + */ + public long getPeriodic() { + return periodic; + } + /** * The total time merges have been executed (in milliseconds). */ @@ -85,6 +96,7 @@ public TimeValue getTotalTime() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.FLUSH); builder.field(Fields.TOTAL, total); + builder.field(Fields.PERIODIC, periodic); builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime()); builder.endObject(); return builder; @@ -93,6 +105,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws static final class Fields { static final String FLUSH = "flush"; static final String TOTAL = "total"; + static final String PERIODIC = "periodic"; static final String TOTAL_TIME = "total_time"; static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; } @@ -101,11 +114,17 @@ static final class Fields { public void readFrom(StreamInput in) throws IOException { total = in.readVLong(); totalTimeInMillis = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + periodic = in.readVLong(); + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(total); out.writeVLong(totalTimeInMillis); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeVLong(periodic); + } } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 46e095ab014d0..cab7246a4cb0f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -57,6 +57,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -208,6 +209,7 @@ Runnable getGlobalCheckpointSyncer() { private final RecoveryStats recoveryStats = new RecoveryStats(); private final MeanMetric refreshMetric = new MeanMetric(); private final MeanMetric flushMetric = new MeanMetric(); + private final CounterMetric periodicFlushMetric = new CounterMetric(); private final ShardEventListener shardEventListener = new ShardEventListener(); @@ -827,7 +829,7 @@ public RefreshStats refreshStats() { } public FlushStats flushStats() { - return new FlushStats(flushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum())); + return new FlushStats(flushMetric.count(), periodicFlushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum())); } public DocsStats docStats() { @@ -2344,6 +2346,7 @@ public void onFailure(final Exception e) { @Override protected void doRun() throws IOException { flush(new FlushRequest()); + periodicFlushMetric.inc(); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index b14030d46e4ca..cc241d4ad745d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.shard; import org.apache.lucene.store.LockObtainFailedException; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -42,6 +41,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; @@ -50,6 +50,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; @@ -102,6 +103,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -347,6 +349,7 @@ public void testMaybeFlush() throws Exception { .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); + assertThat(shard.flushStats().getPeriodic(), greaterThan(0L)); }); assertEquals(0, translog.stats().getUncommittedOperations()); translog.sync(); @@ -444,8 +447,12 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { if (flush) { final FlushStats flushStats = shard.flushStats(); final long total = flushStats.getTotal(); + final long periodic = flushStats.getPeriodic(); client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); - check = () -> assertEquals(total + 1, shard.flushStats().getTotal()); + check = () -> { + assertThat(shard.flushStats().getTotal(), equalTo(total + 1)); + assertThat(shard.flushStats().getPeriodic(), equalTo(periodic + 1)); + }; } else { final long generation = shard.getEngine().getTranslog().currentFileGeneration(); client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); @@ -461,6 +468,30 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { check.run(); } + public void testFlushStats() throws Exception { + final IndexService indexService = createIndex("test"); + ensureGreen(); + Settings settings = Settings.builder().put("index.translog.flush_threshold_size", "" + between(200, 300) + "b").build(); + client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get(); + final int numDocs = between(10, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + } + // A flush stats may include the new total count but the old period count - assert eventually. + assertBusy(() -> { + final FlushStats flushStats = client().admin().indices().prepareStats("test").clear().setFlush(true).get().getTotal().flush; + assertThat(flushStats.getPeriodic(), allOf(equalTo(flushStats.getTotal()), greaterThan(0L))); + }); + assertBusy(() -> assertThat(indexService.getShard(0).shouldPeriodicallyFlush(), equalTo(false))); + settings = Settings.builder().put("index.translog.flush_threshold_size", (String) null).build(); + client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get(); + + client().prepareIndex("test", "doc", UUIDs.randomBase64UUID()).setSource("{}", XContentType.JSON).get(); + client().admin().indices().prepareFlush("test").setForce(randomBoolean()).setWaitIfOngoing(true).get(); + final FlushStats flushStats = client().admin().indices().prepareStats("test").clear().setFlush(true).get().getTotal().flush; + assertThat(flushStats.getTotal(), greaterThan(flushStats.getPeriodic())); + } + public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { createIndex("test"); ensureGreen();