From a1e60d35c6cb7f83dd6c18795e3eb8a7bc8d6cb6 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 31 Aug 2022 13:52:59 -0700 Subject: [PATCH] Segment Replication - Fix NoSuchFileException errors caused when computing metadata snapshot on primary shards. This change fixes the errors that occur when computing metadata snapshots on primary shards from the latest in-memory SegmentInfos. The error occurs when a segments_N file that is referenced by the in-memory infos is deleted as part of a concurrent commit. The segments themselves are incref'd by IndexWriter.incRefDeleter but the commit file (Segments_N) is not. This change resolves this by ignoring the segments_N file when computing metadata for CopyState and only sending incref'd segment files to replicas. Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 50 +++++++++++++++- .../org/opensearch/index/store/Store.java | 38 +++++++++--- .../replication/CheckpointInfoResponse.java | 26 +++----- .../SegmentReplicationSourceService.java | 7 +-- .../replication/SegmentReplicationTarget.java | 24 ++------ .../indices/replication/common/CopyState.java | 31 +++------- .../SegmentReplicationIndexShardTests.java | 7 +-- .../opensearch/index/store/StoreTests.java | 59 ++++++++++++++----- .../OngoingSegmentReplicationsTests.java | 28 +++++---- .../SegmentReplicationSourceHandlerTests.java | 8 ++- .../SegmentReplicationSourceServiceTests.java | 2 - .../SegmentReplicationTargetServiceTests.java | 2 +- .../SegmentReplicationTargetTests.java | 18 +++--- .../replication/common/CopyStateTests.java | 10 +--- .../index/shard/IndexShardTestCase.java | 8 +-- 16 files changed, 178 insertions(+), 141 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0930923805d96..251ca174a4e05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` 
([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352)) - [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386)) - [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363)) +- Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 16e9d78b17826..9b2ab753832d3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -9,7 +9,6 @@ package org.opensearch.indices.replication; import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.apache.lucene.index.SegmentInfos; import org.junit.BeforeClass; import org.opensearch.action.admin.indices.segments.IndexShardSegments; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; @@ -586,13 +585,56 @@ private void assertSegmentStats(int numberOfReplicas) throws IOException { ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState(); final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); IndexShard indexShard = getIndexShard(replicaNode.getName()); - final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory()); // 
calls to readCommit will fail if a valid commit point and all its segments are not in the store. - SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName); + indexShard.store().readLastCommittedSegmentsInfo(); } } } + public void testDropPrimaryDuringReplication() throws Exception { + final Settings settings = Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY); + createIndex(INDEX_NAME, settings); + internalCluster().startDataOnlyNodes(6); + ensureGreen(INDEX_NAME); + + int initialDocCount = scaledRandomIntBetween(100, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + // don't wait for replication to complete, stop the primary immediately. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + ensureYellow(INDEX_NAME); + + // start another replica. + internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + // index another doc and refresh - without this the new replica won't catch up. + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + assertSegmentStats(6); + } + } + /** * Waits until the replica is caught up to the latest primary segments gen. * @throws Exception if assertion fails @@ -611,10 +653,12 @@ private void waitForReplicaUpdate() throws Exception { final List replicaShardSegments = segmentListMap.get(false); // if we don't have any segments yet, proceed. 
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); if (primaryShardSegments.getSegments().isEmpty() == false) { final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); for (ShardSegments shardSegments : replicaShardSegments) { + logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments()); final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() .stream() .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 58598ab2d08f4..de96ad03e2423 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -105,6 +105,7 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -122,6 +123,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; +import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata; /** * A Store provides plain access to files written by an opensearch index shard. Each shard @@ -334,6 +336,16 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio return new MetadataSnapshot(segmentInfos, directory, logger); } + /** + * Fetch a map of StoreFileMetadata for segments, ignoring commit points. + * @param segmentInfos {@link SegmentInfos} from which to compute metadata. 
+ * @return {@link Map} of file name to {@link StoreFileMetadata}. + */ + public Map getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException { + assert indexSettings.isSegRepEnabled(); + return loadMetadata(segmentInfos, directory, logger, true).fileMetadata; + } + /** * Renames all the given files from the key of the map to the * value of the map. All successfully renamed files are removed from the map in-place. @@ -709,31 +721,34 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } /** - * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots. + * Segment Replication method - + * This method deletes every file in this store that is not referenced by the passed in SegmentInfos or + * part of the latest on-disk commit point. * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk. * @param reason the reason for this cleanup operation logged for each deleted file - * @param localSnapshot The local snapshot from in memory SegmentInfos. + * @param infos {@link SegmentInfos} whose referenced files will be preserved on disk if present. * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ - public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException { + public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException { + assert indexSettings.isSegRepEnabled(); // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. 
metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo())); + cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true)); } finally { metadataLock.writeLock().unlock(); } } - private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot) + private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection additionalFiles) throws IOException { assert metadataLock.isWriteLockedByCurrentThread(); for (String existingFile : directory.listAll()) { if (Store.isAutogenerated(existingFile) || localSnapshot.contains(existingFile) - || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) { + || (additionalFiles != null && additionalFiles.contains(existingFile))) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) continue; @@ -1033,6 +1048,11 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg } static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException { + return loadMetadata(segmentInfos, directory, logger, false); + } + + static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile) + throws IOException { long numDocs = Lucene.getNumDocs(segmentInfos); Map commitUserDataBuilder = new HashMap<>(); commitUserDataBuilder.putAll(segmentInfos.getUserData()); @@ -1067,8 +1087,10 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director if (maxVersion == null) { maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion; } - final String segmentsFile = segmentInfos.getSegmentsFileName(); - checksumFromLuceneFile(directory, 
segmentsFile, builder, logger, maxVersion, true); + if (ignoreSegmentsFile == false) { + final String segmentsFile = segmentInfos.getSegmentsFileName(); + checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); + } return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs); } diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java index a73a3b54184da..d2baca49b3c27 100644 --- a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java @@ -10,13 +10,12 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.transport.TransportResponse; import java.io.IOException; -import java.util.Set; +import java.util.Map; /** * Response returned from a {@link SegmentReplicationSource} that includes the file metadata, and SegmentInfos @@ -28,52 +27,41 @@ public class CheckpointInfoResponse extends TransportResponse { private final ReplicationCheckpoint checkpoint; - private final Store.MetadataSnapshot snapshot; + private final Map snapshot; private final byte[] infosBytes; - // pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos - // but are still referenced by the latest commit point (Segments_N). 
- private final Set pendingDeleteFiles; public CheckpointInfoResponse( final ReplicationCheckpoint checkpoint, - final Store.MetadataSnapshot snapshot, - final byte[] infosBytes, - final Set additionalFiles + final Map snapshot, + final byte[] infosBytes ) { this.checkpoint = checkpoint; this.snapshot = snapshot; this.infosBytes = infosBytes; - this.pendingDeleteFiles = additionalFiles; } public CheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); - this.snapshot = new Store.MetadataSnapshot(in); + this.snapshot = in.readMap(StreamInput::readString, StoreFileMetadata::new); this.infosBytes = in.readByteArray(); - this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new); } @Override public void writeTo(StreamOutput out) throws IOException { checkpoint.writeTo(out); - snapshot.writeTo(out); + out.writeMap(snapshot, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); out.writeByteArray(infosBytes); - out.writeCollection(pendingDeleteFiles); } public ReplicationCheckpoint getCheckpoint() { return checkpoint; } - public Store.MetadataSnapshot getSnapshot() { + public Map getSnapshot() { return snapshot; } public byte[] getInfosBytes() { return infosBytes; } - - public Set getPendingDeleteFiles() { - return pendingDeleteFiles; - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index db3f87201b774..91b8243440ac5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -133,12 +133,7 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan ); final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); 
channel.sendResponse( - new CheckpointInfoResponse( - copyState.getCheckpoint(), - copyState.getMetadataSnapshot(), - copyState.getInfosBytes(), - copyState.getPendingDeleteFiles() - ) + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); timer.stop(); logger.trace( diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7c28406036ddd..e1e5a6afa0002 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -37,12 +37,8 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; +import java.util.Collections; /** * Represents the target of a replication event. @@ -178,7 +174,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener filesToFetch = new ArrayList(diff.missing); - Set storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll())); - final Set pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles() - .stream() - .filter(f -> storeFiles.contains(f.name()) == false) - .collect(Collectors.toSet()); - - filesToFetch.addAll(pendingDeleteFiles); - logger.trace("Files to fetch {}", filesToFetch); - - for (StoreFileMetadata file : filesToFetch) { + for (StoreFileMetadata file : diff.missing) { state.getIndex().addFileDetail(file.name(), file.length(), false); } // always send a req even if not fetching files so the primary can clear the copyState for this shard. 
state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel(); - source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener); + source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), diff.missing, store, getFilesListener); } private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener listener) { @@ -231,7 +217,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, responseCheckpoint.getSegmentsGen() ); indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); - store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", store.getMetadata(infos)); + store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index c0e0b4dee2b3f..9ded0125b7c17 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -8,6 +8,8 @@ package org.opensearch.indices.replication.common; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; @@ -15,14 +17,12 @@ import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.store.Store; import 
org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.HashSet; -import java.util.Set; +import java.util.Map; /** * An Opensearch-specific version of Lucene's CopyState class that @@ -37,11 +37,11 @@ public class CopyState extends AbstractRefCounted { private final ReplicationCheckpoint requestedReplicationCheckpoint; /** Actual ReplicationCheckpoint returned by the shard */ private final ReplicationCheckpoint replicationCheckpoint; - private final Store.MetadataSnapshot metadataSnapshot; - private final HashSet pendingDeleteFiles; + private final Map metadataMap; private final byte[] infosBytes; private GatedCloseable commitRef; private final IndexShard shard; + public static final Logger logger = LogManager.getLogger(CopyState.class); public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { super("CopyState-" + shard.shardId()); @@ -49,7 +49,7 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar this.shard = shard; this.segmentInfosRef = shard.getSegmentInfosSnapshot(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); - this.metadataSnapshot = shard.store().getMetadata(segmentInfos); + this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos); this.replicationCheckpoint = new ReplicationCheckpoint( shard.shardId(), shard.getOperationPrimaryTerm(), @@ -57,18 +57,7 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar shard.getProcessedLocalCheckpoint(), segmentInfos.getVersion() ); - - // Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N. - // This ensures that the store on replicas is in sync with the store on primaries. 
this.commitRef = shard.acquireLastIndexCommit(false); - Store.MetadataSnapshot metadata = shard.store().getMetadata(this.commitRef.get()); - final Store.RecoveryDiff diff = metadata.recoveryDiff(this.metadataSnapshot); - this.pendingDeleteFiles = new HashSet<>(diff.missing); - if (this.pendingDeleteFiles.isEmpty()) { - // If there are no additional files we can release the last commit immediately. - this.commitRef.close(); - this.commitRef = null; - } ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); // resource description and name are not used, but resource description cannot be null @@ -95,18 +84,14 @@ public ReplicationCheckpoint getCheckpoint() { return replicationCheckpoint; } - public Store.MetadataSnapshot getMetadataSnapshot() { - return metadataSnapshot; + public Map getMetadataMap() { + return metadataMap; } public byte[] getInfosBytes() { return infosBytes; } - public Set getPendingDeleteFiles() { - return pendingDeleteFiles; - } - public IndexShard getShard() { return shard; } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 88a3bdad53d0c..3af882a8087ec 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -489,12 +489,7 @@ private void resolveCheckpointInfoResponseListener(ActionListener metadataSnapshot = store.getSegmentMetadataMap(store.readLastCommittedSegmentsInfo()); + // no docs indexed only _N file exists. + assertTrue(metadataSnapshot.isEmpty()); + + // commit some docs to create segments. 
+ commitRandomDocs(store); + + final Map snapshotAfterCommit = store.getSegmentMetadataMap(store.readLastCommittedSegmentsInfo()); + assertFalse(snapshotAfterCommit.isEmpty()); + assertFalse(snapshotAfterCommit.keySet().stream().anyMatch((name) -> name.startsWith(IndexFileNames.SEGMENTS))); + store.close(); + } + + private void commitRandomDocs(Store store) throws IOException { IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec( TestUtil.getDefaultCodec() ); @@ -1173,19 +1214,5 @@ public void testcleanupAndPreserveLatestCommitPoint() throws IOException { writer.addDocument(doc); writer.commit(); writer.close(); - - Store.MetadataSnapshot commitMetadata = store.getMetadata(); - - Store.MetadataSnapshot refreshMetadata = Store.MetadataSnapshot.EMPTY; - - store.cleanupAndPreserveLatestCommitPoint("test", refreshMetadata); - - // we want to ensure commitMetadata files are preserved after calling cleanup - for (String existingFile : store.directory().listAll()) { - assert (commitMetadata.contains(existingFile) == true); - } - - deleteContent(store.directory()); - IOUtils.close(store); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index f49ee0471b5e8..bd3106454f49b 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -11,28 +11,30 @@ import org.junit.Assert; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import 
org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexService; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -55,15 +57,18 @@ public class OngoingSegmentReplicationsTests extends IndexShardTestCase { private GetSegmentFilesRequest getSegmentFilesRequest; - final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build(); + final Settings settings = Settings.builder() + .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); @Override public void setUp() throws Exception { super.setUp(); - primary = newStartedShard(true); - replica = newShard(primary.shardId(), false); + primary = newStartedShard(true, settings); + replica = newShard(false, settings, new 
NRTReplicationEngineFactory()); recoverReplica(replica, primary, true); replicaDiscoveryNode = replica.recoveryState().getTargetNode(); primaryDiscoveryNode = replica.recoveryState().getSourceNode(); @@ -93,6 +98,8 @@ public void tearDown() throws Exception { } public void testPrepareAndSendSegments() throws IOException { + indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + primary.refresh("Test"); OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); final CheckpointInfoRequest request = new CheckpointInfoRequest( 1L, @@ -112,17 +119,14 @@ public void testPrepareAndSendSegments() throws IOException { 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataSnapshot().asMap().values()), + new ArrayList<>(copyState.getMetadataMap().values()), testCheckpoint ); - final Collection expectedFiles = List.copyOf(primary.store().getMetadata().asMap().values()); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { - assertEquals(1, getSegmentFilesResponse.files.size()); - assertEquals(1, expectedFiles.size()); - assertTrue(expectedFiles.stream().findFirst().get().isSame(getSegmentFilesResponse.files.get(0))); + assertEquals(copyState.getMetadataMap().size(), getSegmentFilesResponse.files.size()); assertEquals(0, copyState.refCount()); assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); assertEquals(0, replications.size()); @@ -181,7 +185,7 @@ public void testCancelReplication_AfterSendFilesStarts() throws IOException, Int 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataSnapshot().asMap().values()), + new ArrayList<>(copyState.getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new 
ActionListener<>() { diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index 5f6ec7e505805..cde5cd980a91d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -19,6 +19,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.StoreFileMetadata; @@ -76,7 +77,7 @@ public void testSendFiles() throws IOException { 1 ); - final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + final List expectedFiles = List.copyOf(copyState.getMetadataMap().values()); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -137,6 +138,9 @@ public void onFailure(Exception e) { } public void testSendFileFails() throws IOException { + // index some docs on the primary so a segment is created. 
+ indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + primary.refresh("Test"); chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onFailure( new OpenSearchException("Test") ); @@ -153,7 +157,7 @@ public void testSendFileFails() throws IOException { 1 ); - final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + final List expectedFiles = List.copyOf(copyState.getMetadataMap().values()); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 4bfdd81d50a1e..dcb6b6dad32dd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -121,8 +121,6 @@ public void testCheckpointInfo() { public void onResponse(CheckpointInfoResponse response) { assertEquals(testCheckpoint, response.getCheckpoint()); assertNotNull(response.getInfosBytes()); - // CopyStateTests sets up one pending delete file and one committed segments file - assertEquals(1, response.getPendingDeleteFiles().size()); assertEquals(1, response.getSnapshot().size()); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 1d253b0a9a300..a674ab6151f79 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -59,7 +59,7 @@ public void setUp() throws Exception { .put("node.name", 
SegmentReplicationTargetServiceTests.class.getSimpleName()) .build(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - primaryShard = newStartedShard(true); + primaryShard = newStartedShard(true, settings); replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replicaShard, primaryShard, true); checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, 0L); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 11217a46b3c69..93b0f46115fed 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -51,7 +51,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Random; import java.util.Arrays; @@ -135,7 +134,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); } @Override @@ -146,9 +145,8 @@ public void getSegmentFiles( Store store, ActionListener listener ) { - assertEquals(filesToFetch.size(), 2); + assertEquals(1, filesToFetch.size()); assert (filesToFetch.contains(SEGMENTS_FILE)); - assert (filesToFetch.contains(PENDING_DELETE_FILE)); listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } }; @@ -230,7 +228,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), 
Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); } @Override @@ -273,7 +271,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); } @Override @@ -318,7 +316,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); } @Override @@ -362,7 +360,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT.asMap(), buffer.toArrayCopy())); } @Override @@ -413,9 +411,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse( - new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1), buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE)) - ); + listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy())); } @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index a6f0cf7e98411..77a4a6d22039e 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ 
b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -22,7 +22,6 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.io.IOException; -import java.util.Set; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -32,6 +31,7 @@ public class CopyStateTests extends IndexShardTestCase { private static final long EXPECTED_LONG_VALUE = 1L; private static final ShardId TEST_SHARD_ID = new ShardId("testIndex", "testUUID", 0); private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); + private static final StoreFileMetadata SEGMENT_FILE = new StoreFileMetadata("_0.si", 1L, "0", Version.LATEST); private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); private static final Store.MetadataSnapshot COMMIT_SNAPSHOT = new Store.MetadataSnapshot( @@ -41,7 +41,7 @@ public class CopyStateTests extends IndexShardTestCase { ); private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( - Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE), + Map.of(SEGMENT_FILE.name(), SEGMENT_FILE), null, 0 ); @@ -61,10 +61,6 @@ public void testCopyStateCreation() throws IOException { // version was never set so this should be zero assertEquals(0, checkpoint.getSegmentInfosVersion()); assertEquals(EXPECTED_LONG_VALUE, checkpoint.getPrimaryTerm()); - - Set pendingDeleteFiles = copyState.getPendingDeleteFiles(); - assertEquals(1, pendingDeleteFiles.size()); - assertTrue(pendingDeleteFiles.contains(PENDING_DELETE_FILE)); } public static IndexShard createMockIndexShard() throws IOException { @@ -78,7 +74,7 @@ public static IndexShard createMockIndexShard() throws IOException { SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); when(mockShard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> 
{})); - when(mockStore.getMetadata(testSegmentInfos)).thenReturn(SI_SNAPSHOT); + when(mockStore.getSegmentMetadataMap(testSegmentInfos)).thenReturn(SI_SNAPSHOT.asMap()); IndexCommit mockIndexCommit = mock(IndexCommit.class); when(mockShard.acquireLastIndexCommit(false)).thenReturn(new GatedCloseable<>(mockIndexCommit, () -> {})); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 1b40cb4f2dfa3..3302ffd810bd4 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -134,6 +134,7 @@ import java.io.IOException; import java.util.ArrayList; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -1200,12 +1201,7 @@ public void getCheckpointMetadata( try { final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard); listener.onResponse( - new CheckpointInfoResponse( - copyState.getCheckpoint(), - copyState.getMetadataSnapshot(), - copyState.getInfosBytes(), - copyState.getPendingDeleteFiles() - ) + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e);