From c74b9150ac1a46386a7d566faba9bbd86e53313a Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 24 May 2019 15:59:30 +0100 Subject: [PATCH 1/2] Remove PRE_60_NODE_CHECKPOINT This commit removes the obsolete `PRE_60_NODE_CHECKPOINT` constant for dealing with 5.x nodes' lack of sequence number support. --- .../index/seqno/ReplicationTracker.java | 31 +++++-------------- .../index/seqno/SequenceNumbers.java | 4 --- 2 files changed, 7 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 892056674019f..501db7b80d717 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -534,9 +534,7 @@ private boolean invariant() { "checkpoints map should always have an entry for the current shard"; // local checkpoints only set during primary mode - assert primaryMode || checkpoints.values().stream() - .allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO || - lcps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT); + assert primaryMode || checkpoints.values().stream().allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO); // global checkpoints for other shards only set during primary mode assert primaryMode @@ -545,9 +543,7 @@ private boolean invariant() { .stream() .filter(e -> e.getKey().equals(shardAllocationId) == false) .map(Map.Entry::getValue) - .allMatch(cps -> - (cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO - || cps.globalCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT)); + .allMatch(cps -> cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO); // relocation handoff can only occur in primary mode assert !handoffInProgress || primaryMode; @@ -626,7 +622,7 @@ private static long inSyncCheckpointStates( .stream() .filter(cps -> cps.inSync) .mapToLong(function) - .filter(v -> v != SequenceNumbers.PRE_60_NODE_CHECKPOINT && v != SequenceNumbers.UNASSIGNED_SEQ_NO)); + .filter(v -> v != SequenceNumbers.UNASSIGNED_SEQ_NO)); return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO; } @@ -911,10 +907,6 @@ public synchronized void markAllocationIdAsInSync(final String allocationId, fin } private boolean updateLocalCheckpoint(String allocationId, CheckpointState cps, long localCheckpoint) { - // a local checkpoint of PRE_60_NODE_CHECKPOINT cannot be overridden - assert cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT || - localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT : - "pre-6.0 shard copy " + allocationId + " unexpected to send valid local checkpoint " + localCheckpoint; // a local checkpoint for a shard copy should be a valid sequence number or the pre-6.0 sequence number indicator assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO : "invalid local checkpoint for shard copy [" + allocationId + "]"; @@ -976,8 +968,6 @@ private static long computeGlobalCheckpoint(final Set pendingInSync, fin if (cps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { // unassigned in-sync replica return fallback; - } else if (cps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT) { - // 5.x replica, ignore for global checkpoint calculation } else { minLocalCheckpoint = Math.min(cps.localCheckpoint, minLocalCheckpoint); } @@ -1049,18 +1039,11 @@ public synchronized void completeRelocationHandoff() { handoffInProgress = false; relocated = true; // forget all checkpoint information except for global checkpoint of current shard - checkpoints.entrySet().stream().forEach(e -> { - final CheckpointState cps = e.getValue(); - if (cps.localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO && - cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) { - cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; - } - if (e.getKey().equals(shardAllocationId) == false) { + checkpoints.forEach((key, cps) -> { + cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; + if (key.equals(shardAllocationId) == false) { // don't throw global checkpoint information of current shard away - if (cps.globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO && - cps.globalCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) { - cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; - } + cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; } }); assert invariant(); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java b/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java index 6336e83338f8c..87257a97076da 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java @@ -28,10 +28,6 @@ public class SequenceNumbers { public static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; public static final String MAX_SEQ_NO = "max_seq_no"; - /** - * Represents a checkpoint coming from a pre-6.0 node - */ - public static final long PRE_60_NODE_CHECKPOINT = -3L; /** * Represents an unassigned sequence number (e.g., can be used on primary operations before they are executed). */ From 7517b17eb0ab0625efe136ead16d94bde9077db2 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 24 May 2019 16:27:21 +0100 Subject: [PATCH 2/2] Adjust comment and assertion --- .../org/elasticsearch/index/seqno/ReplicationTracker.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 501db7b80d717..bda96b4d52d9f 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -907,9 +907,9 @@ public synchronized void markAllocationIdAsInSync(final String allocationId, fin } private boolean updateLocalCheckpoint(String allocationId, CheckpointState cps, long localCheckpoint) { - // a local checkpoint for a shard copy should be a valid sequence number or the pre-6.0 sequence number indicator - assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO : - "invalid local checkpoint for shard copy [" + allocationId + "]"; + // a local checkpoint for a shard copy should be a valid sequence number + assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED : + "invalid local checkpoint [" + localCheckpoint + "] for shard copy [" + allocationId + "]"; if (localCheckpoint > cps.localCheckpoint) { logger.trace("updated local checkpoint of [{}] from [{}] to [{}]", allocationId, cps.localCheckpoint, localCheckpoint); cps.localCheckpoint = localCheckpoint;