Fix race between replica reset and primary promotion #32442
Conversation
Pinging @elastic/es-distributed
LGTM. Thanks for picking this up. I left some comments that I think will improve it but I'm happy with the current solution too.
@@ -473,6 +473,8 @@ public void updateShardState(final ShardRouting newRouting,
    TimeUnit.MINUTES,
    () -> {
        shardStateUpdated.await();
        assert primaryTerm == newPrimaryTerm :
            "shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + primaryTerm + "]";
💯. Can you please add the shard routing so we're sure to know where it came from?
if (operationPrimaryTerm > primaryTerm) {
    synchronized (primaryTermMutex) {
        if (operationPrimaryTerm > primaryTerm) {
            verifyNotClosed();
I'm wondering - why did you have to add this?
I did not have to (i.e. no failing test). I just saw that we were not rechecking this condition after possibly waiting for a while on primaryTermMutex. The next check two lines below will also fail this with an IndexShardNotStartedException. I found it nicer though to throw the IndexShardClosedException if possible.
On second thought, this is less of an issue after I converted blockOperations to asyncBlockOperations in acquireReplicaOperationPermit. I'm going to revert this.
termUpdated.await();
// a primary promotion, or another primary term transition, might have been triggered concurrently to this
// recheck under the operation permit if we can skip doing this work
if (operationPrimaryTerm == primaryTerm) {
We can assert here that the operationPrimaryTerm is always <= the primary term.
        }
    }
} else {
    globalCheckpointUpdated = false;
+1 to removing this.
@@ -182,10 +179,14 @@ private void delayOperations() {
private void releaseDelayedOperations() {
    final List<DelayedOperation> queuedActions;
    synchronized (this) {
I think we can pull this up to the method. I don't see a reason to drain the queue and then release the queue lock, and it will simplify the reasoning a bit.
I'm not sure what you mean. What would you change?
I updated my comment. I mean make this entire method synchronized.
I find the reasoning simpler here if we don't extend the mutex to a section of the code which it does not need to cover. Are you ok keeping it as is?
yes, I'm ok. It's subjective.
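For context, the pattern that was kept here is a common one: hold the lock only long enough to drain the queue, then run the queued callbacks outside the lock. A minimal sketch, using hypothetical names rather than the actual IndexShardOperationPermits code:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch (not the Elasticsearch implementation) of the trade-off discussed above:
// the lock covers only the drain of the queued operations, and the callbacks run outside
// the lock so that a slow callback cannot block threads queueing new delayed operations.
class DelayedOperationQueue {
    private final List<Runnable> queuedOperations = new ArrayList<>();

    void delay(Runnable operation) {
        synchronized (this) {
            queuedOperations.add(operation);
        }
    }

    void releaseDelayedOperations() {
        final List<Runnable> drained;
        synchronized (this) {
            // drain under the lock ...
            drained = new ArrayList<>(queuedOperations);
            queuedOperations.clear();
        }
        // ... but run the callbacks outside of it
        for (Runnable operation : drained) {
            operation.run();
        }
    }
}
```

Making the whole release method synchronized, as suggested above, would also be correct; the trade-off is purely about how much code sits under the lock and how easy that is to reason about.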
@@ -2216,10 +2218,11 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
                                          final Object debugInfo) {
    verifyNotClosed();
    verifyReplicationTarget();
    final boolean globalCheckpointUpdated;
    if (operationPrimaryTerm > primaryTerm) {
        synchronized (primaryTermMutex) {
I wonder if we can remove this and only lock mutex on this level (it's always good to avoid multiple locks if possible).
I'm fine relaxing it and using mutex, as we now use asyncBlockOperations. This means that in practice we will have at most the number of indexing threads blocked on this (while possibly a concurrent cluster state update comes in, trying to acquire the mutex as well). The first indexing thread will increase pendingPrimaryTerm, and all the other ones that are blocked on the mutex will just acquire the mutex and do a quick noop. All subsequent writes will not acquire the mutex anymore as they will bypass the pre-flight check.
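A minimal sketch of the double-checked pre-flight pattern described in this comment, with hypothetical names and a volatile term field standing in for the real IndexShard state:

```java
// Hypothetical sketch of the double-checked pre-flight term check described above: the
// volatile read lets most operations skip the mutex entirely; only the first thread that
// observes a newer term does the bump, and threads that raced with it re-check under the
// mutex and become a no-op.
class TermTracker {
    private final Object mutex = new Object();
    private volatile long pendingPrimaryTerm;

    void maybeBumpTerm(long operationPrimaryTerm, Runnable bumpAction) {
        if (operationPrimaryTerm > pendingPrimaryTerm) {         // cheap volatile pre-flight check
            synchronized (mutex) {
                if (operationPrimaryTerm > pendingPrimaryTerm) { // re-check under the mutex
                    pendingPrimaryTerm = operationPrimaryTerm;
                    bumpAction.run();                            // only the first thread gets here
                }
            }
        }
    }
}
```

Only the first thread that sees a newer operation term performs the bump; threads that raced with it fall through the inner check as a no-op, and later operations skip the mutex entirely because the pre-flight check no longer matches.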
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; | ||
|
||
synchronized (mutex) { | ||
final CountDownLatch termUpdated = new CountDownLatch(1); |
I wonder if we should have a method called "setPrimaryTerm" which gets a primary term + a runnable to run under the async block. That method would be called both from here and from updateShardState, making sure that the semantics of exposing the primary term (after submitting the async block, and asserting we're under a mutex via assertions) are the same.
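For illustration, a rough sketch of the kind of shared helper being suggested (class and field names are made up, a plain ExecutorService stands in for asyncBlockOperations, and this is not the code that ended up in the PR):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch of a shared "bump the term" helper that both the replica permit path
// and updateShardState could call: the block is submitted first, the new term is published
// while still under the caller's mutex, and the latch guarantees that the blocked section
// only runs once the new term is visible.
class PrimaryTermBumper {
    private final ExecutorService genericExecutor;
    private volatile long pendingPrimaryTerm;

    PrimaryTermBumper(ExecutorService genericExecutor, long initialTerm) {
        this.genericExecutor = genericExecutor;
        this.pendingPrimaryTerm = initialTerm;
    }

    // the caller is expected to hold its own mutex, mirroring the assertion-based contract discussed above
    void bumpPrimaryTerm(long newPrimaryTerm, Runnable onBlocked) {
        assert newPrimaryTerm > pendingPrimaryTerm;
        final CountDownLatch termUpdated = new CountDownLatch(1);
        genericExecutor.execute(() -> {  // stands in for asyncBlockOperations
            try {
                termUpdated.await();     // wait until the new term has been published
                onBlocked.run();         // run the transition "under the operation block"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pendingPrimaryTerm = newPrimaryTerm;
        termUpdated.countDown();
    }
}
```

Submitting the block first and only counting down the latch after publishing the new term is what gives both call sites the same "term is visible before the blocked code runs" semantics.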
I've given this a try in 70262d7
The CI failure on this highlighted another issue. When a replica is promoted to primary while there is still an ongoing replica operation on the shard, the operation can incorrectly use the primary term of the new shard. In the typical case, this is fortunately caught by the translog due to an extra term check there, but in the presence of a generation rollover at the wrong moment, even that might not hold. The issue is that the term is not incremented under the operation block. To solve this, I've pushed a102ef9 which distinguishes between the primary term that the shard is supposed to have because of the cluster state, and the primary term that is used by operations and by the underlying engine. The former is updated right away, whereas the latter is only updated under the operation block, ensuring that each operation with a permit always sees the correct term. Let me know what you think.
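To make the two-term model concrete, here is a hedged, self-contained sketch (illustrative names only, not the actual IndexShard fields or methods):

```java
// Sketch of the two-term model described above: pendingPrimaryTerm reflects what the
// cluster state says the term should be and is advanced right away, while
// operationPrimaryTerm is what in-flight operations and the engine see, and it is only
// advanced inside the operation block.
class ShardTerms {
    private volatile long pendingPrimaryTerm;   // what the cluster state mandates
    private volatile long operationPrimaryTerm; // what operations under a permit use

    // called on the cluster-state / replica-permit path
    synchronized void onNewTerm(long newTerm, Runnable submitOperationBlock) {
        if (newTerm > pendingPrimaryTerm) {
            pendingPrimaryTerm = newTerm;       // visible immediately
            submitOperationBlock.run();         // schedules the code below
        }
    }

    // intended to run only once all permits have been drained (i.e. under the operation block)
    void underOperationBlock(long newTerm) {
        operationPrimaryTerm = newTerm;         // now safe: no operations are in flight
    }

    long termForIndexing() {
        return operationPrimaryTerm;            // engine/translog writes use this term
    }
}
```

This mirrors the description above: the cluster-state-facing term moves forward immediately, while the term used by operations and the engine only moves forward once no operations are in flight, so an operation holding a permit always sees the term it started under.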
@elasticmachine retest this please.
…ode to be activated on the replication tracker)
@bleskes this is ready for another review. The main change (beside addressing your comments) I've made after the first review iteration was to better handle the primary activation of the replication tracker. Before the last iteration, the activation was done on the cluster state update thread, not when the shard actually became primary (i.e. under the operation block). This led to some test failures which are, fingers crossed, all fixed by the latest iteration.
I left some questions to better understand the change.
@@ -192,7 +193,8 @@

protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
protected volatile long primaryTerm;
protected volatile long pendingPrimaryTerm;
Can we have a comment pointing people to the javadocs of getPendingPrimaryTerm for an explanation of what it means?
fixed in 4b82ca7
bumpPrimaryTerm(opPrimaryTerm, () -> {
    // a primary promotion, or another primary term transition, might have been triggered concurrently to this
    // recheck under the operation permit if we can skip doing this work
    if (opPrimaryTerm == pendingPrimaryTerm) {
how is that possible? shouldn't the pending primary term update under mutex prevent this?
I'm not sure how it can "prevent" this. Assume you have a replica term bump followed by a promotion to primary. The replica term bump will call asyncBlockOperations to run the above code. Assume that acquireReplicaOperationPermit leaves the mutex before the code in the operation block gets to execute. Then a primary promotion comes in from the cluster state, updating the pendingPrimaryTerm again. Note that there's a test for this (testReplicaTermIncrementWithConcurrentPrimaryPromotion).
+1 to the test.
So say the increase of the term comes in first. That acquires the mutex, bumps the pendingPrimaryTerm, and submits an async block under the mutex.
Then the replica gets promoted. That means the term is higher. When the cluster state comes in, it bumps the pendingPrimaryTerm again and submits an async block. We know that the first async block's code will run first, followed by the second, and I think that's OK.
If the reverse happens - i.e., the updateShardState comes in first - it will bump the pendingPrimaryTerm to a higher number than the replication operation, which will prevent the replication operation from submitting its async block, so we're good here too.
What am I missing?
"We know that the first async block code will run first?"
No, we don't know that. See the implementation of asyncBlockOperations. Both just submit a task to the generic threadpool. Both potentially race to the call of doBlockOperations.
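The racing-submissions point can be reproduced with a tiny standalone example (plain java.util.concurrent, nothing Elasticsearch-specific):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Small self-contained illustration of the point above: two tasks submitted to a
// multi-threaded pool (like the generic thread pool) are not guaranteed to start or
// finish in submission order, so the two queued "blocks" can race with each other.
public class SubmissionOrderDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("first submitted block"));
        pool.execute(() -> System.out.println("second submitted block"));
        // output order is nondeterministic: either line may be printed first
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Since both blocks are handed to a multi-threaded pool, neither start order nor completion order is guaranteed, which is why the code running under each block rechecks the term instead of relying on submission order.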
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
This is another wart... I wonder if we should fold it into openEngineAndRecoverFromTranslog (another change)
try {
    synchronized (mutex) {
        assert shardRouting.primary();
        // do these updates under the mutex as this otherwise races with subsequent calls of updateShardState
what race conditions do you refer to?
I think this is obsolete and does not need to be done under the mutex anymore. It came from an earlier iteration where I had not introduced the relocated state in ReplicationTracker yet, and where I was using both replicationTracker.isPrimaryMode() and comparing operationPrimaryTerm to pendingPrimaryTerm to figure out if a shard had possibly relocated, and wanted this to be an atomic thing.
One problematic place I see is the assertion at the end of updateShardState which checks both isPrimaryMode and the operationTerm + pendingPrimaryTerm. If we don't update the replication tracker + the operationPrimaryTerm atomically under the mutex, this invariant might be violated.
Thanks @bleskes
We've recently seen a number of test failures that tripped an assertion in IndexShard (see issues linked below), leading to the discovery of a race between resetting a replica when it learns about a higher term and when the same replica is promoted to primary. This commit fixes the race by distinguishing between a cluster state primary term (called pendingPrimaryTerm) and a shard-level operation term. The former is set during the cluster state update or when a replica learns about a new primary. The latter is only incremented under the operation block, which can happen in a delayed fashion. It also solves the issue where a replica that's still adjusting to the new term receives a cluster state update that promotes it to primary, which can happen in the situation of multiple nodes being shut down in short succession. In that case, the cluster state update thread would call `asyncBlockOperations` in `updateShardState`, which in turn would throw an exception as blocking permits is not allowed while an ongoing block is in place, subsequently failing the shard. This commit therefore extends the IndexShardOperationPermits to allow it to queue multiple blocks (which will all take precedence over operations acquiring permits). Finally, it also moves the primary activation of the replication tracker under the operation block, so that the actual transition to primary only happens under the operation block. Relates to #32431, #32304 and #32118
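The "queue multiple blocks" extension can be pictured with a deliberately simplified, hypothetical permit class; the real IndexShardOperationPermits is asynchronous and handles failures far more carefully, so treat this only as a sketch of the two properties mentioned above (blocks can queue, and they take precedence over new operations):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

// Simplified sketch of a permit system in which several "block all operations" requests
// can be queued: while any block is pending or running, new operations are delayed, and
// the queued blocks are executed one after another with no operations in flight.
class SimpleOperationPermits {
    private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
    private final Deque<Runnable> queuedBlocks = new ArrayDeque<>();
    private boolean draining; // true while a thread is working through queued blocks

    // a single permit for an ordinary operation; false means the caller must delay it
    synchronized boolean tryAcquireOperationPermit() {
        if (draining || queuedBlocks.isEmpty() == false) {
            return false; // a block is pending or active: blocks take precedence
        }
        return semaphore.tryAcquire();
    }

    synchronized void releaseOperationPermit() {
        semaphore.release();
    }

    void blockOperations(Runnable onAllOperationsDrained) {
        synchronized (this) {
            queuedBlocks.addLast(onAllOperationsDrained);
            if (draining) {
                return; // the active drainer will pick this block up as well
            }
            draining = true;
        }
        drainBlocks();
    }

    private void drainBlocks() {
        while (true) {
            final Runnable nextBlock;
            synchronized (this) {
                nextBlock = queuedBlocks.pollFirst();
                if (nextBlock == null) {
                    draining = false;
                    return;
                }
            }
            try {
                semaphore.acquire(TOTAL_PERMITS); // wait for in-flight operations to complete
                try {
                    nextBlock.run(); // no operation permits are out while this runs
                } finally {
                    semaphore.release(TOTAL_PERMITS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                synchronized (this) {
                    draining = false;
                }
                return;
            }
        }
    }
}
```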
If the shard is already closed while bumping the primary term, this can result in an AlreadyClosedException to be thrown. As we use asyncBlockOperations, the exception will be thrown on a thread from the generic thread pool and end up in the uncaught exception handler, failing our tests. Relates to #32442
* 6.x:
  [Kerberos] Use canonical host name (#32588)
  Cross-cluster search: preserve cluster alias in shard failures (#32608)
  [TEST] Allow to run in FIPS JVM (#32607)
  Handle AlreadyClosedException when bumping primary term
  [Test] Add ckb to the list of unsupported languages (#32611)
  SCRIPTING: Move Aggregation Scripts to their own context (#32068) (#32629)
  [TEST] Enhance failure message when bulk updates have failures
  [ML] Add ML result classes to protocol library (#32587)
  Suppress LicensingDocumentationIT.testPutLicense in release builds (#32613)
  [Rollup] Improve ID scheme for rollup documents (#32558)
  Mutes failing SQL string function tests due to #32589
  Suppress Wildfly test in FIPS JVMs (#32543)
  Add cluster UUID to Cluster Stats API response (#32206)
  [ML] Add some ML config classes to protocol library (#32502)
  [TEST]Split transport verification mode none tests (#32488)
  [Rollup] Remove builders from DateHistogramGroupConfig (#32555)
  [ML] Add Detector config classes to protocol library (#32495)
  [Rollup] Remove builders from MetricConfig (#32536)
  Fix race between replica reset and primary promotion (#32442)
  HLRC: Move commercial clients from XPackClient (#32596)
  Security: move User to protocol project (#32367)
  Minor fix for javadoc (applicable for java 11). (#32573)
  Painless: Move Some Lookup Logic to PainlessLookup (#32565)
  Core: Minor size reduction for AbstractComponent (#32509)
  INGEST: Enable default pipelines (#32286) (#32591)
  TEST: Avoid merges in testSeqNoAndCheckpoints
  [Rollup] Remove builders from HistoGroupConfig (#32533)
  fixed elements in array of produced terms (#32519)
  Mutes ReindexFailureTests.searchFailure dues to #28053
  Mutes LicensingDocumentationIT due to #32580
  Remove the SATA controller from OpenSUSE box
  [ML] Rename JobProvider to JobResultsProvider (#32551)
* master:
  Cross-cluster search: preserve cluster alias in shard failures (#32608)
  Handle AlreadyClosedException when bumping primary term
  [TEST] Allow to run in FIPS JVM (#32607)
  [Test] Add ckb to the list of unsupported languages (#32611)
  SCRIPTING: Move Aggregation Scripts to their own context (#32068)
  Painless: Use LocalMethod Map For Lookup at Runtime (#32599)
  [TEST] Enhance failure message when bulk updates have failures
  [ML] Add ML result classes to protocol library (#32587)
  Suppress LicensingDocumentationIT.testPutLicense in release builds (#32613)
  [Rollup] Update wire version check after backport
  Suppress Wildfly test in FIPS JVMs (#32543)
  [Rollup] Improve ID scheme for rollup documents (#32558)
  ingest: doc: move Dot Expander Processor doc to correct position (#31743)
  [ML] Add some ML config classes to protocol library (#32502)
  [TEST]Split transport verification mode none tests (#32488)
  Core: Move helper date formatters over to java time (#32504)
  [Rollup] Remove builders from DateHistogramGroupConfig (#32555)
  [TEST} unmutes SearchAsyncActionTests and adds debugging info
  [ML] Add Detector config classes to protocol library (#32495)
  [Rollup] Remove builders from MetricConfig (#32536)
  Tests: Add rolling upgrade tests for watcher (#32428)
  Fix race between replica reset and primary promotion (#32442)
Primary terms were introduced as part of the sequence-number effort (#10708) and added in ES 5.0. Subsequent work introduced the replication tracker which lets the primary own its replication group (#25692) to coordinate recovery and replication. The replication tracker explicitly exposes whether it is operating in primary mode or replica mode, independent of the ShardRouting object that's associated with a shard. During a primary relocation, for example, the primary mode is transferred between the primary relocation source and the primary relocation target. After transferring this so-called primary context, the old primary becomes a replication target and the new primary the replication source, reflected in the replication tracker on both nodes. With the most recent PR in this area (#32442), we finally have a clean transition between a shard that's operating as a primary and issuing sequence numbers and a shard that's serving as a replication target. The transition from one state to the other is enforced through the operation-permit system, where we block permit acquisition during such changes and perform the transition under this operation block, ensuring that there are no operations in progress while the transition is being performed. This finally allows us to turn the best-effort checks that were put in place to prevent shards from being used in the wrong way (i.e. primary as replica, or replica as primary) into hard assertions, making it easier to catch any bugs in this area.
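A hedged illustration of that last point, with hypothetical names and a bare Semaphore in place of the operation-permit machinery: because the mode switch only happens while every permit is held, code that holds a permit can use a hard assertion rather than a best-effort check.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch: the primary/replica mode switch happens only while all operation
// permits are held, so any operation that holds a permit can hard-assert the mode it
// expects instead of tolerating a transient mismatch.
class ModeGuardedShard {
    private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore permits = new Semaphore(TOTAL_PERMITS, true);
    private volatile boolean primaryMode;

    ModeGuardedShard(boolean primaryMode) {
        this.primaryMode = primaryMode;
    }

    // the transition runs "under the operation block": no permits are out while it executes
    void switchMode(boolean newPrimaryMode) throws InterruptedException {
        permits.acquire(TOTAL_PERMITS);
        try {
            primaryMode = newPrimaryMode;
        } finally {
            permits.release(TOTAL_PERMITS);
        }
    }

    void runPrimaryOperation(Runnable operation) throws InterruptedException {
        permits.acquire();
        try {
            // a hard assertion, not a best-effort check: the mode cannot change while we hold a permit
            assert primaryMode : "primary operation on a shard that is in replica mode";
            operation.run();
        } finally {
            permits.release();
        }
    }
}
```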
When acquireReplicaOperationPermit calls bumpPrimaryTerm, is it possible that newPrimaryTerm < pendingPrimaryTerm, leading bumpPrimaryTerm to throw an assertion failure?
() -> {
    shardStateUpdated.await();
    assert pendingPrimaryTerm == newPrimaryTerm :
@ywelsch
1. In the method updateShardState, because newPrimaryTerm != pendingPrimaryTerm, the code runs into the bumpPrimaryTerm method.
2. In bumpPrimaryTerm, it will set pendingPrimaryTerm = newPrimaryTerm.
3. But before bumpPrimaryTerm sets pendingPrimaryTerm = newPrimaryTerm, onBlocked will run first, which asserts pendingPrimaryTerm == newPrimaryTerm.
Is this reasonable? It seems that the assertion "pendingPrimaryTerm == newPrimaryTerm" would always be false?
You said that: Can you tell me more about this? How does it avoid the problem of failing the shard?
assert pendingPrimaryTerm == newPrimaryTerm :
    "shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + pendingPrimaryTerm + "]" +
    ", current routing: " + currentRouting + ", new routing: " + newRouting;
assert operationPrimaryTerm == newPrimaryTerm;
@liaoyanyunde can you provide some context for these questions? Are you studying this code to understand how it works? To which goal?
No, primary terms are always non-decreasing, which is guaranteed by the cluster coordination subsystem.
The CountDownLatch (termUpdated) is what guarantees the ordering here: the code submitted to run under the operation block first awaits the latch, and the latch is only counted down after pendingPrimaryTerm has been set to the new term.
We run all our tests with assertions enabled. If this did not hold true, we would quickly learn about it.
I'm not sure on what you seek clarification. Also, if this is purely about understanding the current shape of the code, perhaps it's not necessary to understand the full history on how we arrived here.
I'm not sure what you mean by "unnecessary". Assertions are there to validate our assumptions about the code. They should obviously always hold true.
We are incorporating the fix for this issue into version 6.3.2, so I am studying this code to understand how it works. In the process of reading it, though, several questions have come up.
We've recently seen a number of test failures that tripped an assertion in IndexShard (see issues linked below), leading to the discovery of a race between resetting a replica when it learns about a higher term and when the same replica is promoted to primary. This PR fixes the race by distinguishing between a cluster state primary term (called pendingPrimaryTerm) and a shard-level operation term. The former is set during the cluster state update or when a replica learns about a new primary. The latter is only incremented under the operation block, which can happen in a delayed fashion. It also solves the issue where a replica that's still adjusting to the new term receives a cluster state update that promotes it to primary, which can happen in the situation of multiple nodes being shut down in short succession. In that case, the cluster state update thread would call asyncBlockOperations in updateShardState, which in turn would throw an exception as blocking permits is not allowed while an ongoing block is in place, subsequently failing the shard. This PR therefore extends the IndexShardOperationPermits to allow it to queue multiple blocks (which will all take precedence over operations acquiring permits). Finally, it also moves the primary activation of the replication tracker under the operation block, so that the actual transition to primary only happens under the operation block. Relates to #32431, #32304 and #32118