Let primary own its replication group #25692
Conversation
Conceptually, I'm +1 on a change like this. I do not have time to give this a super careful review, and I left some comments to which I will not be able to see the replies until after vacation; please proceed with getting this change in either way. I trust @bleskes to give you a careful review otherwise.
// we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
// is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher as its local checkpoint.
as -> than
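To illustrate the ordering constraint in the quoted comment, here is a minimal sketch of a caller sampling both values (the surrounding usage is hypothetical, not the actual IndexShard code):

// sample the global checkpoint BEFORE the replication group so the two form a
// consistent snapshot; sampling in the reverse order could pair a group with a
// checkpoint that has already advanced past what some copies in that group
// have acknowledged locally
final long globalCheckpoint = getGlobalCheckpoint();
final ReplicationGroup replicationGroup = getReplicationGroup();
// hand (globalCheckpoint, replicationGroup) to the replication operation as one unit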
 *
 * @return the replication group
 */
public synchronized ReplicationGroup getReplicationGroup() {
I wonder about this. Does it need to be synchronized? Instead, can the field be volatile, taking the lock only when the replication group needs to be recalculated?
+1 (and we probably want the same thing for the global checkpoint - although that's a different change). Can we also not generate this on request but rather when it changes? Then its being null would mean that it is not relevant.
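As a rough sketch of what this suggestion could look like (a volatile field recomputed only when the tracked state changes, so reads take no lock); the helper placement here is hypothetical, except calculateReplicationGroup, which is the method quoted later in this review:

private volatile ReplicationGroup replicationGroup; // null while not relevant (not in primary mode)

// called only from the synchronized methods that mutate the tracked allocations
private void updateReplicationGroup() {
    replicationGroup = calculateReplicationGroup();
}

// hot read path: a plain volatile read of the precomputed group, no lock taken
public ReplicationGroup getReplicationGroup() {
    return replicationGroup;
}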
@@ -1742,6 +1753,9 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S
 * while the global checkpoint update may have emanated from the primary when we were in that state, we could subsequently move
 * to recovery finalization, or even finished recovery before the update arrives here.
 */
assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED && state() != IndexShardState.RELOCATED :
    "supposedly in-sync shard copy received a global checkpoint [" + globalCheckpoint + "] " +
    "that is higher than its local checkpoint [" + localCheckpoint + "]";
Should we fail the shard in the production case?
I think we should reserve the "hard failure if something is wrong" approach for things that would make production really stuck.
@@ -634,7 +659,7 @@ public static IndexShardRoutingTable readFromThin(StreamInput in, Index index) t
}

public static void writeTo(IndexShardRoutingTable indexShard, StreamOutput out) throws IOException {
    out.writeString(indexShard.shardId().getIndex().getName());
    indexShard.shardId().getIndex().writeTo(out);
This looks like a serialization-breaking change?
This was apparently never used before
Looks great. I'm still bummed out that we use IndexShardRoutingTable but I agree that's the right trade-off for now. Let's get this in and see what we can do later.
I left a bunch of small comments here and there. It would also be great to have a test around the replication group cache: GlobalCheckpointTracker#getReplicationGroup isn't called anywhere in the class's unit tests.
@@ -204,6 +216,13 @@ private boolean invariant() {
    "global checkpoint is not up-to-date, expected: " +
    computeGlobalCheckpoint(pendingInSync, localCheckpoints.values(), globalCheckpoint) + " but was: " + globalCheckpoint;

assert cachedReplicationGroup == null || cachedReplicationGroup.equals(calculateReplicationGroup()) :
should we assert that when we're in primary mode we have a routing table?
}

@Override
public String toString() {
    return "PrimaryContext{" +
        "clusterStateVersion=" + clusterStateVersion +
        ", localCheckpoints=" + localCheckpoints +
        ", routingTable=" + routingTable +
this is a multi line string, maybe just add the version? I'm afraid it will be clumsy to work with, and we typically also log the cluster state changes in tests where this is relevant.
I changed the newly introduced toString() method on IndexShardRoutingTable to be single line so that we still get useful output.
@Override
public String toString() {
    return "ReplicationGroup{" +
        "routingTable=" + routingTable +
same comment about multilines...
        request.targetNode());
    throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
if (targetShardRouting.initializing() == false) {
This can not happen today, can it? As we never reuse replica allocation IDs... If I'm correct, does it make sense to transform this into an assertion + fail the recovery?
true, this should never happen unless we reuse replica allocation ids. Either way, the primary should probably not care to validate this, as the replica is in charge of saying when a recovery should occur, and the primary's only role is to validate that it is properly tracking the replica (the specific state the replica is in does not matter).
@@ -176,24 +187,6 @@ public RecoveryResponse recoverToTarget() throws IOException {
    throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}

// engine was just started at the end of phase1
why can this go away? I presume because we check post translog replay as well, so this will happen later, but wanted to double check.
correct, the reason for having this check here was to make sure that recovery of a replica cannot complete once a primary is relocated. We now have this check again in the finalizeRecovery stage (when we call runUnderOperationPermit(() -> shard.markAllocationIdAsInSync ...)), so we're covered.
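For context, a hedged sketch of the check being referred to in the finalizeRecovery stage (method names are approximate and the arguments are placeholders, pieced together from the snippets quoted in this review):

// marking the target's allocation id as in-sync runs under a primary operation permit;
// if the primary has been relocated in the meantime, the permit cannot be acquired and
// the recovery fails instead of completing against a stale primary
runUnderOperationPermit(() ->
    shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint));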
@@ -134,6 +129,22 @@ public StartRecoveryRequest getRequest() {
 * performs the recovery from the local engine to the target
 */
public RecoveryResponse recoverToTarget() throws IOException {
    cancellableThreads.execute(() -> runUnderOperationPermit(() -> {
can we add a comment (here or somewhere else) that we acquire a permit to validate relocation? maybe we should rename runUnderPermit to runUnderPrimaryPermit[AndValidateRelocation]. Also it seems we always run that method under cancellable threads. Shall we fold that into the method?
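For illustration, folding the cancellable-threads wrapping into the helper might look roughly like this (hypothetical name and signature, not the actual change):

// hypothetical helper: acquire a primary operation permit, validate the shard is still an
// active (non-relocated) primary, and run the action, all under cancellable threads
private void runUnderPrimaryPermit(final Runnable action, final String reason) {
    cancellableThreads.execute(() -> {
        final PlainActionFuture<Releasable> permit = PlainActionFuture.newFuture();
        shard.acquirePrimaryOperationPermit(permit, reason);
        try (Releasable ignored = permit.actionGet()) {
            if (shard.state() == IndexShardState.RELOCATED) {
                throw new IndexShardRelocatedException(shard.shardId());
            }
            action.run();
        }
    });
}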
    cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext));
}
logger.trace("performing relocation hand-off");
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext));
can we add a comment here saying that this will acquire all permits and will thus delay new recoveries until it's done?
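For example, the requested comment could read along these lines (the wording is only a suggestion):

// relocation hand-off acquires ALL operation permits on the shard, so while it is in
// progress no new permits are granted, which also delays any new recoveries that need
// a permit until the hand-off has completed
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext));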
doAnswer(invocation -> {
    ((ActionListener<Releasable>) invocation.getArguments()[0]).onResponse(() -> {});
    return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString());
why was this needed? was there some bug hiding in this test?
This test was doing something weird: It created this IndexShard mock, but never used it in the test.
Still LGTM
@@ -204,6 +204,9 @@ private boolean invariant() {
// there is at least one in-sync shard copy when the global checkpoint tracker operates in primary mode (i.e. the shard itself)
assert !primaryMode || localCheckpoints.values().stream().anyMatch(lcps -> lcps.inSync);

// the routing table and replication group is set when the global checkpoint tracker operates in primary mode
assert !primaryMode || (routingTable != null && replicationGroup != null);
message?
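For illustration, the assertion with a message could look something like this (the exact wording is up to the author):

// hypothetical message for the assertion asked about above
assert !primaryMode || (routingTable != null && replicationGroup != null) :
    "routing table and replication group must be set when the tracker operates in primary mode";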
Thanks @bleskes @jasontedor
Primary terms were introduced as part of the sequence-number effort (#10708) and added in ES 5.0. Subsequent work introduced the replication tracker which lets the primary own its replication group (#25692) to coordinate recovery and replication. The replication tracker explicitly exposes whether it is operating in primary mode or replica mode, independent of the ShardRouting object that's associated with a shard. During a primary relocation, for example, the primary mode is transferred between the primary relocation source and the primary relocation target. After transferring this so-called primary context, the old primary becomes a replication target and the new primary the replication source, reflected in the replication tracker on both nodes. With the most recent PR in this area (#32442), we finally have a clean transition between a shard that's operating as a primary and issuing sequence numbers and a shard that's serving as a replication target. The transition from one state to the other is enforced through the operation-permit system, where we block permit acquisition during such changes and perform the transition under this operation block, ensuring that there are no operations in progress while the transition is being performed. This finally allows us to turn the best-effort checks that were put in place to prevent shards from being used in the wrong way (i.e. primary as replica, or replica as primary) into hard assertions, making it easier to catch any bugs in this area.
Currently replication and recovery are both coordinated through the latest cluster state available on the ClusterService as well as through the GlobalCheckpointTracker (to have consistent local/global checkpoint information), making it difficult to understand the relation between recovery and replication, and requiring some tricky checks in the recovery code to coordinate between the two. This PR makes the primary the single owner of its replication group, which simplifies the replication model and allows us to clean up corner cases we have in our recovery code. It also reduces the dependencies in the code, so that neither RecoverySourceXXX nor ReplicationOperation need access to the latest state on ClusterService anymore. Finally, it gives us the property that in-sync shard copies won't receive global checkpoint updates which are above their local checkpoint (relates #25485).
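To make the ownership change concrete, here is a hedged sketch of the direction described above: the replication operation asks the primary shard for its replication group instead of resolving it from the latest cluster state (names are approximate, not the exact code in this PR; primaryRouting and performOn are placeholders):

// the primary is the single source of truth for its replication group; the global
// checkpoint is sampled before the group so in-sync copies never see a checkpoint
// above their local checkpoint
final long globalCheckpoint = primary.globalCheckpoint();
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
for (final ShardRouting copy : replicationGroup.getRoutingTable()) {
    if (copy.active() && copy.isSameAllocation(primaryRouting) == false) {
        // hypothetical helper: replicate the request and piggyback the global checkpoint
        performOn(copy, replicaRequest, globalCheckpoint);
    }
}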