Skip to content

Commit

Permalink
Skip global checkpoint sync for closed indices (#41874)
Browse files Browse the repository at this point in the history
The verifying-before-close step ensures the global checkpoints on all
shard copies are in sync; thus, we don' t need to sync global
checkpoints for closed indices.

Relate #33888
  • Loading branch information
dnhatn committed May 21, 2019
1 parent 4d55e9e commit 3573b1d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2137,8 +2137,8 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
StreamSupport
.stream(globalCheckpoints.values().spliterator(), false)
.anyMatch(v -> v.value < globalCheckpoint);
// only sync if there is a shard lagging the primary
if (syncNeeded) {
// only sync if index is not closed and there is a shard lagging the primary
if (syncNeeded && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
logger.trace("syncing global checkpoint for [{}]", reason);
globalCheckpointSyncer.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,33 @@ public void testGlobalCheckpointSync() throws IOException {
closeShards(replicaShard, primaryShard);
}

public void testClosedIndicesSkipSyncGlobalCheckpoint() throws Exception {
ShardId shardId = new ShardId("index", "_na_", 0);
IndexMetaData.Builder indexMetadata = IndexMetaData.builder("index")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2))
.state(IndexMetaData.State.CLOSE).primaryTerm(0, 1);
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(8), true,
ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
AtomicBoolean synced = new AtomicBoolean();
IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, new InternalEngineFactory(),
() -> synced.set(true), RetentionLeaseSyncer.EMPTY);
recoverShardFromStore(primaryShard);
IndexShard replicaShard = newShard(shardId, false);
recoverReplica(replicaShard, primaryShard, true);
int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
indexDoc(primaryShard, "_doc", Integer.toString(i));
}
assertThat(primaryShard.getLocalCheckpoint(), equalTo(numDocs - 1L));
primaryShard.updateLocalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), primaryShard.getLocalCheckpoint());
long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, primaryShard.getLocalCheckpoint());
primaryShard.updateGlobalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), globalCheckpointOnReplica);
primaryShard.maybeSyncGlobalCheckpoint("test");
assertFalse("closed indices should skip global checkpoint sync", synced.get());
closeShards(primaryShard, replicaShard);
}

public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
Expand Down Expand Up @@ -424,6 +426,31 @@ public Settings onNodeStopped(String nodeName) throws Exception {
}
}

public void testResyncPropagatePrimaryTerm() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
final String indexName = "closed_indices_promotion";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.build());
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
ensureGreen(indexName);
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
ensureGreen(indexName);
String nodeWithPrimary = clusterService().state().nodes().get(clusterService().state()
.routingTable().index(indexName).shard(0).primaryShard().currentNodeId()).getName();
internalCluster().restartNode(nodeWithPrimary, new InternalTestCluster.RestartCallback());
ensureGreen(indexName);
long primaryTerm = clusterService().state().metaData().index(indexName).primaryTerm(0);
for (String nodeName : internalCluster().nodesInclude(indexName)) {
IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeName)
.indexService(resolveIndex(indexName)).getShard(0);
assertThat(shard.routingEntry().toString(), shard.getOperationPrimaryTerm(), equalTo(primaryTerm));
}
}

static void assertIndexIsClosed(final String... indices) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
Expand Down

0 comments on commit 3573b1d

Please sign in to comment.