Skip to content

Commit

Permalink
Verify primary mode usage with assertions (#32667)
Browse files Browse the repository at this point in the history
Primary terms were introduced as part of the sequence-number effort (#10708) and added in ES
5.0. Subsequent work introduced the replication tracker which lets the primary own its replication
group (#25692) to coordinate recovery and replication. The replication tracker explicitly exposes
whether it is operating in primary mode or replica mode, independent of the ShardRouting object
that's associated with a shard. During a primary relocation, for example, the primary mode is
transferred between the primary relocation source and the primary relocation target. After
transferring this so-called primary context, the old primary becomes a replication target and the
new primary the replication source, reflected in the replication tracker on both nodes. With the
most recent PR in this area (#32442), we finally have a clean transition between a shard that's
operating as a primary and issuing sequence numbers and a shard that's serving as a replication
target. The transition from one state to the other is enforced through the operation-permit system,
where we block permit acquisition during such changes and perform the transition under this
operation block, ensuring that there are no operations in progress while the transition is being
performed. This finally allows us to turn the best-effort checks that were put in place to prevent
shards from being used in the wrong way (i.e. primary as replica, or replica as primary) into hard
assertions, making it easier to catch any bugs in this area.
  • Loading branch information
ywelsch authored Aug 7, 2018
1 parent 3ce984d commit 45066b5
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 41 deletions.
47 changes: 21 additions & 26 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1446,30 +1446,25 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
}
} else {
if (origin == Engine.Operation.Origin.PRIMARY) {
verifyPrimary();
assert assertPrimaryMode();
} else {
assert origin == Engine.Operation.Origin.REPLICA;
verifyReplicationTarget();
assert assertReplicationTarget();
}
if (writeAllowedStates.contains(state) == false) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStates + ", origin [" + origin + "]");
}
}
}

private void verifyPrimary() {
if (shardRouting.primary() == false) {
throw new IllegalStateException("shard " + shardRouting + " is not a primary");
}
private boolean assertPrimaryMode() {
assert shardRouting.primary() && replicationTracker.isPrimaryMode() : "shard " + shardRouting + " is not a primary shard in primary mode";
return true;
}

private void verifyReplicationTarget() {
final IndexShardState state = state();
if (shardRouting.primary() && shardRouting.active() && replicationTracker.isPrimaryMode()) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " +
"relocation hand off, state is [" + state + "]");
}
private boolean assertReplicationTarget() {
assert replicationTracker.isPrimaryMode() == false : "shard " + shardRouting + " in primary mode cannot be a replication target";
return true;
}

