Skip to content

Commit

Permalink
Combine the execution of an exclusive replica operation with primary …
Browse files Browse the repository at this point in the history
…term update (#36116)

This commit changes how an operation which requires all index shard 
operations permits is executed when a primary term update is required: 
the operation and the update are combined so that the operation is 
executed after the primary term update under the same blocking 
operation.

Closes #35850

Co-authored-by: Yannick Welsch <[email protected]>
  • Loading branch information
tlrx and ywelsch authored Dec 4, 2018
1 parent 89fae42 commit 5d684ca
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 44 deletions.
122 changes: 81 additions & 41 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -548,7 +549,7 @@ public void onFailure(Exception e) {
} catch (final AlreadyClosedException e) {
// okay, the index was deleted
}
});
}, null);
}
}
// set this last, once we finished updating all internal state.
Expand Down Expand Up @@ -2316,14 +2317,26 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
}

private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
final CheckedRunnable<E> onBlocked,
@Nullable ActionListener<Releasable> combineWithAction) {
assert Thread.holdsLock(mutex);
assert newPrimaryTerm > pendingPrimaryTerm;
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null);
assert operationPrimaryTerm <= pendingPrimaryTerm;
final CountDownLatch termUpdated = new CountDownLatch(1);
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
@Override
public void onFailure(final Exception e) {
try {
innerFail(e);
} finally {
if (combineWithAction != null) {
combineWithAction.onFailure(e);
}
}
}

private void innerFail(final Exception e) {
try {
failShard("exception during primary term transition", e);
} catch (AlreadyClosedException ace) {
Expand All @@ -2333,7 +2346,8 @@ public void onFailure(final Exception e) {

@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
final RunOnce releaseOnce = new RunOnce(releasable::close);
try {
assert operationPrimaryTerm <= pendingPrimaryTerm;
termUpdated.await();
// indexShardOperationPermits doesn't guarantee that async submissions are executed
Expand All @@ -2343,7 +2357,17 @@ public void onResponse(final Releasable releasable) {
onBlocked.run();
}
} catch (final Exception e) {
onFailure(e);
if (combineWithAction == null) {
// otherwise leave it to combineWithAction to release the permit
releaseOnce.run();
}
innerFail(e);
} finally {
if (combineWithAction != null) {
combineWithAction.onResponse(releasable);
} else {
releaseOnce.run();
}
}
}
}, 30, TimeUnit.MINUTES);
Expand Down Expand Up @@ -2371,7 +2395,7 @@ public void onResponse(final Releasable releasable) {
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
final Object debugInfo) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, false,
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo));
}

Expand All @@ -2393,49 +2417,24 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, true,
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
}

private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final Consumer<ActionListener<Releasable>> consumer) {
final boolean allowCombineOperationWithPrimaryTermUpdate,
final Consumer<ActionListener<Releasable>> operationExecutor) {
verifyNotClosed();
if (opPrimaryTerm > pendingPrimaryTerm) {
synchronized (mutex) {
if (opPrimaryTerm > pendingPrimaryTerm) {
final IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
// We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
if (shardState != IndexShardState.POST_RECOVERY &&
shardState != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, shardState);
}

if (opPrimaryTerm > pendingPrimaryTerm) {
bumpPrimaryTerm(opPrimaryTerm, () -> {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
}
});
}
}
}
}
assert opPrimaryTerm <= pendingPrimaryTerm
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
consumer.accept(new ActionListener<Releasable>() {
// This listener is used for the execution of the operation. If the operation requires all the permits for its
// execution and the primary term must be updated first, we can combine the operation execution with the
// primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed
// in the order submitted, combining both operations ensure that the term is updated before the operation is
// executed. It also has the side effect of acquiring all the permits one time instead of two.
final ActionListener<Releasable> operationListener = new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
if (opPrimaryTerm < operationPrimaryTerm) {
Expand Down Expand Up @@ -2465,7 +2464,48 @@ public void onResponse(final Releasable releasable) {
public void onFailure(final Exception e) {
onPermitAcquired.onFailure(e);
}
});
};

if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
synchronized (mutex) {
if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
final IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
// We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
if (shardState != IndexShardState.POST_RECOVERY &&
shardState != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, shardState);
}

bumpPrimaryTerm(opPrimaryTerm, () -> {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
}
}, allowCombineOperationWithPrimaryTermUpdate ? operationListener : null);

if (allowCombineOperationWithPrimaryTermUpdate) {
logger.debug("operation execution has been combined with primary term update");
return;
}
}
}
}
assert opPrimaryTerm <= pendingPrimaryTerm
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
operationExecutor.accept(operationListener);
}

private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) {
return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm);
}

public int getActiveOperationsCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,6 @@ private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard
return fut.get();
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testOperationPermitOnReplicaShards() throws Exception {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
Expand Down Expand Up @@ -1024,7 +1023,6 @@ public void testGlobalCheckpointSync() throws IOException {
closeShards(replicaShard, primaryShard);
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
Expand Down Expand Up @@ -1089,7 +1087,6 @@ public void onFailure(Exception e) {
closeShard(indexShard, false);
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);

Expand Down Expand Up @@ -3600,6 +3597,67 @@ public void testResetEngine() throws Exception {
closeShard(shard, false);
}

public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
final IndexShard replica = newStartedShard(false);
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));

final int nbTermUpdates = randomIntBetween(1, 5);

for (int i = 0; i < nbTermUpdates; i++) {
long opPrimaryTerm = replica.getOperationPrimaryTerm() + 1;
final long globalCheckpoint = replica.getGlobalCheckpoint();
final long maxSeqNoOfUpdatesOrDeletes = replica.getMaxSeqNoOfUpdatesOrDeletes();

final int operations = scaledRandomIntBetween(5, 32);
final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
final CountDownLatch latch = new CountDownLatch(operations);

final Thread[] threads = new Thread[operations];
for (int j = 0; j < operations; j++) {
threads[j] = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
replica.acquireAllReplicaOperationsPermits(
opPrimaryTerm,
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
assertThat(replica.getPendingPrimaryTerm(), greaterThanOrEqualTo(opPrimaryTerm));
assertThat(replica.getOperationPrimaryTerm(), equalTo(opPrimaryTerm));
} finally {
latch.countDown();
}
}

@Override
public void onFailure(final Exception e) {
try {
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}
}, TimeValue.timeValueMinutes(30L));
});
threads[j].start();
}
barrier.await();
latch.await();

for (Thread thread : threads) {
thread.join();
}
}

closeShard(replica, false);
}

@Override
public Settings threadPoolSettings() {
return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();
Expand Down

0 comments on commit 5d684ca

Please sign in to comment.