Skip to content

Commit

Permalink
Remove PRE_60_NODE_CHECKPOINT
Browse files (browse the repository at this point in the history)
This commit removes the obsolete `PRE_60_NODE_CHECKPOINT` constant, which
existed only to handle 5.x nodes' lack of sequence number support.

Backport of elastic#42527
  • Loading branch information
DaveCTurner committed May 24, 2019
1 parent 075fd2a commit 59d8e27
Show file tree
Hide file tree
Showing 15 changed files with 78 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -981,28 +981,19 @@ public ReplicaResponse(long localCheckpoint, long globalCheckpoint) {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
localCheckpoint = in.readZLong();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
localCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
}
assert in.getVersion().onOrAfter(Version.V_6_0_0_alpha1);
assert in.getVersion().onOrAfter(Version.V_6_0_0_rc1);
localCheckpoint = in.readZLong();
globalCheckpoint = in.readZLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(localCheckpoint);
}
if (out.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
out.writeZLong(globalCheckpoint);
}
assert out.getVersion().onOrAfter(Version.V_6_0_0_alpha1);
assert out.getVersion().onOrAfter(Version.V_6_0_0_rc1);
out.writeZLong(localCheckpoint);
out.writeZLong(globalCheckpoint);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,7 @@ private boolean invariant() {
"checkpoints map should always have an entry for the current shard";

// local checkpoints only set during primary mode
assert primaryMode || checkpoints.values().stream()
.allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO ||
lcps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT);
assert primaryMode || checkpoints.values().stream().allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO);

// global checkpoints for other shards only set during primary mode
assert primaryMode
Expand All @@ -556,9 +554,7 @@ private boolean invariant() {
.stream()
.filter(e -> e.getKey().equals(shardAllocationId) == false)
.map(Map.Entry::getValue)
.allMatch(cps ->
(cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO
|| cps.globalCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT));
.allMatch(cps -> cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO);

// relocation handoff can only occur in primary mode
assert !handoffInProgress || primaryMode;
Expand Down Expand Up @@ -637,7 +633,7 @@ private static long inSyncCheckpointStates(
.stream()
.filter(cps -> cps.inSync)
.mapToLong(function)
.filter(v -> v != SequenceNumbers.PRE_60_NODE_CHECKPOINT && v != SequenceNumbers.UNASSIGNED_SEQ_NO));
.filter(v -> v != SequenceNumbers.UNASSIGNED_SEQ_NO));
return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO;
}

Expand Down Expand Up @@ -784,14 +780,12 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {

/**
* Notifies the tracker of the current allocation IDs in the cluster state.
*
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param inSyncAllocationIds the allocation IDs of the currently in-sync shard copies
* @param routingTable the shard routing table
* @param pre60AllocationIds the allocation IDs of shards that are allocated to pre-6.0 nodes
*/
public synchronized void updateFromMaster(final long applyingClusterStateVersion, final Set<String> inSyncAllocationIds,
final IndexShardRoutingTable routingTable, final Set<String> pre60AllocationIds) {
final IndexShardRoutingTable routingTable) {
assert invariant();
if (applyingClusterStateVersion > appliedClusterStateVersion) {
// check that the master does not fabricate new in-sync entries out of thin air once we are in primary mode
Expand All @@ -812,8 +806,7 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
final boolean inSync = inSyncAllocationIds.contains(initializingId);
assert inSync == false : "update from master in primary mode has " + initializingId +
" as in-sync but it does not exist locally";
final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
}
Expand All @@ -824,8 +817,7 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
} else {
for (String initializingId : initializingAllocationIds) {
if (shardAllocationId.equals(initializingId) == false) {
final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false));
}
Expand All @@ -837,8 +829,7 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
checkpointState.inSync = true;
checkpointState.tracked = true;
} else {
final long localCheckpoint = pre60AllocationIds.contains(inSyncId) ?
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true));
}
Expand Down Expand Up @@ -926,13 +917,9 @@ public synchronized void markAllocationIdAsInSync(final String allocationId, fin
}

