diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 02da12904a6e7..0d8082da00cd0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -522,7 +522,6 @@ public void testCancellation() throws Exception { assertDocCounts(docCount, primaryNode); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5a5356282681e..60c52e9583821 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -74,6 +74,7 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.common.Booleans; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.CheckedFunction; @@ -1454,22 +1455,53 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { } /** - * Checks if checkpoint should be processed - * - * @param requestCheckpoint received checkpoint that is checked for processing - * @return true if checkpoint should be processed + * Checks if this target shard should start a round of segment replication. 
+ * @return true if the shard is able to perform segment replication, false otherwise. */ - public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { - if (state().equals(IndexShardState.STARTED) == false) { - logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); + public boolean isSegmentReplicationAllowed() { + if (indexSettings.isSegRepEnabled() == false) { + logger.warn("Attempting to perform segment replication when it is not enabled on the index"); return false; } if (getReplicationTracker().isPrimaryMode()) { - logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints."); + logger.warn("Shard is in primary mode and cannot perform segment replication as a replica."); return false; } if (this.routingEntry().primary()) { - logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints."); + logger.warn("Shard is marked as primary and cannot perform segment replication as a replica"); + return false; + } + if (state().equals(IndexShardState.STARTED) == false + && (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) { + logger.warn( + () -> new ParameterizedMessage( + "Shard is not started or recovering {} {} and cannot perform segment replication as a replica", + state(), + shardRouting.state() + ) + ); + return false; + } + if (getReplicationEngine().isEmpty()) { + logger.warn( + () -> new ParameterizedMessage( + "Shard does not have the correct engine type to perform segment replication {}.", + getEngine().getClass() + ) + ); + return false; + } + return true; + } + + /** + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @return true if checkpoint should be processed + */ + public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) 
{ + if (isSegmentReplicationAllowed() == false) { return false; } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 83f4e0c7cbed9..e8adcbdc1c89a 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -47,7 +47,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingNode; @@ -811,11 +810,7 @@ private void forceSegmentReplication( StepListener forceSegRepListener ) { IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); - if (indexShard != null - && indexShard.indexSettings().isSegRepEnabled() - && shardRouting.primary() == false - && shardRouting.state() == ShardRoutingState.INITIALIZING - && indexShard.state() == IndexShardState.POST_RECOVERY) { + if (indexShard != null && indexShard.isSegmentReplicationAllowed()) { segmentReplicationTargetService.startReplication( ReplicationCheckpoint.empty(shardRouting.shardId()), indexShard, diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index c46f97b5ec785..44771faf36871 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -27,6 +27,7 @@ import 
org.opensearch.index.IndexSettings; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; @@ -97,6 +98,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException { closeShards(indexShard); } + public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException { + final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory()); + assertFalse(indexShard.isSegmentReplicationAllowed()); + closeShards(indexShard); + } + public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {