Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove PRE_60_NODE_CHECKPOINT #42527

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,7 @@ private boolean invariant() {
"checkpoints map should always have an entry for the current shard";

// local checkpoints only set during primary mode
assert primaryMode || checkpoints.values().stream()
.allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO ||
lcps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT);
assert primaryMode || checkpoints.values().stream().allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO);

// global checkpoints for other shards only set during primary mode
assert primaryMode
Expand All @@ -545,9 +543,7 @@ private boolean invariant() {
.stream()
.filter(e -> e.getKey().equals(shardAllocationId) == false)
.map(Map.Entry::getValue)
.allMatch(cps ->
(cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO
|| cps.globalCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT));
.allMatch(cps -> cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO);

// relocation handoff can only occur in primary mode
assert !handoffInProgress || primaryMode;
Expand Down Expand Up @@ -626,7 +622,7 @@ private static long inSyncCheckpointStates(
.stream()
.filter(cps -> cps.inSync)
.mapToLong(function)
.filter(v -> v != SequenceNumbers.PRE_60_NODE_CHECKPOINT && v != SequenceNumbers.UNASSIGNED_SEQ_NO));
.filter(v -> v != SequenceNumbers.UNASSIGNED_SEQ_NO));
return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO;
}

Expand Down Expand Up @@ -911,13 +907,9 @@ public synchronized void markAllocationIdAsInSync(final String allocationId, fin
}

private boolean updateLocalCheckpoint(String allocationId, CheckpointState cps, long localCheckpoint) {
// a local checkpoint of PRE_60_NODE_CHECKPOINT cannot be overridden
assert cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT ||
localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT :
"pre-6.0 shard copy " + allocationId + " unexpected to send valid local checkpoint " + localCheckpoint;
// a local checkpoint for a shard copy should be a valid sequence number or the pre-6.0 sequence number indicator
assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO :
"invalid local checkpoint for shard copy [" + allocationId + "]";
// a local checkpoint for a shard copy should be a valid sequence number
assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED :
"invalid local checkpoint [" + localCheckpoint + "] for shard copy [" + allocationId + "]";
if (localCheckpoint > cps.localCheckpoint) {
logger.trace("updated local checkpoint of [{}] from [{}] to [{}]", allocationId, cps.localCheckpoint, localCheckpoint);
cps.localCheckpoint = localCheckpoint;
Expand Down Expand Up @@ -976,8 +968,6 @@ private static long computeGlobalCheckpoint(final Set<String> pendingInSync, fin
if (cps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// unassigned in-sync replica
return fallback;
} else if (cps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
// 5.x replica, ignore for global checkpoint calculation
} else {
minLocalCheckpoint = Math.min(cps.localCheckpoint, minLocalCheckpoint);
}
Expand Down Expand Up @@ -1049,18 +1039,11 @@ public synchronized void completeRelocationHandoff() {
handoffInProgress = false;
relocated = true;
// forget all checkpoint information except for global checkpoint of current shard
checkpoints.entrySet().stream().forEach(e -> {
final CheckpointState cps = e.getValue();
if (cps.localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO &&
cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (e.getKey().equals(shardAllocationId) == false) {
checkpoints.forEach((key, cps) -> {
cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
if (key.equals(shardAllocationId) == false) {
// don't throw global checkpoint information of current shard away
if (cps.globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO &&
cps.globalCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
});
assert invariant();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ public class SequenceNumbers {

public static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
public static final String MAX_SEQ_NO = "max_seq_no";
/**
* Represents a checkpoint coming from a pre-6.0 node
*/
public static final long PRE_60_NODE_CHECKPOINT = -3L;
/**
* Represents an unassigned sequence number (e.g., can be used on primary operations before they are executed).
*/
Expand Down