private void verifyNotClosed() throws IllegalIndexShardStateException {
Expand Down Expand Up @@ -1716,7 +1711,7 @@ public void writeIndexingBuffer() {
* @param checkpoint the local checkpoint for the shard
*/
public void updateLocalCheckpointForShard(final String allocationId, final long checkpoint) {
verifyPrimary();
assert assertPrimaryMode();
verifyNotClosed();
replicationTracker.updateLocalCheckpoint(allocationId, checkpoint);
}
Expand All @@ -1728,7 +1723,7 @@ public void updateLocalCheckpointForShard(final String allocationId, final long
* @param globalCheckpoint the global checkpoint
*/
public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
verifyPrimary();
assert assertPrimaryMode();
verifyNotClosed();
replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
}
Expand All @@ -1750,7 +1745,7 @@ public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
* @param allocationId the allocation ID of the shard for which recovery was initiated
*/
public void initiateTracking(final String allocationId) {
verifyPrimary();
assert assertPrimaryMode();
replicationTracker.initiateTracking(allocationId);
}

Expand All @@ -1763,7 +1758,7 @@ public void initiateTracking(final String allocationId) {
* @param localCheckpoint the current local checkpoint on the shard
*/
public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
verifyPrimary();
assert assertPrimaryMode();
replicationTracker.markAllocationIdAsInSync(allocationId, localCheckpoint);
}

Expand Down Expand Up @@ -1798,7 +1793,7 @@ public long getLastSyncedGlobalCheckpoint() {
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
*/
public ObjectLongMap<String> getInSyncGlobalCheckpoints() {
verifyPrimary();
assert assertPrimaryMode();
verifyNotClosed();
return replicationTracker.getInSyncGlobalCheckpoints();
}
Expand All @@ -1808,11 +1803,12 @@ public ObjectLongMap<String> getInSyncGlobalCheckpoints() {
* primary.
*/
public void maybeSyncGlobalCheckpoint(final String reason) {
verifyPrimary();
verifyNotClosed();
assert shardRouting.primary() : "only call maybeSyncGlobalCheckpoint on primary shard";
if (replicationTracker.isPrimaryMode() == false) {
return;
}
assert assertPrimaryMode();
// only sync if there are not operations in flight
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) {
Expand All @@ -1838,7 +1834,7 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
* @return the replication group
*/
public ReplicationGroup getReplicationGroup() {
verifyPrimary();
assert assertPrimaryMode();
verifyNotClosed();
return replicationTracker.getReplicationGroup();
}
Expand All @@ -1850,7 +1846,7 @@ public ReplicationGroup getReplicationGroup() {
* @param reason the reason the global checkpoint was updated
*/
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
verifyReplicationTarget();
assert assertReplicationTarget();
final long localCheckpoint = getLocalCheckpoint();
if (globalCheckpoint > localCheckpoint) {
/*
Expand All @@ -1877,8 +1873,7 @@ assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.ST
* @param primaryContext the sequence number context
*/
public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
verifyPrimary();
assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
assert shardRouting.primary() && shardRouting.isRelocationTarget() : "only primary relocation target can update allocation IDs from primary context: " + shardRouting;
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) &&
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
Expand All @@ -1892,7 +1887,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
* @return {@code true} if there is at least one shard pending in-sync, otherwise false
*/
public boolean pendingInSync() {
verifyPrimary();
assert assertPrimaryMode();
return replicationTracker.pendingInSync();
}

Expand Down Expand Up @@ -2209,7 +2204,7 @@ private EngineConfig newEngineConfig() {
*/
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) {
verifyNotClosed();
verifyPrimary();
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;

indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
}
Expand Down Expand Up @@ -2259,7 +2254,6 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
final Object debugInfo) {
verifyNotClosed();
verifyReplicationTarget();
if (opPrimaryTerm > pendingPrimaryTerm) {
synchronized (mutex) {
if (opPrimaryTerm > pendingPrimaryTerm) {
Expand Down Expand Up @@ -2312,6 +2306,7 @@ public void onResponse(final Releasable releasable) {
operationPrimaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
} else {
assert assertReplicationTarget();
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Constants;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
Expand Down Expand Up @@ -560,28 +561,20 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E
ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
final long newPrimaryTerm = indexShard.getPendingPrimaryTerm() + between(1, 1000);
CountDownLatch latch = new CountDownLatch(1);
indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> {
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
latch.countDown();
}, 0L,
Collections.singleton(indexShard.routingEntry().allocationId().getId()),
new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build(),
Collections.emptySet());
latch.await();
} else {
indexShard = newStartedShard(true);
}
final long primaryTerm = indexShard.getPendingPrimaryTerm();
assertEquals(0, indexShard.getActiveOperationsCount());
if (indexShard.routingEntry().isRelocationTarget() == false) {
try {
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), permitAcquiredFuture,
ThreadPool.Names.WRITE, "");
permitAcquiredFuture.actionGet();
fail("shard shouldn't accept operations as replica");
} catch (IllegalStateException ignored) {

}
}
Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard);
assertEquals(1, indexShard.getActiveOperationsCount());
Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard);
Expand All @@ -590,6 +583,22 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E
Releasables.close(operation1, operation2);
assertEquals(0, indexShard.getActiveOperationsCount());

if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {
assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(primaryTerm,
indexShard.getGlobalCheckpoint(), new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
fail();
}

@Override
public void onFailure(Exception e) {
fail();
}
},
ThreadPool.Names.WRITE, "")).getMessage(), containsString("in primary mode cannot be a replication target"));
}

closeShards(indexShard);
}

Expand Down Expand Up @@ -647,11 +656,11 @@ public void testOperationPermitOnReplicaShards() throws Exception {
logger.info("shard routing to {}", shardRouting);

assertEquals(0, indexShard.getActiveOperationsCount());
if (shardRouting.primary() == false) {
final IllegalStateException e =
expectThrows(IllegalStateException.class,
if (shardRouting.primary() == false && Assertions.ENABLED) {
final AssertionError e =
expectThrows(AssertionError.class,
() -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, ""));
assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary")));
assertThat(e, hasToString(containsString("acquirePrimaryOperationPermit should only be called on primary shard")));
}

final long primaryTerm = indexShard.getPendingPrimaryTerm();
Expand Down

0 comments on commit 45066b5

Please sign in to comment.