From 11c9417b39ef765360f0adaa36040017ec465604 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 17 Aug 2022 17:25:19 -0700 Subject: [PATCH] [Segment Replication] Update PrimaryShardAllocator to prefer replicas with higher replication checkpoint (#4041) * [Segment Replication] Update PrimaryShardAllocator to prefer replicas having higher replication checkpoint Signed-off-by: Suraj Singh * Use empty replication checkpoint to avoid NPE Signed-off-by: Suraj Singh * Update NodeGatewayStartedShards to optionally wire in/out ReplicationCheckpoint field Signed-off-by: Suraj Singh * Use default replication checkpoint causing EOF errors on empty checkpoint * Add indexSettings to GatewayAllocator to allow ReplicationCheckpoint comparator only for segrep enabled indices * Add unit tests for primary term first replica promotion & comparator fix * Fix NPE on empty IndexMetadata * Remove settings from AllocationService and directly inject in GatewayAllocator * Add more unit tests and minor code clean up Signed-off-by: Suraj Singh * Address review comments & integration test Signed-off-by: Suraj Singh * Fix comparator on null ReplicationCheckpoint Signed-off-by: Suraj Singh Signed-off-by: Suraj Singh --- .../gateway/PrimaryShardAllocator.java | 17 +- ...ransportNodesListGatewayStartedShards.java | 58 ++++- .../checkpoint/ReplicationCheckpoint.java | 7 +- .../gateway/PrimaryShardAllocatorTests.java | 229 +++++++++++++++++- .../test/gateway/TestGatewayAllocator.java | 17 +- 5 files changed, 314 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index b9cfebaa98521..4dc9396751fc9 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -313,6 +313,11 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS NodeGatewayStartedShards::primary ).reversed(); + private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( + NodeGatewayStartedShards::replicationCheckpoint, + Comparator.nullsLast(Comparator.naturalOrder()) + ); + /** * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but @@ -381,6 +386,12 @@ protected static NodeShardsResult buildNodeShardsResult( } } + /** + * Orders the active shards copies based on below comparators + * 1. No store exception i.e. shard copy is readable + * 2. Prefer previous primary shard + * 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices. + */ final Comparator comparator; // allocation preference if (matchAnyShard) { // prefer shards with matching allocation ids @@ -388,9 +399,11 @@ protected static NodeShardsResult buildNodeShardsResult( (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) ).reversed(); comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) - .thenComparing(PRIMARY_FIRST_COMPARATOR); + .thenComparing(PRIMARY_FIRST_COMPARATOR) + .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); } else { - comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR); + comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR) + .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); } nodeShardStates.sort(comparator); diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 78b4fa287ef59..c43f539243d7a 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionType; import org.opensearch.action.FailedNodeException; @@ -56,11 +57,13 @@ import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.shard.ShardStateMetadata; import org.opensearch.index.store.Store; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -195,6 +198,7 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { clusterService.localNode(), allocationId, shardStateMetadata.primary, + null, exception ); } @@ -202,10 +206,16 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; - return new NodeGatewayStartedShards(clusterService.localNode(), allocationId, shardStateMetadata.primary); + final IndexShard shard = indicesService.getShardOrNull(shardId); + return new NodeGatewayStartedShards( + clusterService.localNode(), + allocationId, + shardStateMetadata.primary, + shard != null ? shard.getLatestReplicationCheckpoint() : null + ); } logger.trace("{} no local shard info found", shardId); - return new NodeGatewayStartedShards(clusterService.localNode(), null, false); + return new NodeGatewayStartedShards(clusterService.localNode(), null, false, null); } catch (Exception e) { throw new OpenSearchException("failed to load started shards", e); } @@ -349,10 +359,10 @@ public String getCustomDataPath() { * @opensearch.internal */ public static class NodeGatewayStartedShards extends BaseNodeResponse { - private final String allocationId; private final boolean primary; private final Exception storeException; + private final ReplicationCheckpoint replicationCheckpoint; public NodeGatewayStartedShards(StreamInput in) throws IOException { super(in); @@ -363,16 +373,33 @@ public NodeGatewayStartedShards(StreamInput in) throws IOException { } else { storeException = null; } + if (in.getVersion().onOrAfter(Version.V_2_3_0) && in.readBoolean()) { + replicationCheckpoint = new ReplicationCheckpoint(in); + } else { + replicationCheckpoint = null; + } } - public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary) { - this(node, allocationId, primary, null); + public NodeGatewayStartedShards( + DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint + ) { + this(node, allocationId, primary, replicationCheckpoint, null); } - public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary, Exception storeException) { + public NodeGatewayStartedShards( + DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint, + Exception storeException + ) { super(node); this.allocationId = allocationId; this.primary = primary; + this.replicationCheckpoint = replicationCheckpoint; this.storeException = storeException; } @@ -384,6 +411,10 @@ public boolean primary() { return this.primary; } + public ReplicationCheckpoint replicationCheckpoint() { + return this.replicationCheckpoint; + } + public Exception storeException() { return this.storeException; } @@ -399,6 +430,14 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeBoolean(false); } + if (out.getVersion().onOrAfter(Version.V_2_3_0)) { + if (replicationCheckpoint != null) { + out.writeBoolean(true); + replicationCheckpoint.writeTo(out); + } else { + out.writeBoolean(false); + } + } } @Override @@ -414,7 +453,8 @@ public boolean equals(Object o) { return primary == that.primary && Objects.equals(allocationId, that.allocationId) - && Objects.equals(storeException, that.storeException); + && Objects.equals(storeException, that.storeException) + && Objects.equals(replicationCheckpoint, that.replicationCheckpoint); } @Override @@ -422,6 +462,7 @@ public int hashCode() { int result = (allocationId != null ? allocationId.hashCode() : 0); result = 31 * result + (primary ? 1 : 0); result = 31 * result + (storeException != null ? storeException.hashCode() : 0); + result = 31 * result + (replicationCheckpoint != null ? replicationCheckpoint.hashCode() : 0); return result; } @@ -432,6 +473,9 @@ public String toString() { if (storeException != null) { buf.append(",storeException=").append(storeException); } + if (replicationCheckpoint != null) { + buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); + } buf.append("]"); return buf.toString(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 8afb5bd055636..6a4e5e449f178 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -23,7 +23,7 @@ * * @opensearch.internal */ -public class ReplicationCheckpoint implements Writeable { +public class ReplicationCheckpoint implements Writeable, Comparable { private final ShardId shardId; private final long primaryTerm; @@ -107,6 +107,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(segmentInfosVersion); } + @Override + public int compareTo(ReplicationCheckpoint other) { + return this.isAheadOf(other) ? -1 : 1; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index 4a1ecb9661687..3c39ec9f03b2a 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -62,6 +62,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; @@ -205,6 +206,203 @@ public void testShardLockObtainFailedException() { assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); } + /** + * Tests that replica with the highest primary term version will be selected as target + */ + public void testPreferReplicaWithHighestPrimaryTerm() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId2, + allocId3 + ); + testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 10, 120, 2)); + testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node2.getId()) + ); + // Assert node2's allocation id is used + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId2) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + + /** + * Tests that replica with highest primary ter version will be selected as target + */ + public void testPreferReplicaWithNullReplicationCheckpoint() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId2, + allocId3 + ); + testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false); + testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 10, 120, 2)); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node3.getId()) + ); + // Assert node3's allocation id should be used as it has highest replication checkpoint + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId3) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + + /** + * Tests that null ReplicationCheckpoint are ignored + */ + public void testPreferReplicaWithAllNullReplicationCheckpoint() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId2, + allocId3 + ); + testAllocator.addData(node1, allocId1, false, null, null); + testAllocator.addData(node2, allocId2, false, null, null); + testAllocator.addData(node3, allocId3, true, null, null); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node3.getId()) + ); + // Assert node3's allocation id should be used as it was previous primary + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId3) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + + /** + * Tests that replica with highest segment info version will be selected as target on equal primary terms + */ + public void testPreferReplicaWithHighestSegmentInfoVersion() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId2, + allocId3 + ); + testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 3)); + testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node2.getId()) + ); + // Assert node2's allocation id is used + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId2) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + + /** + * Tests that prefer allocation of replica at lower checkpoint but in sync set + */ + public void testOutOfSyncHighestRepCheckpointIsIgnored() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId3 + ); + testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); + testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node3.getId()) + ); + // Assert node3's allocation id is used + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId3) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + + /** + * Tests that prefer allocation of older primary over replica with higher replication checkpoint + */ + public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() { + String allocId1 = randomAlphaOfLength(10); + String allocId2 = randomAlphaOfLength(10); + String allocId3 = randomAlphaOfLength(10); + final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( + yesAllocationDeciders(), + CLUSTER_RECOVERED, + allocId1, + allocId2, + allocId3 + ); + testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); + testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); + testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); + allocateAllUnassigned(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(true)); + assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), + equalTo(node1.getId()) + ); + // Assert node1's allocation id is used with highest replication checkpoint + assertThat( + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), + equalTo(allocId1) + ); + assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); + } + /** * Tests that when one node returns a ShardLockObtainFailedException and another properly loads the store, it will * select the second node as target @@ -219,7 +417,7 @@ public void testShardLockObtainFailedExceptionPreferOtherValidCopies() { allocId2 ); testAllocator.addData(node1, allocId1, randomBoolean(), new ShardLockObtainFailedException(shardId, "test")); - testAllocator.addData(node2, allocId2, randomBoolean(), null); + testAllocator.addData(node2, allocId2, randomBoolean()); allocateAllUnassigned(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); @@ -601,17 +799,42 @@ public TestAllocator clear() { return this; } + public TestAllocator addData( + DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint + ) { + return addData(node, allocationId, primary, replicationCheckpoint, null); + } + public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary) { - return addData(node, allocationId, primary, null); + return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), null); } public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) { + return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), storeException); + } + + public TestAllocator addData( + DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint, + @Nullable Exception storeException + ) { if (data == null) { data = new HashMap<>(); } data.put( node, - new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, allocationId, primary, storeException) + new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards( + node, + allocationId, + primary, + replicationCheckpoint, + storeException + ) ); return this; } diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index 54c92f4d519aa..a36dc26685eb4 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -43,6 +43,7 @@ import org.opensearch.gateway.ReplicaShardAllocator; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; import java.util.Collections; @@ -71,6 +72,7 @@ public class TestGatewayAllocator extends GatewayAllocator { Map> knownAllocations = new HashMap<>(); DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES; + Map shardIdNodeToReplicationCheckPointMap = new HashMap<>(); PrimaryShardAllocator primaryShardAllocator = new PrimaryShardAllocator() { @Override @@ -90,7 +92,8 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR routing -> new NodeGatewayStartedShards( currentNodes.get(routing.currentNodeId()), routing.allocationId().getId(), - routing.primary() + routing.primary(), + getReplicationCheckpoint(shardId, routing.currentNodeId()) ) ) ); @@ -99,6 +102,10 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR } }; + private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String nodeName) { + return shardIdNodeToReplicationCheckPointMap.getOrDefault(getReplicationCheckPointKey(shardId, nodeName), null); + } + ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator() { @Override protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { @@ -156,4 +163,12 @@ public void allocateUnassigned( public void addKnownAllocation(ShardRouting shard) { knownAllocations.computeIfAbsent(shard.currentNodeId(), id -> new HashMap<>()).put(shard.shardId(), shard); } + + public String getReplicationCheckPointKey(ShardId shardId, String nodeName) { + return shardId.toString() + "_" + nodeName; + } + + public void addReplicationCheckpoint(ShardId shardId, String nodeName, ReplicationCheckpoint replicationCheckpoint) { + shardIdNodeToReplicationCheckPointMap.putIfAbsent(getReplicationCheckPointKey(shardId, nodeName), replicationCheckpoint); + } }