From de07dac82eb4aa63121404765887982219a69731 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 8 Aug 2022 11:55:51 -0700 Subject: [PATCH] Address review comments & integration test Signed-off-by: Suraj Singh --- .../gateway/BaseGatewayShardAllocator.java | 13 ++++----- .../opensearch/gateway/GatewayAllocator.java | 28 +++++------------- .../gateway/PrimaryShardAllocator.java | 29 +++++++++---------- .../gateway/ReplicaShardAllocator.java | 4 +-- ...ransportNodesListGatewayStartedShards.java | 4 ++- .../gateway/PrimaryShardAllocatorTests.java | 16 ++-------- .../gateway/ReplicaShardAllocatorTests.java | 4 +-- .../test/gateway/TestGatewayAllocator.java | 18 +++--------- 8 files changed, 36 insertions(+), 80 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index dc767fe3a2554..59ef894958cbe 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -43,7 +43,6 @@ import org.opensearch.cluster.routing.allocation.NodeAllocationResult; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; -import org.opensearch.common.settings.Settings; import java.util.ArrayList; import java.util.List; @@ -63,7 +62,7 @@ public abstract class BaseGatewayShardAllocator { /** * Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist. - * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger, Settings)} + * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)} * to make decisions on assigning shards to nodes. * @param shardRouting the shard to allocate * @param allocation the allocation state container object @@ -72,10 +71,9 @@ public abstract class BaseGatewayShardAllocator { public void allocateUnassigned( ShardRouting shardRouting, RoutingAllocation allocation, - ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler, - Settings settings + ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler ) { - final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger, settings); + final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger); if (allocateUnassignedDecision.isDecisionTaken() == false) { // no decision was taken by this allocator @@ -108,7 +106,7 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation /** * Make a decision on the allocation of an unassigned shard. This method is used by - * {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler, Settings)} to make decisions + * {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions * about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated. * * @param unassignedShard the unassigned shard to allocate @@ -119,8 +117,7 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation public abstract AllocateUnassignedDecision makeAllocationDecision( ShardRouting unassignedShard, RoutingAllocation allocation, - Logger logger, - Settings settings + Logger logger ); /** diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index e15c23f26da10..cdcf813d9ede0 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -51,7 +51,6 @@ import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; import org.opensearch.common.lease.Releasables; -import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; @@ -80,8 +79,6 @@ public class GatewayAllocator implements ExistingShardsAllocator { private final PrimaryShardAllocator primaryShardAllocator; private final ReplicaShardAllocator replicaShardAllocator; - protected final Settings settings; - private final ConcurrentMap< ShardId, AsyncShardFetch> asyncFetchStarted = ConcurrentCollections @@ -94,13 +91,11 @@ public class GatewayAllocator implements ExistingShardsAllocator { public GatewayAllocator( RerouteService rerouteService, TransportNodesListGatewayStartedShards startedAction, - TransportNodesListShardStoreMetadata storeAction, - Settings settings + TransportNodesListShardStoreMetadata storeAction ) { this.rerouteService = rerouteService; this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); - this.settings = settings; } @Override @@ -116,7 +111,6 @@ protected GatewayAllocator() { this.rerouteService = null; this.primaryShardAllocator = null; this.replicaShardAllocator = null; - this.settings = null; } @Override @@ -171,14 +165,7 @@ public void allocateUnassigned( ) { assert primaryShardAllocator != null; assert replicaShardAllocator != null; - innerAllocatedUnassigned( - allocation, - primaryShardAllocator, - replicaShardAllocator, - shardRouting, - unassignedAllocationHandler, - this.settings - ); + innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); } // allow for testing infra to change shard allocators implementation @@ -187,14 +174,13 @@ protected static void innerAllocatedUnassigned( PrimaryShardAllocator primaryShardAllocator, ReplicaShardAllocator replicaShardAllocator, ShardRouting shardRouting, - ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler, - Settings settings + ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler ) { assert shardRouting.unassigned(); if (shardRouting.primary()) { - primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler, settings); + primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler); } else { - replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler, settings); + replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler); } } @@ -204,10 +190,10 @@ public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting assert routingAllocation.debugDecision(); if (unassignedShard.primary()) { assert primaryShardAllocator != null; - return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger, settings); + return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger); } else { assert replicaShardAllocator != null; - return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger, settings); + return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger); } } diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 5cf50807d935f..f6b20b7dfb581 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -48,11 +48,9 @@ import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.Decision.Type; -import org.opensearch.common.settings.Settings; import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; -import org.opensearch.index.IndexSettings; import java.util.ArrayList; import java.util.Collection; @@ -95,8 +93,7 @@ private static boolean isResponsibleFor(final ShardRouting shard) { public AllocateUnassignedDecision makeAllocationDecision( final ShardRouting unassignedShard, final RoutingAllocation allocation, - final Logger logger, - final Settings settings + final Logger logger ) { if (isResponsibleFor(unassignedShard) == false) { // this allocator is not responsible for allocating this shard @@ -127,7 +124,6 @@ public AllocateUnassignedDecision makeAllocationDecision( // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index()); - final IndexSettings indexSettings = settings != null ? new IndexSettings(indexMetadata, settings) : null; final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id()); final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT; @@ -139,7 +135,6 @@ public AllocateUnassignedDecision makeAllocationDecision( allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, - indexSettings, logger ); final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0; @@ -318,7 +313,7 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS NodeGatewayStartedShards::primary ).reversed(); - private static final Comparator HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR = Comparator.nullsLast( + private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.nullsLast( Comparator.comparing(NodeGatewayStartedShards::replicationCheckpoint) ); @@ -333,7 +328,6 @@ protected static NodeShardsResult buildNodeShardsResult( Set ignoreNodes, Set inSyncAllocationIds, FetchResult shardState, - IndexSettings indexSettings, Logger logger ) { List nodeShardStates = new ArrayList<>(); @@ -391,21 +385,24 @@ protected static NodeShardsResult buildNodeShardsResult( } } - Comparator comparator; // allocation preference + /** + * Orders the active shards copies based on below comparators + * 1. No store exception + * 2. Shard copies previously primary shard + * 3. Shard copies with highest replication checkpoint. This comparator is NO-OP for doc rep enabled indices. + */ + final Comparator comparator; // allocation preference if (matchAnyShard) { // prefer shards with matching allocation ids Comparator matchingAllocationsFirst = Comparator.comparing( (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) ).reversed(); comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) - .thenComparing(PRIMARY_FIRST_COMPARATOR); + .thenComparing(PRIMARY_FIRST_COMPARATOR) + .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); } else { - comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR); - } - - // If index has segrep enabled, then use replication checkpoint info to order the replicas - if (indexSettings != null && indexSettings.isSegRepEnabled()) { - comparator = comparator.thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR); + comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR) + .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); } nodeShardStates.sort(comparator); diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index bad85f7cfb57b..c0b831b6fe4d0 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -49,7 +49,6 @@ import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.Nullable; import org.opensearch.common.collect.Tuple; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.store.StoreFileMetadata; @@ -181,8 +180,7 @@ private static boolean isResponsibleFor(final ShardRouting shard) { public AllocateUnassignedDecision makeAllocationDecision( final ShardRouting unassignedShard, final RoutingAllocation allocation, - final Logger logger, - final Settings settings + final Logger logger ) { if (isResponsibleFor(unassignedShard) == false) { // this allocator is not responsible for deciding on this shard diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 3047e27b7a037..953b4def9d653 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -473,7 +473,9 @@ public String toString() { if (storeException != null) { buf.append(",storeException=").append(storeException); } - buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); + if (replicationCheckpoint != null) { + buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); + } buf.append("]"); return buf.toString(); } diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index d1ed1ec6404c5..9982bb0d71f4f 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -63,7 +63,6 @@ import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; @@ -98,7 +97,7 @@ public void buildTestAllocator() { private void allocateAllUnassigned(final RoutingAllocation allocation) { final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { - testAllocator.allocateUnassigned(iterator.next(), allocation, iterator, testAllocator.settings); + testAllocator.allocateUnassigned(iterator.next(), allocation, iterator); } } @@ -208,7 +207,7 @@ public void testShardLockObtainFailedException() { } /** - * Tests that replica with highest primary ter version will be selected as target + * Tests that replica with the highest primary term version will be selected as target */ public void testPreferReplicaWithHighestPrimaryTerm() { String allocId1 = randomAlphaOfLength(10); @@ -221,7 +220,6 @@ public void testPreferReplicaWithHighestPrimaryTerm() { allocId2, allocId3 ); - this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 10, 120, 2)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); @@ -255,7 +253,6 @@ public void testPreferReplicaWithNullReplicationCheckpoint() { allocId2, allocId3 ); - this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1)); testAllocator.addData(node2, allocId2, false); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 10, 120, 2)); @@ -289,7 +286,6 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() { allocId2, allocId3 ); - this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 3)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); @@ -322,7 +318,6 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() { allocId1, allocId3 ); - this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); @@ -356,7 +351,6 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() { allocId2, allocId3 ); - this.testAllocator.enableSegmentReplication(); testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 10, 101, 1)); testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2)); testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2)); @@ -767,8 +761,6 @@ class TestAllocator extends PrimaryShardAllocator { private Map data; - private Settings settings = Settings.EMPTY; - public TestAllocator clear() { data = null; return this; @@ -821,9 +813,5 @@ protected AsyncShardFetch.FetchResult(shardId, data, Collections.emptySet()); } - - public void enableSegmentReplication() { - this.settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); - } } } diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java index fc8d57acdb302..36ac93524d6aa 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java @@ -96,8 +96,6 @@ public class ReplicaShardAllocatorTests extends OpenSearchAllocationTestCase { private TestAllocator testAllocator; - private final Settings settings = Settings.EMPTY; - @Before public void buildTestAllocator() { this.testAllocator = new TestAllocator(); @@ -106,7 +104,7 @@ public void buildTestAllocator() { private void allocateAllUnassigned(final RoutingAllocation allocation) { final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { - testAllocator.allocateUnassigned(iterator.next(), allocation, iterator, settings); + testAllocator.allocateUnassigned(iterator.next(), allocation, iterator); } } diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index cdd1301060ad7..a36dc26685eb4 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -71,9 +71,9 @@ public class TestGatewayAllocator extends GatewayAllocator { Map> knownAllocations = new HashMap<>(); - - Map shardIdNodeToReplicationCheckPointMap = new HashMap<>(); DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES; + Map shardIdNodeToReplicationCheckPointMap = new HashMap<>(); + PrimaryShardAllocator primaryShardAllocator = new PrimaryShardAllocator() { @Override protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { @@ -103,10 +103,7 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR }; private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String nodeName) { - return shardIdNodeToReplicationCheckPointMap.getOrDefault( - getReplicationCheckPointKey(shardId, nodeName), - ReplicationCheckpoint.empty(shardId) - ); + return shardIdNodeToReplicationCheckPointMap.getOrDefault(getReplicationCheckPointKey(shardId, nodeName), null); } ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator() { @@ -157,14 +154,7 @@ public void allocateUnassigned( UnassignedAllocationHandler unassignedAllocationHandler ) { currentNodes = allocation.nodes(); - innerAllocatedUnassigned( - allocation, - primaryShardAllocator, - replicaShardAllocator, - shardRouting, - unassignedAllocationHandler, - this.settings - ); + innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); } /**