A replica can be promoted and started in one cluster state update (#32042)

When a replica is fully recovered (i.e., in `POST_RECOVERY` state) we send a request to the master
to start the shard. The master changes the state of the replica and publishes a cluster state to that
effect. In certain cases, that cluster state can be processed on the node hosting the replica
*together* with a cluster state that promotes the now-started replica to primary. This can
happen due to batched cluster state processing, or because the master died after committing the
cluster state that starts the shard but before publishing it to the node with the replica. If the old
master also held the primary shard, the new master will remove the primary (as it failed) and will
immediately promote the replica (thinking it is started).

Sadly, our code in `IndexShard` didn't allow for this, which caused [assertions](https://github.com/elastic/elasticsearch/blob/13917162ad5c59a96ccb4d6a81a5044546c45c22/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java#L482) to be tripped in some of our test runs.
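
To make the race concrete, below is a minimal, self-contained Java sketch of a shard state machine that tolerates the batched transition. All names in it (`BatchedPromotionSketch`, `Routing`, and so on) are hypothetical, not Elasticsearch code; it only mirrors the shape of the fix, in which a single `updateShardState` call may both start a recovering replica and promote it to primary.

```java
/**
 * Hypothetical sketch (not Elasticsearch code): a toy shard that accepts
 * "started" and "promoted to primary" arriving in one routing update.
 */
public class BatchedPromotionSketch {

    enum State { POST_RECOVERY, STARTED }

    static final class Routing {
        final boolean primary;
        final boolean active; // started (or relocating) in the routing table
        Routing(boolean primary, boolean active) {
            this.primary = primary;
            this.active = active;
        }
    }

    private State state = State.POST_RECOVERY;           // fully recovered, not yet started
    private Routing routing = new Routing(false, false);  // a recovering replica
    private boolean primaryMode = false;                  // true once this copy tracks in-sync replicas
    private long primaryTerm = 1;

    void updateShardState(Routing newRouting, long newPrimaryTerm) {
        // 1. This update may start the recovered replica...
        if (state == State.POST_RECOVERY && newRouting.active) {
            state = State.STARTED;
        }
        // 2. ...and the very same update may promote it to primary, e.g. when the
        // "shard started" and "promote replica" cluster states were batched together.
        if (newRouting.primary && newPrimaryTerm > primaryTerm) {
            primaryTerm = newPrimaryTerm;
            primaryMode = true;
        }
        routing = newRouting;
        // The invariant the tripped assertions were guarding:
        assert !routing.primary || state != State.STARTED || primaryMode
                : "a started primary must be in primary mode";
    }

    public static void main(String[] args) {
        BatchedPromotionSketch shard = new BatchedPromotionSketch();
        // Two logical cluster state updates ("start", then "promote") applied as one:
        shard.updateShardState(new Routing(true, true), 2);
        System.out.println("state=" + shard.state + ", primaryMode=" + shard.primaryMode);
    }
}
```

Before this commit, the real `IndexShard#updateShardState` only activated primary mode on paths that assumed the shard was already started; the diff below restructures the method so the start-and-promote case is handled in one call.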
bleskes committed Jul 19, 2018
1 parent e598cef commit 8d41d4d
Showing 8 changed files with 137 additions and 87 deletions.
42 changes: 25 additions & 17 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -47,7 +47,6 @@
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
@@ -403,21 +402,10 @@ public void updateShardState(final ShardRouting newRouting,
if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;

if (newRouting.primary()) {
final DiscoveryNode recoverySourceNode = recoveryState.getSourceNode();
final Engine engine = getEngine();
if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
// there was no primary context hand-off in < 6.0.0, need to manually activate the shard
replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
}
if (currentRouting.isRelocationTarget() == true && recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
// Flush the translog as it may contain operations with no sequence numbers. We want to make sure those
// operations will never be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq#
// (due to active indexing) and operations without a seq# coming from the translog. We therefore flush
// to create a lucene commit point to an empty translog file.
engine.flush(false, true);
}
}
assert currentRouting.isRelocationTarget() == false || currentRouting.primary() == false ||
recoveryState.getSourceNode().getVersion().before(Version.V_6_0_0_alpha1) ||
replicationTracker.isPrimaryMode() :
"a primary relocation is completed by the master, but primary mode is not active " + currentRouting;

changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
} else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isPrimaryMode() == false &&
@@ -433,7 +421,22 @@
final CountDownLatch shardStateUpdated = new CountDownLatch(1);

if (newRouting.primary()) {
if (newPrimaryTerm != primaryTerm) {
if (newPrimaryTerm == primaryTerm) {
if (currentRouting.initializing() && newRouting.active()) {
if (currentRouting.isRelocationTarget() == false) {
// the master started a recovering primary, activate primary mode.
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
} else if (recoveryState.getSourceNode().getVersion().before(Version.V_6_0_0_alpha1)) {
// there was no primary context hand-off in < 6.0.0, need to manually activate the shard
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
// Flush the translog as it may contain operations with no sequence numbers. We want to make sure those
// operations will never be replayed as part of peer recovery to avoid an arbitrary mixture of operations with
// seq# (due to active indexing) and operations without a seq# coming from the translog. We therefore flush
// to create a lucene commit point to an empty translog file.
getEngine().flush(false, true);
}
}
} else {
assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
/* Note that due to cluster state batching an initializing primary shard term can be failed and re-assigned
* in one state, causing its term to be incremented. Note that if both current shard state and new
@@ -531,6 +534,11 @@ public void onFailure(Exception e) {
}
// set this last, once we finished updating all internal state.
this.shardRouting = newRouting;

assert this.shardRouting.primary() == false ||
this.shardRouting.started() == false || // note that we use started and not active to avoid relocating shards
this.replicationTracker.isPrimaryMode()
: "an started primary must be in primary mode " + this.shardRouting;
shardStateUpdated.countDown();
}
if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
@@ -73,7 +73,6 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -265,7 +264,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath
RecoverySource.PeerRecoverySource.INSTANCE);

final IndexShard newReplica =
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
replicas.add(newReplica);
updateAllocationIDsOnPrimary();
return newReplica;
@@ -341,8 +340,11 @@ public void recoverReplica(
IndexShard replica,
BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
boolean markAsRecovering) throws IOException {
ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering, activeIds(),
routingTable(Function.identity()));
final IndexShardRoutingTable routingTable = routingTable(Function.identity());
final Set<String> inSyncIds = activeIds();
ESIndexLevelReplicationTestCase.this.recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds,
routingTable);
ESIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
}

public synchronized DiscoveryNode getPrimaryNode() {
@@ -361,7 +361,7 @@ public void testSeqNoCollision() throws Exception {
logger.info("--> Promote replica2 as the primary");
shards.promoteReplicaToPrimary(replica2);
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2);
recoverReplica(replica3, replica2, true);
try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
assertThat(snapshot.next(), equalTo(op2));
@@ -221,6 +221,7 @@ public void testPersistenceStateMetadataPersistence() throws Exception {
}

public void testFailShard() throws Exception {
allowShardFailures();
IndexShard shard = newStartedShard();
final ShardPath shardPath = shard.shardPath();
assertNotNull(shardPath);
@@ -304,7 +305,8 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOException
}

public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final IndexShard indexShard = newShard(false);
recoveryEmptyReplica(indexShard, randomBoolean());

final int operations = scaledRandomIntBetween(1, 64);
final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
@@ -348,20 +350,10 @@ public void onFailure(Exception e) {
barrier.await();
latch.await();

// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting =
newShardRouting(
replicaRouting.shardId(),
replicaRouting.currentNodeId(),
null,
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
0L, Collections.singleton(primaryRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(),
Collections.emptySet());
promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());


final int delayedOperations = scaledRandomIntBetween(1, 64);
final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
@@ -423,8 +415,9 @@ public void onFailure(Exception e) {
* 1) Internal state (a la ReplicationTracker) has been updated
* 2) Primary term is set to the new term
*/
public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
public void testPublishingOrderOnPromotion() throws IOException, InterruptedException, BrokenBarrierException {
final IndexShard indexShard = newShard(false);
recoveryEmptyReplica(indexShard, randomBoolean());
final long promotedTerm = indexShard.getPrimaryTerm() + 1;
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean stop = new AtomicBoolean();
@@ -443,18 +436,10 @@ public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException
});
thread.start();

final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true,
ShardRoutingState.STARTED, replicaRouting.allocationId());


final Set<String> inSyncAllocationIds = Collections.singleton(primaryRouting.allocationId().getId());
final IndexShardRoutingTable routingTable =
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build();
barrier.await();
// promote the replica
indexShard.updateShardState(primaryRouting, promotedTerm, (shard, listener) -> {}, 0L, inSyncAllocationIds, routingTable,
Collections.emptySet());
final ShardRouting replicaRouting = indexShard.routingEntry();
promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());

stop.set(true);
thread.join();
@@ -463,7 +448,8 @@ public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException


public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
final IndexShard indexShard = newStartedShard(false);
final IndexShard indexShard = newShard(false);
recoveryEmptyReplica(indexShard, randomBoolean());

// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
@@ -474,17 +460,8 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {

// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting =
newShardRouting(
replicaRouting.shardId(),
replicaRouting.currentNodeId(),
null,
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
0L, Collections.singleton(primaryRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
promoteReplica(indexShard, Collections.singleton(replicaRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(replicaRouting.shardId()).addShard(replicaRouting).build());

/*
* This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
@@ -501,7 +478,7 @@ public void onResponse(Releasable releasable) {

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
throw new AssertionError(e);
}
},
ThreadPool.Names.GENERIC, "");
@@ -840,7 +817,7 @@ public void testGlobalCheckpointSync() throws IOException {
// add a replica
recoverShardFromStore(primaryShard);
final IndexShard replicaShard = newShard(shardId, false);
recoverReplica(replicaShard, primaryShard);
recoverReplica(replicaShard, primaryShard, true);
final int maxSeqNo = randomIntBetween(0, 128);
for (int i = 0; i <= maxSeqNo; i++) {
primaryShard.getEngine().getLocalCheckpointTracker().generateSeqNo();
@@ -1619,7 +1596,7 @@ public void testPrimaryHandOffUpdatesLocalCheckpoint() throws IOException {
IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1));
final IndexShard primaryTarget = newShard(primarySource.routingEntry().getTargetRelocatingShard());
updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetaData());
recoverReplica(primaryTarget, primarySource);
recoverReplica(primaryTarget, primarySource, true);

// check that local checkpoint of new primary is properly tracked after primary relocation
assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L));
@@ -2055,7 +2032,7 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps
assertFalse(replica.isSyncNeeded());
return localCheckpoint;
}
}, true);
}, true, true);

closeShards(primary, replica);
}
@@ -2162,7 +2139,7 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps
assertTrue(replica.isActive());
return localCheckpoint;
}
}, false);
}, false, true);

closeShards(primary, replica);
}
@@ -2214,7 +2191,7 @@ public void finalizeRecovery(long globalCheckpoint) throws IOException {
super.finalizeRecovery(globalCheckpoint);
assertListenerCalled.accept(replica);
}
}, false);
}, false, true);

closeShards(primary, replica);
}
@@ -357,6 +357,14 @@ public void updateShardState(ShardRouting shardRouting,
assertTrue("and active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting,
shardRouting.active());
}
if (this.shardRouting.primary()) {
assertTrue("a primary shard can't be demoted", shardRouting.primary());
} else if (shardRouting.primary()) {
// note: it's ok for a replica in post recovery to be started and promoted at once
// this can happen when the primary failed after we sent the start shard message
assertTrue("a replica can only be promoted when active. current: " + this.shardRouting + " new: " + shardRouting,
shardRouting.active());
}
this.shardRouting = shardRouting;
if (shardRouting.primary()) {
term = newPrimaryTerm;
@@ -43,7 +43,7 @@ public void testGetStartingSeqNo() throws Exception {
try {
// Empty store
{
recoveryEmptyReplica(replica);
recoveryEmptyReplica(replica, true);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
recoveryTarget.decRef();
@@ -261,7 +261,7 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception {
}
IndexShard replicaShard = newShard(primaryShard.shardId(), false);
updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData());
recoverReplica(replicaShard, primaryShard);
recoverReplica(replicaShard, primaryShard, true);
List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());
long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint));