private boolean updateLocalCheckpoint(String allocationId, CheckpointState cps, long localCheckpoint) {
// a local checkpoint of PRE_60_NODE_CHECKPOINT cannot be overridden
assert cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT ||
localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT :
"pre-6.0 shard copy " + allocationId + " unexpected to send valid local checkpoint " + localCheckpoint;
// a local checkpoint for a shard copy should be a valid sequence number or the pre-6.0 sequence number indicator
assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO :
"invalid local checkpoint for shard copy [" + allocationId + "]";
// a local checkpoint for a shard copy should be a valid sequence number
assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED :
"invalid local checkpoint [" + localCheckpoint + "] for shard copy [" + allocationId + "]";
if (localCheckpoint > cps.localCheckpoint) {
logger.trace("updated local checkpoint of [{}] from [{}] to [{}]", allocationId, cps.localCheckpoint, localCheckpoint);
cps.localCheckpoint = localCheckpoint;
Expand Down Expand Up @@ -991,8 +978,6 @@ private static long computeGlobalCheckpoint(final Set<String> pendingInSync, fin
if (cps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// unassigned in-sync replica
return fallback;
} else if (cps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
// 5.x replica, ignore for global checkpoint calculation
} else {
minLocalCheckpoint = Math.min(cps.localCheckpoint, minLocalCheckpoint);
}
Expand Down Expand Up @@ -1064,18 +1049,11 @@ public synchronized void completeRelocationHandoff() {
handoffInProgress = false;
relocated = true;
// forget all checkpoint information except for global checkpoint of current shard
checkpoints.entrySet().stream().forEach(e -> {
final CheckpointState cps = e.getValue();
if (cps.localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO &&
cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (e.getKey().equals(shardAllocationId) == false) {
checkpoints.forEach((key, cps) -> {
cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
if (key.equals(shardAllocationId) == false) {
// don't throw global checkpoint information of current shard away
if (cps.globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO &&
cps.globalCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
});
assert invariant();
Expand Down Expand Up @@ -1112,17 +1090,13 @@ private Runnable getMasterUpdateOperationFromCurrentState() {
assert primaryMode == false;
final long lastAppliedClusterStateVersion = appliedClusterStateVersion;
final Set<String> inSyncAllocationIds = new HashSet<>();
final Set<String> pre60AllocationIds = new HashSet<>();
checkpoints.entrySet().forEach(entry -> {
if (entry.getValue().inSync) {
inSyncAllocationIds.add(entry.getKey());
}
if (entry.getValue().getLocalCheckpoint() == SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
pre60AllocationIds.add(entry.getKey());
}
});
final IndexShardRoutingTable lastAppliedRoutingTable = routingTable;
return () -> updateFromMaster(lastAppliedClusterStateVersion, inSyncAllocationIds, lastAppliedRoutingTable, pre60AllocationIds);
return () -> updateFromMaster(lastAppliedClusterStateVersion, inSyncAllocationIds, lastAppliedRoutingTable);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ public class SequenceNumbers {

public static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
public static final String MAX_SEQ_NO = "max_seq_no";
/**
* Represents a checkpoint coming from a pre-6.0 node
*/
public static final long PRE_60_NODE_CHECKPOINT = -3L;
/**
* Represents an unassigned sequence number (e.g., can be used on primary operations before they are executed).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,7 @@ public void updateShardState(final ShardRouting newRouting,
final BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
final long applyingClusterStateVersion,
final Set<String> inSyncAllocationIds,
final IndexShardRoutingTable routingTable,
final Set<String> pre60AllocationIds) throws IOException {
final IndexShardRoutingTable routingTable) throws IOException {
final ShardRouting currentRouting;
synchronized (mutex) {
currentRouting = this.shardRouting;
Expand All @@ -453,7 +452,7 @@ public void updateShardState(final ShardRouting newRouting,
}

if (newRouting.primary()) {
replicationTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, routingTable, pre60AllocationIds);
replicationTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, routingTable);
}

if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -35,7 +34,6 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource.Type;
import org.elasticsearch.cluster.routing.RoutingNode;
Expand Down Expand Up @@ -94,8 +92,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.CLOSED;
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED;
Expand Down Expand Up @@ -630,21 +626,8 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
primaryTerm = indexMetaData.primaryTerm(shard.shardId().id());
final Set<String> inSyncIds = indexMetaData.inSyncAllocationIds(shard.shardId().id());
final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
final Set<String> pre60AllocationIds = indexShardRoutingTable.assignedShards()
.stream()
.flatMap(shr -> {
if (shr.relocating()) {
return Stream.of(shr, shr.getTargetRelocatingShard());
} else {
return Stream.of(shr);
}
})
.filter(shr -> nodes.get(shr.currentNodeId()).getVersion().before(Version.V_6_0_0_alpha1))
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toSet());
shard.updateShardState(shardRouting, primaryTerm, primaryReplicaSyncer::resync, clusterState.version(),
inSyncIds, indexShardRoutingTable, pre60AllocationIds);
inSyncIds, indexShardRoutingTable);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState);
return;
Expand Down Expand Up @@ -810,7 +793,7 @@ public interface Shard {
* - Updates and persists the new routing value.
* - Updates the primary term if this shard is a primary.
* - Updates the allocation ids that are tracked by the shard if it is a primary.
* See {@link ReplicationTracker#updateFromMaster(long, Set, IndexShardRoutingTable, Set)} for details.
* See {@link ReplicationTracker#updateFromMaster(long, Set, IndexShardRoutingTable)} for details.
*
* @param shardRouting the new routing entry
* @param primaryTerm the new primary term
Expand All @@ -826,8 +809,7 @@ void updateShardState(ShardRouting shardRouting,
BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
long applyingClusterStateVersion,
Set<String> inSyncAllocationIds,
IndexShardRoutingTable routingTable,
Set<String> pre60AllocationIds) throws IOException;
IndexShardRoutingTable routingTable) throws IOException;
}

public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexComponent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2279,7 +2279,7 @@ public void testSeqNoAndCheckpoints() throws IOException {
ReplicationTracker gcpTracker = (ReplicationTracker) initialEngine.config().getGlobalCheckpointSupplier();
gcpTracker.updateFromMaster(1L, new HashSet<>(Arrays.asList(primary.allocationId().getId(),
replica.allocationId().getId())),
new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(replica).build(), Collections.emptySet());
new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(replica).build());
gcpTracker.activatePrimaryMode(primarySeqNo);
for (int op = 0; op < opCount; op++) {
final String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testNoopAfterRegularEngine() throws IOException {
ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node",
null, true, ShardRoutingState.STARTED, allocationId);
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table, Collections.emptySet());
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table);
tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
for (int i = 0; i < docs; i++) {
ParsedDocument doc = testParsedDocument("" + i, null, testDocumentWithTextField(), B_1, null);
Expand Down
Loading

0 comments on commit 59d8e27

Please sign in to comment.