diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java new file mode 100644 index 0000000000000..2c91eadafbee8 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -0,0 +1,306 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.apache.lucene.index.SegmentInfos; +import org.junit.BeforeClass; +import org.opensearch.action.admin.indices.segments.IndexShardSegments; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.admin.indices.segments.ShardSegments; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; +import org.opensearch.index.engine.Segment; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.BackgroundIndexer; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationIT extends OpenSearchIntegTestCase { + + private static final String INDEX_NAME = "test-idx-1"; + private static final int SHARD_COUNT = 1; + private static final int REPLICA_COUNT = 1; + + @BeforeClass + public static void assumeFeatureFlag() { + assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + 
INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); + } + } + + public void testReplicationAfterForceMerge() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + + flush(INDEX_NAME); + waitForReplicaUpdate(); + // wait a short amount of time to give replication a chance to complete. + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + // Index a second set of docs so we can merge into one segment. + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); + } + } + + public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); + ensureGreen(INDEX_NAME); + + // Index a doc to create the first set of segments. 
_s1.si
+        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
+        // Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si)
+        flushAndRefresh(INDEX_NAME);
+        assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
+
+        // Index to create another segment
+        client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").get();
+
+        // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
+        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
+        refresh(INDEX_NAME);
+
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareUpdateSettings(INDEX_NAME)
+                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
+        );
+        final String replicaNode = internalCluster().startNode();
+        ensureGreen(INDEX_NAME);
+
+        client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();
+
+        waitForReplicaUpdate();
+        assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
+        assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
+
+        final Index index = resolveIndex(INDEX_NAME);
+        IndexShard primaryShard = getIndexShard(index, primaryNode);
+        IndexShard replicaShard = getIndexShard(index, replicaNode);
+        assertEquals(
+            primaryShard.translogStats().estimatedNumberOfOperations(),
+            replicaShard.translogStats().estimatedNumberOfOperations()
+        );
+        assertSegmentStats(REPLICA_COUNT);
+    }
+
+    private void assertSegmentStats(int numberOfReplicas) throws IOException {
+        final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();
+
+        List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
+
+        // There will be an entry in the list for each index.
+        for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
+
+            // Separate Primary & replica shards ShardSegments.
+            final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
+            final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
+            final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
+
+            assertEquals("There should only be one primary in the replicationGroup", 1, primaryShardSegmentsList.size());
+            final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
+            final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
+
+            assertEquals(
+                "There should be a ShardSegment entry for each replica in the replicationGroup",
+                numberOfReplicas,
+                replicaShardSegments.size()
+            );
+
+            for (ShardSegments shardSegment : replicaShardSegments) {
+                final Map<String, Segment> latestReplicaSegments = getLatestSegments(shardSegment);
+                for (Segment replicaSegment : latestReplicaSegments.values()) {
+                    final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName());
+                    assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration());
+                    assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs());
+                    assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs());
+                    assertEquals(replicaSegment.getSize(), primarySegment.getSize());
+                }
+
+                // Fetch the IndexShard for this replica and try to build its SegmentInfos from the previous commit point.
+                // This ensures the previous commit point is not wiped.
+                final ShardRouting replicaShardRouting = shardSegment.getShardRouting();
+                ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
+                final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
+                final Index index = resolveIndex(INDEX_NAME);
+                IndexShard indexShard = getIndexShard(index, replicaNode.getName());
+                final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
+                // calls to readCommit will fail if a valid commit point and all its segments are not in the store.
+                SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
+            }
+        }
+    }
+
+    /**
+     * Waits until the replica is caught up to the latest primary segments gen.
+     * @throws Exception
+     */
+    private void waitForReplicaUpdate() throws Exception {
+        // wait until the replica has the latest segment generation.
+        assertBusy(() -> {
+            final IndicesSegmentResponse indicesSegmentResponse = client().admin()
+                .indices()
+                .segments(new IndicesSegmentsRequest())
+                .actionGet();
+            List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
+            for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
+                final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
+                final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
+                final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
+
+                final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
+                final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
+                final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
+                for (ShardSegments shardSegments : replicaShardSegments) {
+                    final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
+                        .stream()
+                        .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
+                    assertTrue(isReplicaCaughtUpToPrimary);
+                }
+            }
+        });
+    }
+
+    private IndexShard getIndexShard(Index index, String node) {
+        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
+        IndexService indexService = indicesService.indexServiceSafe(index);
+        final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
+        return indexService.getShard(shardId.get());
+    }
+
+    private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSegmentResponse) {
+        return indicesSegmentResponse.getIndices()
+            .values()
+            .stream() // get list of IndexSegments
+            .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group
+            .map(IndexShardSegments::getShards) // get list of segments across replication group
+            .collect(Collectors.toList());
+    }
+
+    private Map<String, Segment> getLatestSegments(ShardSegments segments) {
+        final Long latestPrimaryGen = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare).get();
+        return segments.getSegments()
+            .stream()
+            .filter(s -> s.getGeneration() == latestPrimaryGen)
+            .collect(Collectors.toMap(Segment::getName, Function.identity()));
+    }
+
+    private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
+        return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 3aae7faf2ee5b..ef95f7f3a39d1 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1382,9 +1382,13 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
     }
 
     /**
-     * Returns the lastest Replication Checkpoint that shard received
+     * Returns the latest Replication Checkpoint that shard received. Shards will return an EMPTY checkpoint before
+     * the engine is opened.
      */
     public ReplicationCheckpoint getLatestReplicationCheckpoint() {
+        if (getEngineOrNull() == null) {
+            return ReplicationCheckpoint.empty(shardId);
+        }
         try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
             return Optional.ofNullable(snapshot.get())
                 .map(
@@ -1396,15 +1400,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
                         segmentInfos.getVersion()
                     )
                 )
-                .orElse(
-                    new ReplicationCheckpoint(
-                        shardId,
-                        getOperationPrimaryTerm(),
-                        SequenceNumbers.NO_OPS_PERFORMED,
-                        getProcessedLocalCheckpoint(),
-                        SequenceNumbers.NO_OPS_PERFORMED
-                    )
-                );
+                .orElse(ReplicationCheckpoint.empty(shardId));
         } catch (IOException ex) {
             throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex);
         }
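Note for reviewers on the IndexShard change above: a minimal sketch of what callers now observe before the engine is opened. This is illustrative only (not part of the change) and assumes the usual ReplicationCheckpoint getters plus the sentinel values set by the private constructor added to ReplicationCheckpoint.java further down in this diff.

```java
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

// Illustrative only: the empty checkpoint carries sentinel values, so a checkpoint
// produced by any live engine will always compare as newer than it.
public class EmptyCheckpointSketch {
    public static void main(String[] args) {
        ShardId shardId = new ShardId("test-idx-1", "_na_", 0);
        ReplicationCheckpoint empty = ReplicationCheckpoint.empty(shardId);
        // Mirrors the private constructor in this diff: unassigned primary term and
        // NO_OPS_PERFORMED (-1) for seqNo, segments generation and infos version.
        assert empty.getPrimaryTerm() == SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
        assert empty.getSeqNo() == SequenceNumbers.NO_OPS_PERFORMED;
    }
}
```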
diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java
index 2309004c0777d..6828ab7d91b2c 100644
--- a/server/src/main/java/org/opensearch/index/store/Store.java
+++ b/server/src/main/java/org/opensearch/index/store/Store.java
@@ -1003,7 +1003,12 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory
                 // version is written since 3.1+: we should have already hit IndexFormatTooOld.
                 throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
             }
-            if (version.onOrAfter(maxVersion)) {
+            // With segment replication enabled, we compute metadata snapshots from the latest in memory infos.
+            // In this case we will have SegmentInfos objects fetched from the primary's reader
+            // where the minSegmentLuceneVersion can be null even though there are segments.
+            // This is because the SegmentInfos object is not read from a commit/IndexInput, which sets
+            // minSegmentLuceneVersion.
+            if (maxVersion == null || version.onOrAfter(maxVersion)) {
                 maxVersion = version;
             }
             for (String file : info.files()) {
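The null guard above is easy to misread as redundant, so here is a standalone sketch of the null-seeded maximum tracking it implements. Illustrative only; any two Lucene 9.x Version constants serve the purpose.

```java
import org.apache.lucene.util.Version;

// Standalone sketch: when SegmentInfos come from the primary's in-memory reader,
// there is no commit-derived version to seed the loop, so the running max starts
// as null and the first segment's version must be accepted unconditionally.
public class MaxVersionSketch {
    public static void main(String[] args) {
        Version maxVersion = null; // null seed, as in loadMetadata above
        for (Version version : new Version[] { Version.LUCENE_9_1_0, Version.LUCENE_9_2_0 }) {
            if (maxVersion == null || version.onOrAfter(maxVersion)) {
                maxVersion = version;
            }
        }
        System.out.println(maxVersion); // prints the newest version seen
    }
}
```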
diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
index a600581384f31..ed66fb448ba95 100644
--- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
+++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
@@ -56,6 +56,7 @@
 import org.opensearch.common.inject.Inject;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.common.util.concurrent.AbstractRunnable;
 import org.opensearch.common.util.concurrent.ConcurrentCollections;
 import org.opensearch.env.ShardLockObtainFailedException;
@@ -80,6 +81,7 @@
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
 import org.opensearch.indices.recovery.RecoveryListener;
 import org.opensearch.indices.recovery.RecoveryState;
+import org.opensearch.indices.replication.SegmentReplicationTargetService;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.indices.replication.common.ReplicationState;
 import org.opensearch.repositories.RepositoriesService;
@@ -90,6 +92,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -134,7 +137,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent implements ClusterStateApplier {
     private final FailedShardHandler failedShardHandler = new FailedShardHandler();
 
     private final boolean sendRefreshMapping;
-    private final List<IndexEventListener> buildInIndexListener;
+    private final List<IndexEventListener> builtInIndexListener;
     private final PrimaryReplicaSyncer primaryReplicaSyncer;
     private final Consumer<ShardId> globalCheckpointSyncer;
     private final RetentionLeaseSyncer retentionLeaseSyncer;
@@ -148,6 +151,7 @@ public IndicesClusterStateService(
         final ClusterService clusterService,
         final ThreadPool threadPool,
         final PeerRecoveryTargetService recoveryTargetService,
+        final SegmentReplicationTargetService segmentReplicationTargetService,
         final ShardStateAction shardStateAction,
         final NodeMappingRefreshAction nodeMappingRefreshAction,
         final RepositoriesService repositoriesService,
@@ -165,6 +169,7 @@ public IndicesClusterStateService(
             clusterService,
             threadPool,
             checkpointPublisher,
+            segmentReplicationTargetService,
             recoveryTargetService,
             shardStateAction,
             nodeMappingRefreshAction,
@@ -185,6 +190,7 @@ public IndicesClusterStateService(
         final ClusterService clusterService,
         final ThreadPool threadPool,
         final SegmentReplicationCheckpointPublisher checkpointPublisher,
+        final SegmentReplicationTargetService segmentReplicationTargetService,
         final PeerRecoveryTargetService recoveryTargetService,
         final ShardStateAction shardStateAction,
         final NodeMappingRefreshAction nodeMappingRefreshAction,
@@ -198,7 +204,15 @@ public IndicesClusterStateService(
     ) {
         this.settings = settings;
         this.checkpointPublisher = checkpointPublisher;
-        this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);
+
+        final List<IndexEventListener> indexEventListeners = new ArrayList<>(
+            Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService)
+        );
+        // if the segrep feature flag is not enabled, don't wire the target service as an IndexEventListener.
+        if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
+            indexEventListeners.add(segmentReplicationTargetService);
+        }
+        this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
         this.indicesService = indicesService;
         this.clusterService = clusterService;
         this.threadPool = threadPool;
@@ -514,7 +528,7 @@ private void createIndices(final ClusterState state) {
             AllocatedIndex<? extends Shard> indexService = null;
             try {
-                indexService = indicesService.createIndex(indexMetadata, buildInIndexListener, true);
+                indexService = indicesService.createIndex(indexMetadata, builtInIndexListener, true);
                 if (indexService.updateMapping(null, indexMetadata) && sendRefreshMapping) {
                     nodeMappingRefreshAction.nodeMappingRefresh(
                         state.nodes().getClusterManagerNode(),
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
index 426409f7a5b65..652f3c9a55f53 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
@@ -62,10 +62,13 @@
 import org.opensearch.indices.replication.common.ReplicationCollection;
 
 import java.io.IOException;
+import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY;
+
 /**
  * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
  * this class are created through {@link ReplicationCollection}.
@@ -398,13 +401,29 @@ public void cleanFiles(
         store.incRef();
         try {
             store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata);
-            final String translogUUID = Translog.createEmptyTranslog(
-                indexShard.shardPath().resolveTranslog(),
-                globalCheckpoint,
-                shardId(),
-                indexShard.getPendingPrimaryTerm()
-            );
-            store.associateIndexWithNewTranslog(translogUUID);
+
+            // If Segment Replication is enabled, we need to reuse the primary's translog UUID already stored in the index.
+            // With segment replication, replicas should never create their own commit points. This ensures the index and
+            // translog share the same UUID without the extra step to associate the index with a new translog.
+            if (indexShard.indexSettings().isSegRepEnabled()) {
+                final String translogUUID = store.getMetadata().getCommitUserData().get(TRANSLOG_UUID_KEY);
+                Translog.createEmptyTranslog(
+                    indexShard.shardPath().resolveTranslog(),
+                    shardId(),
+                    globalCheckpoint,
+                    indexShard.getPendingPrimaryTerm(),
+                    translogUUID,
+                    FileChannel::open
+                );
+            } else {
+                final String translogUUID = Translog.createEmptyTranslog(
+                    indexShard.shardPath().resolveTranslog(),
+                    globalCheckpoint,
+                    shardId(),
+                    indexShard.getPendingPrimaryTerm()
+                );
+                store.associateIndexWithNewTranslog(translogUUID);
+            }
 
             if (indexShard.getRetentionLeases().leases().isEmpty()) {
                 // if empty, may be a fresh IndexShard, so write an empty leases file to disk
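For context on the segrep branch above: the UUID being reused lives in the last commit's user data, which the replica receives verbatim from the primary. Below is a hypothetical helper (not part of this change) showing where that key is read from; the RecoveryTests case added later in this diff verifies the same invariant end to end.

```java
import java.io.IOException;
import java.util.List;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;

// Hypothetical helper: reads the translog UUID the primary recorded in the most
// recent commit point. With segment replication the replica never rewrites this
// key, so primary and replica translogs end up sharing one UUID.
public final class TranslogUuidReader {
    public static String readTranslogUuid(Directory dir) throws IOException {
        List<IndexCommit> commits = DirectoryReader.listCommits(dir);
        IndexCommit lastCommit = commits.get(commits.size() - 1);
        return lastCommit.getUserData().get("translog_uuid"); // Translog.TRANSLOG_UUID_KEY
    }
}
```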
diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java
index 6302d364fc6d1..a9b032c98b70f 100644
--- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java
+++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java
@@ -113,7 +113,11 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
+            final String targetAllocationId = request.getTargetAllocationId();
+            RunUnderPrimaryPermit.run(
+                () -> shard.markAllocationIdAsInSync(targetAllocationId, request.getCheckpoint().getSeqNo()),
+                shard.shardId() + " marking " + targetAllocationId + " as in sync",
+                shard,
+                cancellableThreads,
+                logger
+            );
             try {
                 future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata)));
             } finally {
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index fb68e59f3b2ef..516cfa91a787b 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -181,11 +181,8 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> listener) {
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
index f9b40d14b0d53..f699f0edba842 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
@@ -53,6 +53,27 @@ public class SegmentReplicationTargetService implements IndexEventListener {
 
     private final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = new HashMap<>();
 
+    // Empty Implementation, only required while Segment Replication is under feature flag.
+    public static final SegmentReplicationTargetService NO_OP = new SegmentReplicationTargetService() {
+        @Override
+        public void beforeIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
+            // NoOp;
+        }
+
+        @Override
+        public synchronized void onNewCheckpoint(ReplicationCheckpoint receivedCheckpoint, IndexShard replicaShard) {
+            // noOp;
+        }
+    };
+
+    // Used only for empty implementation.
+ private SegmentReplicationTargetService() { + threadPool = null; + recoverySettings = null; + onGoingReplications = null; + sourceFactory = null; + } + /** * The internal actions * diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index f84a65206190b..abcef1bd91944 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -12,6 +12,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import java.io.IOException; @@ -30,6 +31,18 @@ public class ReplicationCheckpoint implements Writeable { private final long seqNo; private final long segmentInfosVersion; + public static ReplicationCheckpoint empty(ShardId shardId) { + return new ReplicationCheckpoint(shardId); + } + + private ReplicationCheckpoint(ShardId shardId) { + this.shardId = shardId; + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; + segmentsGen = SequenceNumbers.NO_OPS_PERFORMED; + seqNo = SequenceNumbers.NO_OPS_PERFORMED; + segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED; + } + public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long segmentInfosVersion) { this.shardId = shardId; this.primaryTerm = primaryTerm; diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 27e23ceafb15e..501ff46eeb2ff 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -49,7 +49,6 @@ public abstract class ReplicationTarget extends AbstractRefCounted { private final long id; protected final AtomicBoolean finished = new AtomicBoolean(); - private final ShardId shardId; protected final IndexShard indexShard; protected final Store store; protected final ReplicationListener listener; @@ -89,7 +88,6 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn this.stateIndex = stateIndex; this.indexShard = indexShard; this.store = indexShard.store(); - this.shardId = indexShard.shardId(); // make sure the store is not released until we are done. 
this.cancellableThreads = new CancellableThreads(); store.incRef(); @@ -131,7 +129,7 @@ public Store store() { } public ShardId shardId() { - return shardId; + return indexShard.shardId(); } /** diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 346bff9afe296..0ac8471be7087 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -961,6 +961,8 @@ protected Node( ); b.bind(SegmentReplicationSourceService.class) .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); + } else { + b.bind(SegmentReplicationTargetService.class).toInstance(SegmentReplicationTargetService.NO_OP); } } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 5bac40ab64d11..1f2360abde2ad 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -66,6 +66,7 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.PeerRecoveryTargetService; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.TestThreadPool; @@ -570,6 +571,7 @@ private IndicesClusterStateService createIndicesClusterStateService( clusterService, threadPool, SegmentReplicationCheckpointPublisher.EMPTY, + SegmentReplicationTargetService.NO_OP, recoveryTargetService, shardStateAction, null, diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index 5224a54a35e96..3ea74dbf38919 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -71,6 +71,7 @@ import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; +import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; import java.util.HashMap; @@ -103,6 +104,17 @@ public void testTranslogHistoryTransferred() throws Exception { } } + public void testWithSegmentReplication_ReplicaUsesPrimaryTranslogUUID() throws Exception { + Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + try (ReplicationGroup shards = createGroup(2, settings)) { + shards.startAll(); + final String expectedUUID = getTranslog(shards.getPrimary()).getTranslogUUID(); + assertTrue( + shards.getReplicas().stream().allMatch(indexShard -> getTranslog(indexShard).getTranslogUUID().equals(expectedUUID)) + ); + } + } + public void testRetentionPolicyChangeDuringRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startPrimary(); diff --git 
a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
index 260f6a13b5010..d42e75871a45a 100644
--- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
@@ -37,6 +37,7 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
 public class OngoingSegmentReplicationsTests extends IndexShardTestCase {
@@ -126,7 +127,7 @@ public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) {
             @Override
             public void onFailure(Exception e) {
                 logger.error("Unexpected failure", e);
-                Assert.fail();
+                Assert.fail("Unexpected failure from startSegmentCopy listener: " + e);
             }
         });
     }
@@ -228,4 +229,47 @@ public void testShardAlreadyReplicatingToNode() throws IOException {
         replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
         assertThrows(OpenSearchException.class, () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter); });
     }
+
+    public void testStartReplicationWithNoFilesToFetch() throws IOException {
+        // create a replications object and request a checkpoint.
+        OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings));
+        final CheckpointInfoRequest request = new CheckpointInfoRequest(
+            1L,
+            replica.routingEntry().allocationId().getId(),
+            replicaDiscoveryNode,
+            testCheckpoint
+        );
+        // mock the FileChunkWriter so we can assert it is never called.
+        final FileChunkWriter segmentSegmentFileChunkWriter = mock(FileChunkWriter.class);
+        // Prepare for replication step - and ensure copyState is added to cache.
+        final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
+        assertTrue(replications.isInCopyStateMap(request.getCheckpoint()));
+        assertEquals(1, replications.size());
+        assertEquals(1, copyState.refCount());
+
+        getSegmentFilesRequest = new GetSegmentFilesRequest(
+            1L,
+            replica.routingEntry().allocationId().getId(),
+            replicaDiscoveryNode,
+            Collections.emptyList(),
+            testCheckpoint
+        );
+
+        // invoke startSegmentCopy and assert our fileChunkWriter is never invoked.
+ replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + assertEquals(Collections.emptyList(), getSegmentFilesResponse.files); + assertEquals(0, copyState.refCount()); + assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); + verifyNoInteractions(segmentSegmentFileChunkWriter); + } + + @Override + public void onFailure(Exception e) { + logger.error("Unexpected failure", e); + Assert.fail(); + } + }); + } } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index a100e84dcf8ae..e9ef5ba30c865 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -183,6 +183,8 @@ import org.opensearch.indices.recovery.PeerRecoverySourceService; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.StatusInfo; @@ -1847,6 +1849,12 @@ public void onFailure(final Exception e) { clusterService, threadPool, new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService), + new SegmentReplicationTargetService( + threadPool, + recoverySettings, + transportService, + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) + ), shardStateAction, new NodeMappingRefreshAction(transportService, metadataMappingService), repositoriesService,
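A closing note on the feature-flag plumbing that threads through Node.java, IndicesClusterStateService, and the tests above: the pattern is to bind a no-op service when the flag is off so downstream constructors never receive null. A reduced sketch of that pattern with hypothetical stand-in types (not the actual Guice wiring):

```java
// Reduced sketch of the flag-gated binding pattern: callers always get a
// non-null service; the feature flag only selects which implementation.
interface ReplicationTargetHooks {
    default void onNewCheckpoint() { /* real work in the enabled implementation */ }

    ReplicationTargetHooks NO_OP = new ReplicationTargetHooks() {
        @Override
        public void onNewCheckpoint() { /* deliberately empty while flagged off */ }
    };
}

final class FlagGatedWiring {
    static ReplicationTargetHooks bind(boolean featureEnabled, ReplicationTargetHooks real) {
        return featureEnabled ? real : ReplicationTargetHooks.NO_OP;
    }
}
```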