Flush instead of synced-flush inactive shards (#49126)
With peer recovery retention leases and sequence-number-based replica
allocation, a regular flush can speed up recovery as well as a synced
flush can. With this change, we flush instead of synced-flush when a
shard becomes inactive.

Closes #31965
dnhatn authored Nov 22, 2019
1 parent b8ce07b commit 725dda3
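In short, the commit moves idle-shard handling out of the IndexEventListener/SyncedFlushService pair and into IndexShard itself: when a shard sees no writes for the configured window, it now schedules a plain flush on the flush thread pool instead of notifying listeners (the old path fanned out to SyncedFlushService.onShardInactive, which attempted a synced flush on primaries). A minimal, self-contained sketch of that pattern follows — hypothetical class and method names, not the actual Elasticsearch code:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the idle-flush pattern introduced by this change: getAndSet on
    // an "active" flag guarantees at most one flush per idle transition, and
    // the flush runs on a dedicated pool rather than the detecting thread.
    class IdleFlushSketch {
        private final AtomicBoolean active = new AtomicBoolean(true);
        private final AtomicLong lastWriteNanos = new AtomicLong(System.nanoTime());
        private final ExecutorService flushPool = Executors.newSingleThreadExecutor();

        void onWrite() { // every indexing operation re-activates the shard
            lastWriteNanos.set(System.nanoTime());
            active.set(true);
        }

        void flushOnIdle(long inactiveTimeNS) {
            boolean idleLongEnough = System.nanoTime() - lastWriteNanos.get() >= inactiveTimeNS;
            if (idleLongEnough && active.getAndSet(false)) { // only the first detecting caller flushes
                flushPool.execute(() -> { /* the real code issues a FlushRequest here */ });
            }
        }
    }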
Showing 12 changed files with 87 additions and 131 deletions.
@@ -118,18 +118,6 @@ public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSha
}
}

@Override
public void onShardInactive(IndexShard indexShard) {
for (IndexEventListener listener : listeners) {
try {
listener.onShardInactive(indexShard);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to invoke on shard inactive callback",
indexShard.shardId().getId()), e);
throw e;
}
}
}

@Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState,
@@ -84,13 +84,6 @@ default void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
default void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState,
IndexShardState currentState, @Nullable String reason) {}

/**
* Called when a shard is marked as inactive
*
* @param indexShard The shard that was marked inactive
*/
default void onShardInactive(IndexShard indexShard) {}

/**
* Called before the index gets created. Note that this is also called
* when the index is created on data nodes
27 changes: 18 additions & 9 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1578,7 +1578,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
// we can flush if we go idle after some time and become inactive.
active.set(true);
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
@@ -1758,19 +1758,28 @@ public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {

/**
* Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
* indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen.
* indexing operation, so we can flush the index.
*/
public void checkIdle(long inactiveTimeNS) {
public void flushOnIdle(long inactiveTimeNS) {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("shard is now inactive");
try {
indexEventListener.onShardInactive(this);
} catch (Exception e) {
logger.warn("failed to notify index event listener", e);
}
logger.debug("flushing shard on inactive");
threadPool.executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush shard on inactive", e);
}
}

@Override
protected void doRun() {
flush(new FlushRequest().waitIfOngoing(false).force(false));
periodicFlushMetric.inc();
}
});
}
}
}
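Beyond the rename from checkIdle to flushOnIdle, the behavioral change above is that the shard no longer merely notifies listeners: it submits the flush itself to ThreadPool.Names.FLUSH, logs a warning on failure unless the shard has already closed, and counts the flush in periodicFlushMetric. Callers pass the inactive window in nanoseconds; a hedged usage sketch (illustrative caller code, not from this diff):

    // flush if no write has arrived in the last five minutes
    shard.flushOnIdle(TimeUnit.MINUTES.toNanos(5));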
@@ -303,7 +303,7 @@ private void runUnlocked() {
long totalBytesWriting = 0;
for (IndexShard shard : availableShards()) {

// Give shard a chance to transition to inactive so sync'd flush can happen:
// Give shard a chance to transition to inactive so we can flush
checkIdle(shard, inactiveTime.nanos());

// How many bytes this shard is currently (async'd) moving from heap to disk:
@@ -400,7 +400,7 @@ private void runUnlocked() {
*/
protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
try {
shard.checkIdle(inactiveTimeNS);
shard.flushOnIdle(inactiveTimeNS);
} catch (AlreadyClosedException e) {
logger.trace(() -> new ParameterizedMessage("ignore exception while checking if shard {} is inactive", shard.shardId()), e);
}
@@ -177,13 +177,7 @@ public IndicesClusterStateService(
final RetentionLeaseSyncer retentionLeaseSyncer,
final NodeClient client) {
this.settings = settings;
this.buildInIndexListener =
Arrays.asList(
peerRecoverySourceService,
recoveryTargetService,
searchService,
syncedFlushService,
snapshotShardsService);
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
@@ -47,7 +47,6 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -71,7 +70,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

public class SyncedFlushService implements IndexEventListener {
public class SyncedFlushService {

private static final Logger logger = LogManager.getLogger(SyncedFlushService.class);

@@ -101,25 +100,6 @@ public SyncedFlushService(IndicesService indicesService,
new InFlightOpCountTransportHandler());
}

@Override
public void onShardInactive(final IndexShard indexShard) {
// we only want to call sync flush once, so only trigger it when we are on a primary
if (indexShard.routingEntry().primary()) {
attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
@Override
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}",
syncedFlushResult.getShardId(), syncedFlushResult.syncId());
}

@Override
public void onFailure(Exception e) {
logger.debug(() -> new ParameterizedMessage("{} sync flush on inactive shard failed", indexShard.shardId()), e);
}
});
}
}

/**
* a utility method to perform a synced flush for all shards of multiple indices.
* see {@link #attemptSyncedFlush(ShardId, ActionListener)}
@@ -533,7 +533,7 @@ public void testShrinkCommitsMergeOnIdle() throws Exception {
IndexService indexShards = service.indexService(target.getIndex());
IndexShard shard = indexShards.getShard(0);
assertTrue(shard.isActive());
shard.checkIdle(0);
shard.flushOnIdle(0);
assertFalse(shard.isActive());
}
}
@@ -25,7 +25,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@@ -167,23 +166,6 @@ public void testLockTryingToDelete() throws Exception {
}
}

public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));
client().prepareIndex("test").setSource("{}", XContentType.JSON).get();
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
assertBusy(() -> {
IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
}
);
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}

public void testDurableFlagHasEffect() throws Exception {
createIndex("test");
ensureGreen();
@@ -3419,7 +3419,7 @@ public void testScheduledRefresh() throws Exception {
indexDoc(primary, "_doc", "2", "{\"foo\" : \"bar\"}");
assertFalse(primary.scheduledRefresh());
assertTrue(primary.isSearchIdle());
primary.checkIdle(0);
primary.flushOnIdle(0);
assertTrue(primary.scheduledRefresh()); // make sure we refresh once the shard is inactive
try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
assertEquals(3, searcher.getIndexReader().numDocs());
@@ -3629,49 +3629,24 @@ public void testSegmentMemoryTrackedWithRandomSearchers() throws Exception {
assertThat(breaker.getUsed(), equalTo(0L));
}

public void testFlushOnInactive() throws Exception {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1).build();
ShardRouting shardRouting =
TestShardRouting.newShardRouting(new ShardId(metaData.getIndex(), 0), "n1", true,
ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
final ShardId shardId = shardRouting.shardId();
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
AtomicBoolean markedInactive = new AtomicBoolean();
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> { },
RetentionLeaseSyncer.EMPTY, new IndexEventListener() {
@Override
public void onShardInactive(IndexShard indexShard) {
markedInactive.set(true);
primaryRef.get().flush(new FlushRequest());
}
});
primaryRef.set(primary);
recoverShardFromStore(primary);
public void testFlushOnIdle() throws Exception {
IndexShard shard = newStartedShard();
for (int i = 0; i < 3; i++) {
indexDoc(primary, "_doc", "" + i, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}");
primary.refresh("test"); // produce segments
indexDoc(shard, "_doc", Integer.toString(i));
shard.refresh("test"); // produce segments
}
List<Segment> segments = primary.segments(false);
List<Segment> segments = shard.segments(false);
Set<String> names = new HashSet<>();
for (Segment segment : segments) {
assertFalse(segment.committed);
assertTrue(segment.search);
names.add(segment.getName());
}
assertEquals(3, segments.size());
primary.flush(new FlushRequest());
primary.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(false));
primary.refresh("test");
segments = primary.segments(false);
shard.flush(new FlushRequest());
shard.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(false));
shard.refresh("test");
segments = shard.segments(false);
for (Segment segment : segments) {
if (names.contains(segment.getName())) {
assertTrue(segment.committed);
@@ -3683,20 +3658,18 @@ public void onShardInactive(IndexShard indexShard) {
}
assertEquals(4, segments.size());

assertFalse(markedInactive.get());
assertBusy(() -> {
primary.checkIdle(0);
assertFalse(primary.isActive());
});
shard.flushOnIdle(0);
assertFalse(shard.isActive());

assertTrue(markedInactive.get());
segments = primary.segments(false);
assertEquals(1, segments.size());
for (Segment segment : segments) {
assertTrue(segment.committed);
assertTrue(segment.search);
}
closeShards(primary);
assertBusy(() -> { // flush happens in the background using the flush threadpool
List<Segment> segmentsAfterFlush = shard.segments(false);
assertEquals(1, segmentsAfterFlush.size());
for (Segment segment : segmentsAfterFlush) {
assertTrue(segment.committed);
assertTrue(segment.search);
}
});
closeShards(shard);
}

public void testOnCloseStats() throws IOException {
38 changes: 38 additions & 0 deletions server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
@@ -37,6 +37,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
@@ -47,15 +48,22 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -71,6 +79,11 @@
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class FlushIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(InternalSettingsPlugin.class);
}

public void testWaitIfOngoing() throws InterruptedException {
createIndex("test");
ensureGreen("test");
@@ -369,4 +382,29 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
}

public void testFlushOnInactive() throws Exception {
final String indexName = "flush_on_inactive";
List<String> dataNodes = internalCluster().startDataOnlyNodes(2, Settings.builder()
.put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), randomTimeValue(10, 1000, "ms")).build());
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), randomTimeValue(50, 200, "ms"))
.put("index.routing.allocation.include._name", String.join(",", dataNodes))
.build()));
ensureGreen(indexName);
int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex(indexName).setSource("f", "v").get();
}
if (randomBoolean()) {
internalCluster().restartNode(randomFrom(dataNodes), new InternalTestCluster.RestartCallback());
ensureGreen(indexName);
}
assertBusy(() -> {
for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) {
assertThat(shardStats.getStats().getTranslog().getUncommittedOperations(), equalTo(0));
}
}, 30, TimeUnit.SECONDS);
}
}
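The new integration test observes the change indirectly: once the background flush of an idle shard completes, the shard reports zero uncommitted translog operations, which is what makes sequence-number-based recovery cheap. The same check in a unit-test context might look like this — a sketch assuming IndexShard#translogStats() and the usual test matchers:

    // sketch: after flushOnIdle fires, the translog should carry no uncommitted operations
    shard.flushOnIdle(0);
    assertBusy(() -> assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0)));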
@@ -1983,9 +1983,13 @@ public List<String> startMasterOnlyNodes(int numNodes, Settings settings) {
}

public List<String> startDataOnlyNodes(int numNodes) {
return startDataOnlyNodes(numNodes, Settings.EMPTY);
}

public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
return startNodes(
numNodes,
Settings.builder().put(Settings.EMPTY).put(Node.NODE_MASTER_SETTING.getKey(), false)
Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), true).build());
}
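This overload exists so tests can start data-only nodes with custom settings; the FlushIT test above uses it to shorten the shard-inactive window, e.g.:

    List<String> dataNodes = internalCluster().startDataOnlyNodes(2, Settings.builder()
        .put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), "200ms").build());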

@@ -126,11 +126,6 @@ public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardSt
delegate.indexShardStateChanged(indexShard, previousState, currentState, reason);
}

@Override
public void onShardInactive(IndexShard indexShard) {
delegate.onShardInactive(indexShard);
}

@Override
public void beforeIndexCreated(Index index, Settings indexSettings) {
delegate.beforeIndexCreated(index, indexSettings);