Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add periodic flush count to flush stats #29360

Merged
merged 4 commits into from
Apr 11, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,34 @@
indices.stats: {level: shards}

- is_true: indices.testing.shards.0.0.commit.user_data.sync_id

---
"Flush stats":
- skip:
version: " - 6.99.99"
reason: periodic flush stats is introduced in 7.0
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
index.translog.flush_threshold_size: 160b
- do:
indices.flush:
index: test
- do:
indices.stats: { index: test }
- match: { indices.test.primaries.flush.periodic: 0 }
- match: { indices.test.primaries.flush.total: 1 }
- do:
index:
index: test
type: doc
id: 1
body: { "message": "a long message to make a periodic flush happen after this index operation" }
- do:
indices.stats: { index: test }
# periodic flush is async
- gte: { indices.test.primaries.flush.periodic: 0 }
- gte: { indices.test.primaries.flush.total: 1 }
25 changes: 22 additions & 3 deletions server/src/main/java/org/elasticsearch/index/flush/FlushStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.flush;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand All @@ -31,20 +32,22 @@
public class FlushStats implements Streamable, ToXContentFragment {

private long total;

private long periodic;
private long totalTimeInMillis;

public FlushStats() {

}

public FlushStats(long total, long totalTimeInMillis) {
public FlushStats(long total, long periodic, long totalTimeInMillis) {
this.total = total;
this.periodic = periodic;
this.totalTimeInMillis = totalTimeInMillis;
}

public void add(long total, long totalTimeInMillis) {
public void add(long total, long periodic, long totalTimeInMillis) {
this.total += total;
this.periodic += periodic;
this.totalTimeInMillis += totalTimeInMillis;
}

Expand All @@ -57,6 +60,7 @@ public void addTotals(FlushStats flushStats) {
return;
}
this.total += flushStats.total;
this.periodic += flushStats.periodic;
this.totalTimeInMillis += flushStats.totalTimeInMillis;
}

Expand All @@ -67,6 +71,13 @@ public long getTotal() {
return this.total;
}

/**
* The number of flushes that were periodically triggered when translog exceeded the flush threshold.
*/
public long getPeriodic() {
return periodic;
}

/**
* The total time merges have been executed (in milliseconds).
*/
Expand All @@ -85,6 +96,7 @@ public TimeValue getTotalTime() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.FLUSH);
builder.field(Fields.TOTAL, total);
builder.field(Fields.PERIODIC, periodic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a rest test that checks that this is there?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s1monw
I had a test but I could not verify the periodic value as the periodic flush is executed async. Do you have any suggestion for this? Or just check its presence (eg. 0 is ok)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the analogue of gt: { periodic: 0} when I did something similar.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @DaveCTurner. Unfortunately, sometimes a period flush might still be executing (or just scheduled) and periodic is still 0.

Copy link
Contributor

@DaveCTurner DaveCTurner Apr 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies, I meant gte: { periodic: 0 }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah ++

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will go this suggestion. Thanks @DaveCTurner and @s1monw.

builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime());
builder.endObject();
return builder;
Expand All @@ -93,6 +105,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
static final class Fields {
static final String FLUSH = "flush";
static final String TOTAL = "total";
static final String PERIODIC = "periodic";
static final String TOTAL_TIME = "total_time";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
}
Expand All @@ -101,11 +114,17 @@ static final class Fields {
public void readFrom(StreamInput in) throws IOException {
total = in.readVLong();
totalTimeInMillis = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
periodic = in.readVLong();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);
out.writeVLong(totalTimeInMillis);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVLong(periodic);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -210,6 +211,7 @@ Runnable getGlobalCheckpointSyncer() {
private final RecoveryStats recoveryStats = new RecoveryStats();
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
private final CounterMetric periodicFlushMetric = new CounterMetric();

private final ShardEventListener shardEventListener = new ShardEventListener();

Expand Down Expand Up @@ -846,7 +848,7 @@ public RefreshStats refreshStats() {
}

public FlushStats flushStats() {
return new FlushStats(flushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()));
return new FlushStats(flushMetric.count(), periodicFlushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()));
}

public DocsStats docStats() {
Expand Down Expand Up @@ -2363,6 +2365,7 @@ public void onFailure(final Exception e) {
@Override
protected void doRun() throws IOException {
flush(new FlushRequest());
periodicFlushMetric.inc();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.index.shard;

import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand All @@ -42,6 +41,7 @@
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
Expand All @@ -50,6 +50,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
Expand Down Expand Up @@ -102,6 +103,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -347,6 +349,7 @@ public void testMaybeFlush() throws Exception {
.setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertBusy(() -> { // this is async
assertFalse(shard.shouldPeriodicallyFlush());
assertThat(shard.flushStats().getPeriodic(), greaterThan(0L));
});
assertEquals(0, translog.stats().getUncommittedOperations());
translog.sync();
Expand Down Expand Up @@ -444,8 +447,12 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception {
if (flush) {
final FlushStats flushStats = shard.flushStats();
final long total = flushStats.getTotal();
final long periodic = flushStats.getPeriodic();
client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
check = () -> assertEquals(total + 1, shard.flushStats().getTotal());
check = () -> {
assertThat(shard.flushStats().getTotal(), equalTo(total + 1));
assertThat(shard.flushStats().getPeriodic(), equalTo(periodic + 1));
};
} else {
final long generation = shard.getEngine().getTranslog().currentFileGeneration();
client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
Expand All @@ -461,6 +468,30 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception {
check.run();
}

public void testFlushStats() throws Exception {
final IndexService indexService = createIndex("test");
ensureGreen();
Settings settings = Settings.builder().put("index.translog.flush_threshold_size", "" + between(200, 300) + "b").build();
client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get();
final int numDocs = between(10, 100);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
}
// A flush stats may include the new total count but the old period count - assert eventually.
assertBusy(() -> {
final FlushStats flushStats = client().admin().indices().prepareStats("test").clear().setFlush(true).get().getTotal().flush;
assertThat(flushStats.getPeriodic(), allOf(equalTo(flushStats.getTotal()), greaterThan(0L)));
});
assertBusy(() -> assertThat(indexService.getShard(0).shouldPeriodicallyFlush(), equalTo(false)));
settings = Settings.builder().put("index.translog.flush_threshold_size", (String) null).build();
client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get();

client().prepareIndex("test", "doc", UUIDs.randomBase64UUID()).setSource("{}", XContentType.JSON).get();
client().admin().indices().prepareFlush("test").setForce(randomBoolean()).setWaitIfOngoing(true).get();
final FlushStats flushStats = client().admin().indices().prepareStats("test").clear().setFlush(true).get().getTotal().flush;
assertThat(flushStats.getTotal(), greaterThan(flushStats.getPeriodic()));
}

public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable {
createIndex("test");
ensureGreen();
Expand Down