-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix race between replica reset and primary promotion #32442
Changes from 2 commits
aecc31a
9396f53
a102ef9
021e833
78f8306
70262d7
afadef0
0a8ab47
75f7a5f
78a7e82
63314f4
f7c4ae0
42d59d8
9a42562
ceb330b
c328417
4b82ca7
b65ce61
d5df2ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -473,6 +473,8 @@ public void updateShardState(final ShardRouting newRouting, | |
TimeUnit.MINUTES, | ||
() -> { | ||
shardStateUpdated.await(); | ||
assert primaryTerm == newPrimaryTerm : | ||
"shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + primaryTerm + "]"; | ||
try { | ||
/* | ||
* If this shard was serving as a replica shard when another shard was promoted to primary then the state of | ||
|
@@ -2216,10 +2218,11 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final | |
final Object debugInfo) { | ||
verifyNotClosed(); | ||
verifyReplicationTarget(); | ||
final boolean globalCheckpointUpdated; | ||
if (operationPrimaryTerm > primaryTerm) { | ||
synchronized (primaryTermMutex) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we can remove this and only lock There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm fine relaxing it and using mutex as we now use asyncBlockOperations. This means that in practice, we will have at most have the number of indexing threads block on this (while possibly a concurrent cluster state update comes in, trying to acquire mutex as well). The first indexing thread will increase |
||
if (operationPrimaryTerm > primaryTerm) { | ||
verifyNotClosed(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm wondering - why did you have to add this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did not have to (i.e. no failing test). I just saw that we were not rechecking this condition after possibly waiting for a while on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. after second thought, this is less of an issue after I converted blockOperations to asyncBlockOperations in acquireReplicaOperationPermit. I'm going to revert |
||
|
||
IndexShardState shardState = state(); | ||
// only roll translog and update primary term if shard has made it past recovery | ||
// Having a new primary term here means that the old primary failed and that there is a new primary, which again | ||
|
@@ -2229,38 +2232,41 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final | |
shardState != IndexShardState.STARTED) { | ||
throw new IndexShardNotStartedException(shardId, shardState); | ||
} | ||
try { | ||
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { | ||
assert operationPrimaryTerm > primaryTerm : | ||
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; | ||
|
||
synchronized (mutex) { | ||
final CountDownLatch termUpdated = new CountDownLatch(1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should have a method called "setPrimaryTerm" which gets a primary + a runnable to run underly the async block. That method will be called both from here and from updateShardState and make sure that the semantics of the exposing the primary term (after submitting async block and asserting we're under a mutex via assertions) are the same. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've given this a try in 70262d7 |
||
if (operationPrimaryTerm > primaryTerm) { | ||
indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> { | ||
termUpdated.await(); | ||
// a primary promotion, or another primary term transition, might have been triggered concurrently to this | ||
// recheck under the operation permit if we can skip doing this work | ||
if (operationPrimaryTerm == primaryTerm) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can assert the operationPrimary term is always <= than the primary term here. |
||
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); | ||
final long currentGlobalCheckpoint = getGlobalCheckpoint(); | ||
final long localCheckpoint; | ||
if (currentGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { | ||
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; | ||
} else { | ||
localCheckpoint = currentGlobalCheckpoint; | ||
} | ||
logger.trace( | ||
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", | ||
operationPrimaryTerm, | ||
getLocalCheckpoint(), | ||
localCheckpoint); | ||
getEngine().resetLocalCheckpoint(localCheckpoint); | ||
getEngine().rollTranslogGeneration(); | ||
} else { | ||
logger.trace("a primary promotion or concurrent primary term transition has made this reset obsolete"); | ||
} | ||
}, e -> failShard("exception during primary term transition", e)); | ||
|
||
primaryTerm = operationPrimaryTerm; | ||
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); | ||
final long currentGlobalCheckpoint = getGlobalCheckpoint(); | ||
final long localCheckpoint; | ||
if (currentGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { | ||
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; | ||
} else { | ||
localCheckpoint = currentGlobalCheckpoint; | ||
} | ||
logger.trace( | ||
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", | ||
operationPrimaryTerm, | ||
getLocalCheckpoint(), | ||
localCheckpoint); | ||
getEngine().resetLocalCheckpoint(localCheckpoint); | ||
getEngine().rollTranslogGeneration(); | ||
}); | ||
globalCheckpointUpdated = true; | ||
} catch (final Exception e) { | ||
onPermitAcquired.onFailure(e); | ||
return; | ||
termUpdated.countDown(); | ||
} | ||
} | ||
} else { | ||
globalCheckpointUpdated = false; | ||
} | ||
} | ||
} else { | ||
globalCheckpointUpdated = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 to removing this. |
||
} | ||
|
||
assert operationPrimaryTerm <= primaryTerm | ||
|
@@ -2279,14 +2285,12 @@ public void onResponse(final Releasable releasable) { | |
primaryTerm); | ||
onPermitAcquired.onFailure(new IllegalStateException(message)); | ||
} else { | ||
if (globalCheckpointUpdated == false) { | ||
try { | ||
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); | ||
} catch (Exception e) { | ||
releasable.close(); | ||
onPermitAcquired.onFailure(e); | ||
return; | ||
} | ||
try { | ||
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); | ||
} catch (Exception e) { | ||
releasable.close(); | ||
onPermitAcquired.onFailure(e); | ||
return; | ||
} | ||
onPermitAcquired.onResponse(releasable); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,7 @@ | |
|
||
import java.io.Closeable; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
@@ -59,7 +60,7 @@ final class IndexShardOperationPermits implements Closeable { | |
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved | ||
private final List<DelayedOperation> delayedOperations = new ArrayList<>(); // operations that are delayed | ||
private volatile boolean closed; | ||
private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this | ||
private int queuedBlockOperations; // does not need to be volatile as all accesses are done under a lock on this | ||
|
||
// only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics. | ||
// Value is a tuple, with a some debug information supplied by the caller and a stack trace of the acquiring thread | ||
|
@@ -102,9 +103,6 @@ <E extends Exception> void blockOperations( | |
final long timeout, | ||
final TimeUnit timeUnit, | ||
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E { | ||
if (closed) { | ||
throw new IndexShardClosedException(shardId); | ||
} | ||
delayOperations(); | ||
try { | ||
doBlockOperations(timeout, timeUnit, onBlocked); | ||
|
@@ -147,13 +145,12 @@ public void onAfter() { | |
} | ||
|
||
private void delayOperations() { | ||
if (closed) { | ||
throw new IndexShardClosedException(shardId); | ||
} | ||
synchronized (this) { | ||
if (delayed) { | ||
throw new IllegalStateException("operations are already delayed"); | ||
} else { | ||
assert delayedOperations.isEmpty(); | ||
delayed = true; | ||
} | ||
assert queuedBlockOperations > 0 || delayedOperations.isEmpty(); | ||
queuedBlockOperations++; | ||
} | ||
} | ||
|
||
|
@@ -164,7 +161,7 @@ private <E extends Exception> void doBlockOperations( | |
if (Assertions.ENABLED) { | ||
// since delayed is not volatile, we have to synchronize even here for visibility | ||
synchronized (this) { | ||
assert delayed; | ||
assert queuedBlockOperations > 0; | ||
} | ||
} | ||
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { | ||
|
@@ -182,10 +179,14 @@ private <E extends Exception> void doBlockOperations( | |
private void releaseDelayedOperations() { | ||
final List<DelayedOperation> queuedActions; | ||
synchronized (this) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can pull this up to the method. I don't see a reason to drain the queue and the release the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure what you mean. What would you change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I updated my comment. I mean make this entire method synchronized. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find the reasoning simpler here if we don't extend the mutex to a section of the code which it does not need to cover. Are you ok keeping it as is? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, I'm ok. It's subjective. |
||
assert delayed; | ||
queuedActions = new ArrayList<>(delayedOperations); | ||
delayedOperations.clear(); | ||
delayed = false; | ||
assert queuedBlockOperations > 0; | ||
queuedBlockOperations--; | ||
if (queuedBlockOperations == 0) { | ||
queuedActions = new ArrayList<>(delayedOperations); | ||
delayedOperations.clear(); | ||
} else { | ||
queuedActions = Collections.emptyList(); | ||
} | ||
} | ||
if (!queuedActions.isEmpty()) { | ||
/* | ||
|
@@ -242,7 +243,7 @@ private void acquire(final ActionListener<Releasable> onAcquired, final String e | |
final Releasable releasable; | ||
try { | ||
synchronized (this) { | ||
if (delayed) { | ||
if (queuedBlockOperations > 0) { | ||
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false); | ||
final ActionListener<Releasable> wrappedListener; | ||
if (executorOnDelay != null) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯 . can you please add the shard routing so we're sure to know where it came from?