From d12f7f48a744a2d0de2fe2e5aa7908ce0658277d Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 26 Jul 2023 01:09:26 -0700 Subject: [PATCH 1/8] Remove divergent commit logic with segment replication. This change removes divergent commit paths for segrep node-node and remote store. All replicas with segrep enabled will perform local commits and ignore any incoming segments_n file. This changes the recovery sync with remote store to also exclude the segments_n so that only the fetched infos bytes are committed before an engine is opened. This change also updates deletion logic with segment replication to automatically delete when a file is decref'd to 0. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 10 +- .../index/engine/NRTReplicationEngine.java | 64 ++++++----- .../index/engine/ReplicaFileTracker.java | 97 +++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 27 +++-- .../shard/RemoteStoreRefreshListener.java | 1 - .../store/RemoteSegmentStoreDirectory.java | 2 +- .../index/store/ReplicaFileTracker.java | 51 --------- .../org/opensearch/index/store/Store.java | 101 +----------------- .../RemoteStoreReplicationSource.java | 10 +- .../replication/SegmentReplicationTarget.java | 22 +--- .../engine/NRTReplicationEngineTests.java | 55 ++-------- .../opensearch/index/store/StoreTests.java | 46 -------- 12 files changed, 170 insertions(+), 316 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java delete mode 100644 server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 66469241b83d8..3a8c732611a1e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -282,7 +282,6 @@ public void testIndexReopenClose() throws Exception { } public void testScrollWithConcurrentIndexAndSearch() throws Exception { - assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled()); final String primary = internalCluster().startDataOnlyNode(); final String replica = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); @@ -657,7 +656,6 @@ public void testDeleteOperations() throws Exception { * from xlog. 
*/ public void testReplicationPostDeleteAndForceMerge() throws Exception { - assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled()); final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); final String replica = internalCluster().startDataOnlyNode(); @@ -966,7 +964,6 @@ private void assertAllocationIdsInReplicaShardStats(Set expected, Set userData = store.readLastCommittedSegmentsInfo().getUserData(); + final Map userData = this.lastCommittedSegmentInfos.getUserData(); final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); translogManagerRef = new WriteOnlyTranslogManager( engineConfig.getTranslogConfig(), @@ -116,18 +124,23 @@ public void onAfterTranslogSync() { engineConfig.getPrimaryModeSupplier() ); this.translogManager = translogManagerRef; - this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false; } catch (IOException e) { IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef); throw new EngineCreationFailureException(shardId, "failed to create engine", e); } } + public void cleanUnreferencedFiles() throws IOException { + replicaFileTracker.deleteUnreferencedFiles( + Stream.of(store.directory().listAll()).filter(f -> f.startsWith("write.lock") == false).collect(Collectors.toSet()) + ); + } + private NRTReplicationReaderManager buildReaderManager() throws IOException { return new NRTReplicationReaderManager( OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId), - store::incRefFileDeleter, - store::decRefFileDeleter + replicaFileTracker::incRef, + replicaFileTracker::decRef ); } @@ -159,18 +172,19 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep /** * Persist the latest live SegmentInfos. * - * This method creates a commit point from the latest SegmentInfos. It is intended to be used when this shard is about to be promoted as the new primary. - * - * TODO: If this method is invoked while the engine is currently updating segments on its reader, wait for that update to complete so the updated segments are used. - * + * This method creates a commit point from the latest SegmentInfos. * * @throws IOException - When there is an IO error committing the SegmentInfos. */ private void commitSegmentInfos(SegmentInfos infos) throws IOException { - if (shouldCommit) { - store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); - } + // get a reference to the previous commit files so they can be decref'd once a new commit is made. + final Collection previousCommitFiles = getLastCommittedSegmentInfos().files(true); + store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + // incref the latest on-disk commit. + replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true)); + // decref the prev commit. 
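+ // any file whose ref count reaches zero in this decref is deleted immediately by the tracker.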
+ replicaFileTracker.decRef(previousCommitFiles); translogManager.syncTranslog(); } @@ -379,21 +393,17 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { - // if remote store is enabled, all segments durably persisted - if (shouldCommit) { - final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); - /* - This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied - from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is - used to generate new segment file names. The ideal solution is to identify the counter from previous primary. - */ + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); + /* + This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied + from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is + used to generate new segment file names. The ideal solution is to identify the counter from previous primary. + */ + if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) { latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; latestSegmentInfos.changed(); - commitSegmentInfos(latestSegmentInfos); - } else { - store.directory().sync(List.of(store.directory().listAll())); - store.directory().syncMetaData(); } + commitSegmentInfos(latestSegmentInfos); IOUtils.close(readerManager, translogManager, store::decRef); } catch (Exception e) { logger.warn("failed to close engine", e); @@ -453,8 +463,8 @@ public synchronized GatedCloseable getSegmentInfosSnapshot() { // incref all files try { final Collection files = latestSegmentInfos.files(false); - store.incRefFileDeleter(files); - return new GatedCloseable<>(latestSegmentInfos, () -> store.decRefFileDeleter(files)); + replicaFileTracker.incRef(files); + return new GatedCloseable<>(latestSegmentInfos, () -> { replicaFileTracker.decRef(files); }); } catch (IOException e) { throw new EngineException(shardId, e.getMessage(), e); } diff --git a/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java b/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java new file mode 100644 index 0000000000000..4b076b98c1d29 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.IndexFileNames; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; + +/** + * This class is heavily influenced by Lucene's ReplicaFileDeleter class used to keep track of + * segment files that should be preserved on replicas between replication events. 
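+ * Unlike the earlier tracker in org.opensearch.index.store, this component also performs
+ * deletions through the provided fileDeleter once a file's ref count drops to zero.
+ *
+ * A rough usage sketch (illustrative only; variable names are not taken from callers):
+ *   tracker.incRef(newCommit.files(true));      // protect the files of a new commit
+ *   tracker.decRef(previousCommit.files(true)); // files that reach zero refs are deleted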
+ * + * https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java + * + * @opensearch.internal + */ +final class ReplicaFileTracker { + + public static final Logger logger = LogManager.getLogger(ReplicaFileTracker.class); + private final Map refCounts = new HashMap<>(); + private final BiConsumer fileDeleter; + + public ReplicaFileTracker(BiConsumer fileDeleter) { + this.fileDeleter = fileDeleter; + } + + public synchronized void incRef(Collection fileNames) { + for (String fileName : fileNames) { + refCounts.merge(fileName, 1, Integer::sum); + } + } + + public synchronized void decRef(Collection fileNames) { + Set toDelete = new HashSet<>(); + for (String fileName : fileNames) { + Integer curCount = refCounts.get(fileName); + assert curCount != null : "fileName=" + fileName; + assert curCount > 0; + if (curCount == 1) { + refCounts.remove(fileName); + toDelete.add(fileName); + } else { + refCounts.put(fileName, curCount - 1); + } + } + if (toDelete.isEmpty() == false) { + delete(toDelete); + } + } + + public void deleteUnreferencedFiles(Collection toDelete) { + for (String file : toDelete) { + if (canDelete(file)) { + delete(file); + } + } + } + + private synchronized void delete(Collection toDelete) { + // First pass: delete any segments_N files. We do these first to be certain stale commit points + // are removed + // before we remove any files they reference, in case we crash right now: + for (String fileName : toDelete) { + if (fileName.startsWith(IndexFileNames.SEGMENTS)) { + delete(fileName); + } + } + for (String fileName : toDelete) { + if (fileName.startsWith(IndexFileNames.SEGMENTS) == false) { + delete(fileName); + } + } + } + + private synchronized void delete(String fileName) { + assert canDelete(fileName); + fileDeleter.accept("delete unreferenced", fileName); + } + + private synchronized boolean canDelete(String fileName) { + return refCounts.containsKey(fileName) == false && fileName.startsWith("write.lock") == false; + } + +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 7b5f96892c005..d3c6a9b955130 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -56,7 +56,6 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.ThreadInterruptedException; -import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.cluster.metadata.DataStream; import org.opensearch.core.Assertions; import org.opensearch.ExceptionsHelper; @@ -4657,7 +4656,12 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.init(); Map uploadedSegments = remoteDirectory - .getSegmentsUploadedToRemoteStore(); + .getSegmentsUploadedToRemoteStore() + .entrySet() + .stream() + // ignore any segments_n uploaded to the store, we will commit the infos bytes locally. 
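+ // the replica then opens its engine from that locally created commit instead of a remote segments_N.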
+ .filter(entry -> entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); store.incRef(); remoteStore.incRef(); try { @@ -4678,19 +4682,12 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal); if (refreshLevelSegmentSync && remoteSegmentMetadata != null) { - try ( - ChecksumIndexInput indexInput = new BufferedChecksumIndexInput( - new ByteArrayIndexInput("Snapshot of SegmentInfos", remoteSegmentMetadata.getSegmentInfosBytes()) - ); - ) { - SegmentInfos infosSnapshot = SegmentInfos.readCommit( - store.directory(), - indexInput, - remoteSegmentMetadata.getGeneration() - ); - long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); - store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); - } + final SegmentInfos infosSnapshot = store.buildSegmentInfos( + remoteSegmentMetadata.getSegmentInfosBytes(), + remoteSegmentMetadata.getGeneration() + ); + long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); + store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } } catch (IOException e) { throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 9fbc3748f9383..99e901aa6e4f9 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -222,7 +222,6 @@ private boolean syncSegments() { // move. 
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); Collection localSegmentsPostRefresh = segmentInfos.files(true); - // Create a map of file name to size and update the refresh segment tracker updateLocalSizeMapAndTracker(localSegmentsPostRefresh); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 2e5ac10fefdda..9d5adc241ea0e 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -197,7 +197,7 @@ public RemoteSegmentMetadata readLatestMetadataFile() throws IOException { if (metadataFiles.isEmpty() == false) { String latestMetadataFile = metadataFiles.get(0); - logger.info("Reading latest Metadata file {}", latestMetadataFile); + logger.trace("Reading latest Metadata file {}", latestMetadataFile); remoteSegmentMetadata = readMetadataFile(latestMetadataFile); } else { logger.info("No metadata file found, this can happen for new index with no data uploaded to remote segment store"); diff --git a/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java b/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java deleted file mode 100644 index 0ec282619337c..0000000000000 --- a/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -/** - * This class is a version of Lucene's ReplicaFileDeleter class used to keep track of - * segment files that should be preserved on replicas between replication events. - * The difference is this component does not actually perform any deletions, it only handles refcounts. - * Our deletions are made through Store.java. 
- * - * https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java - * - * @opensearch.internal - */ -final class ReplicaFileTracker { - - private final Map refCounts = new HashMap<>(); - - public synchronized void incRef(Collection fileNames) { - for (String fileName : fileNames) { - refCounts.merge(fileName, 1, Integer::sum); - } - } - - public synchronized void decRef(Collection fileNames) { - for (String fileName : fileNames) { - Integer curCount = refCounts.get(fileName); - assert curCount != null : "fileName=" + fileName; - assert curCount > 0; - if (curCount == 1) { - refCounts.remove(fileName); - } else { - refCounts.put(fileName, curCount - 1); - } - } - } - - public synchronized boolean canDelete(String fileName) { - return refCounts.containsKey(fileName) == false; - } -} diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 921deae41946a..e94b89efb6cf6 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -65,7 +65,6 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; -import org.opensearch.common.CheckedConsumer; import org.opensearch.common.UUIDs; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -105,7 +104,6 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -124,7 +122,6 @@ import static java.util.Collections.unmodifiableMap; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata; -import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX; /** * A Store provides plain access to files written by an opensearch index shard. Each shard @@ -185,7 +182,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref // used to ref count files when a new Reader is opened for PIT/Scroll queries // prevents segment files deletion until the PIT/Scroll expires or is discarded - private final ReplicaFileTracker replicaFileTracker; private final AbstractRefCounted refCounter = new AbstractRefCounted("store") { @Override @@ -207,8 +203,6 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId)); this.shardLock = shardLock; this.onClose = onClose; - this.replicaFileTracker = indexSettings.isSegRepEnabled() ? new ReplicaFileTracker() : null; - assert onClose != null; assert shardLock != null; assert shardLock.getShardId().equals(shardId); @@ -788,90 +782,17 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } } - /** - * Segment Replication method - * This method deletes files in store that are not referenced by latest on-disk commit point - * - * @param reason the reason for this cleanup operation logged for each deleted file - * @param fileToConsiderForCleanUp Files to consider for clean up. - * - * @throws IOException Exception on locking. 
- */ - public void cleanupAndPreserveLatestCommitPoint(Collection fileToConsiderForCleanUp, String reason) throws IOException { - assert indexSettings.isSegRepEnabled(); - // fetch a snapshot from the latest on disk Segments_N file. This can be behind - // the passed in local in memory snapshot, so we want to ensure files it references are not removed. - metadataLock.writeLock().lock(); - try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(fileToConsiderForCleanUp, reason, this.readLastCommittedSegmentsInfo().files(true)); - } finally { - metadataLock.writeLock().unlock(); - } - } - - private void cleanupFiles(Collection filesToConsiderForCleanup, String reason, Collection lastCommittedSegmentInfos) { - assert metadataLock.isWriteLockedByCurrentThread(); - for (String existingFile : filesToConsiderForCleanup) { - if (Store.isAutogenerated(existingFile) || lastCommittedSegmentInfos != null && lastCommittedSegmentInfos.contains(existingFile) - // also ensure we are not deleting a file referenced by an active reader. - || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false - // Prevent temporary replication files as it should be cleaned up MultiFileWriter - || existingFile.startsWith(REPLICATION_PREFIX)) { - // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete - // checksum) - continue; - } - try { - directory.deleteFile(reason, existingFile); - } catch (IOException ex) { - if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { - // TODO do we need to also fail this if we can't delete the pending commit file? - // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit - // point around? - throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); - } - logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); - // ignore, we don't really care, will get deleted later on - } - } - } - /** * Segment replication method * * This method takes the segment info bytes to build SegmentInfos. It inc'refs files pointed by passed in SegmentInfos * bytes to ensure they are not deleted. 
* - * @param tmpToFileName Map of temporary replication file to actual file name * @param infosBytes bytes[] of SegmentInfos supposed to be sent over by primary excluding segment_N file * @param segmentsGen segment generation number - * @param finalizeConsumer consumer for action on passed in SegmentInfos - * @param renameConsumer consumer for action on temporary copied over files * @throws IOException Exception while reading store and building segment infos */ - public void buildInfosFromBytes( - Map tmpToFileName, - byte[] infosBytes, - long segmentsGen, - CheckedConsumer finalizeConsumer, - CheckedConsumer, IOException> renameConsumer - ) throws IOException { - metadataLock.writeLock().lock(); - try { - final List values = new ArrayList<>(tmpToFileName.values()); - incRefFileDeleter(values); - try { - renameConsumer.accept(tmpToFileName); - finalizeConsumer.accept(buildSegmentInfos(infosBytes, segmentsGen)); - } finally { - decRefFileDeleter(values); - } - } finally { - metadataLock.writeLock().unlock(); - } - } - - private SegmentInfos buildSegmentInfos(byte[] infosBytes, long segmentsGen) throws IOException { + public SegmentInfos buildSegmentInfos(byte[] infosBytes, long segmentsGen) throws IOException { try (final ChecksumIndexInput input = toIndexInput(infosBytes)) { return SegmentInfos.readCommit(directory, input, segmentsGen); } @@ -959,7 +880,6 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l latestSegmentInfos.commit(directory()); directory.sync(latestSegmentInfos.files(true)); directory.syncMetaData(); - cleanupAndPreserveLatestCommitPoint(List.of(this.directory.listAll()), "After commit"); } finally { metadataLock.writeLock().unlock(); } @@ -2017,23 +1937,4 @@ private static IndexWriterConfig newIndexWriterConfig() { // we also don't specify a codec here and merges should use the engines for this index .setMergePolicy(NoMergePolicy.INSTANCE); } - - public void incRefFileDeleter(Collection files) { - if (this.indexSettings.isSegRepEnabled()) { - this.replicaFileTracker.incRef(files); - } - } - - public void decRefFileDeleter(Collection files) { - if (this.indexSettings.isSegRepEnabled()) { - this.replicaFileTracker.decRef(files); - try { - this.cleanupAndPreserveLatestCommitPoint(files, "On reader close"); - } catch (IOException e) { - // Log but do not rethrow - we can try cleaning up again after next replication cycle. - // If that were to fail, the shard will as well. 
- logger.error("Unable to clean store after reader closed", e); - } - } - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 2b22af070023a..725cab8351a8c 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -11,9 +11,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.Directory; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Version; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; @@ -57,10 +59,10 @@ public void getCheckpointMetadata( ) { Map metadataMap; // TODO: Need to figure out a way to pass this information for segment metadata via remote store. - final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); - try { - RemoteSegmentMetadata mdFile = remoteDirectory.init(); - // During initial recovery flow, the remote store might not have metadata as primary hasn't uploaded anything yet. + try (final GatedCloseable segmentInfosSnapshot = indexShard.getSegmentInfosSnapshot()) { + final Version version = segmentInfosSnapshot.get().getCommitLuceneVersion(); + RemoteSegmentMetadata mdFile = remoteDirectory.init(); // During initial recovery flow, the remote store might not + // have metadata as primary hasn't uploaded anything yet. if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) { listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.emptyMap(), null)); return; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 91c09e86f2602..bae00bc4e7a75 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.OpenSearchException; @@ -34,8 +35,6 @@ import java.io.IOException; import java.util.List; import java.util.Locale; -import java.util.Map; -import java.util.stream.Collectors; /** * Represents the target of a replication event. 
@@ -213,23 +212,12 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, try { store = store(); store.incRef(); - Map tempFileNames; - if (this.indexShard.indexSettings().isRemoteStoreEnabled() == true) { - tempFileNames = getSegmentFilesResponse.getFiles() - .stream() - .collect(Collectors.toMap(StoreFileMetadata::name, StoreFileMetadata::name)); - } else { - tempFileNames = multiFileWriter.getTempFileNames(); - } - store.buildInfosFromBytes( - tempFileNames, + multiFileWriter.renameAllTempFiles(); + final SegmentInfos infos = store.buildSegmentInfos( checkpointInfoResponse.getInfosBytes(), - checkpointInfoResponse.getCheckpoint().getSegmentsGen(), - indexShard::finalizeReplication, - this.indexShard.indexSettings().isRemoteStoreEnabled() == true - ? (files) -> {} - : (files) -> indexShard.store().renameTempFilesSafe(files) + checkpointInfoResponse.getCheckpoint().getSegmentsGen() ); + indexShard.finalizeReplication(infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 4c87df48f583f..c84fd6af07769 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -51,14 +51,6 @@ public class NRTReplicationEngineTests extends EngineTestCase { Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build() ); - private static final IndexSettings REMOTE_STORE_INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( - "index", - Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") - .build() - ); - public void testCreateEngine() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( @@ -144,29 +136,6 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept } } - public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnabled() throws IOException { - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - - try ( - final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS) - ) { - // assume we start at the same gen. - assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); - assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration()); - assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration()); - - // flush the primary engine - we don't need any segments, just force a new commit point. 
- engine.flush(true, true); - assertEquals(3, engine.getLatestSegmentInfos().getGeneration()); - - // When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store - nrtEngine.updateSegments(engine.getLatestSegmentInfos()); - assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); - assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); - } - } - public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOException { // if the replica is already at segments_N that is received, it will commit segments_N+1. final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -186,13 +155,13 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti final SegmentInfos primaryInfos = engine.getLatestSegmentInfos(); assertEquals(2, primaryInfos.getGeneration()); nrtEngine.updateSegments(primaryInfos); - assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); - assertEquals(4, nrtEngine.getLatestSegmentInfos().getGeneration()); + assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration()); assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); assertEquals(primaryInfos.getVersion(), nrtEngine.getLastCommittedSegmentInfos().getVersion()); nrtEngine.close(); - assertEquals(5, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); } } @@ -228,7 +197,7 @@ public void testSimultaneousEngineCloseAndCommit() throws IOException, Interrupt } } - public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOException { + public void testUpdateSegments_replicaStartsAtCorrectGen() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( @@ -237,6 +206,7 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep ) { assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + assertEquals(nrtEngine.getLastCommittedSegmentInfos().version, nrtEngine.getLatestSegmentInfos().getVersion()); // bump the latest infos version a couple of times so that we can assert the correct version after commit. engine.getLatestSegmentInfos().changed(); engine.getLatestSegmentInfos().changed(); @@ -248,7 +218,7 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep nrtEngine.updateSegments(primaryInfos); final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); - assertEquals(primaryInfos.getVersion(), lastCommittedSegmentInfos.getVersion()); + assertEquals("Commit gen is not increased", 2, lastCommittedSegmentInfos.getGeneration()); } } @@ -375,16 +345,9 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - // TODO: Remove this divergent commit logic and copy Segments_N from primary with node-node. - // randomly toggle commit / no commit. 
- IndexSettings settings = REMOTE_STORE_INDEX_SETTINGS; - final boolean shouldCommit = randomBoolean(); - if (shouldCommit) { - settings = INDEX_SETTINGS; - } try ( - final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, settings) + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS) ) { // only index 2 docs here, this will create segments _0 and _1 and after forcemerge into _2. final int docCount = 2; @@ -436,7 +399,7 @@ public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Excep // Ensure we still have all the active files. Note - we exclude the infos file here if we aren't committing // the nrt reader will still reference segments_n-1 after being loaded until a local commit occurs. - assertTrue(replicaFiles.containsAll(nrtEngine.getLatestSegmentInfos().files(shouldCommit))); + assertTrue(replicaFiles.containsAll(nrtEngine.getLatestSegmentInfos().files(false))); } } } diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 9043dcce1b779..a4812a4a771bf 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -100,7 +100,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -109,8 +108,6 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.util.Collections.unmodifiableMap; import static org.hamcrest.Matchers.anyOf; @@ -1169,49 +1166,6 @@ public void testGetMetadataWithSegmentInfos() throws IOException { store.close(); } - public void testCleanupAndPreserveLatestCommitPoint() throws IOException { - final ShardId shardId = new ShardId("index", "_na_", 1); - Store store = new Store( - shardId, - SEGMENT_REPLICATION_INDEX_SETTINGS, - StoreTests.newDirectory(random()), - new DummyShardLock(shardId) - ); - commitRandomDocs(store); - - Store.MetadataSnapshot commitMetadata = store.getMetadata(); - - // index more docs but only IW.flush, this will create additional files we'll clean up. - final IndexWriter writer = indexRandomDocs(store); - writer.flush(); - writer.close(); - - final List additionalSegments = new ArrayList<>(); - for (String file : store.directory().listAll()) { - if (commitMetadata.contains(file) == false) { - additionalSegments.add(file); - } - } - assertFalse(additionalSegments.isEmpty()); - - Collection filesToConsiderForCleanUp = Stream.of(store.readLastCommittedSegmentsInfo().files(true), additionalSegments) - .flatMap(Collection::stream) - .collect(Collectors.toList()); - - // clean up everything not in the latest commit point. 
- store.cleanupAndPreserveLatestCommitPoint(filesToConsiderForCleanUp, "test"); - - // we want to ensure commitMetadata files are preserved after calling cleanup - for (String existingFile : store.directory().listAll()) { - if (!IndexWriter.WRITE_LOCK_NAME.equals(existingFile)) { - assertTrue(commitMetadata.contains(existingFile)); - assertFalse(additionalSegments.contains(existingFile)); - } - } - deleteContent(store.directory()); - IOUtils.close(store); - } - public void testGetSegmentMetadataMap() throws IOException { final ShardId shardId = new ShardId("index", "_na_", 1); Store store = new Store( From 413bb2478e76b0dbe401ebb411e2e3129ca298b3 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 8 Aug 2023 00:41:38 -0700 Subject: [PATCH 2/8] Add more NRTReplicationEngineTests. Signed-off-by: Marc Handalian --- .../index/engine/NRTReplicationEngine.java | 16 +- .../index/engine/ReplicaFileTracker.java | 18 +- .../engine/NRTReplicationEngineTests.java | 182 +++++++++++++++++- ...licationWithNodeToNodeIndexShardTests.java | 48 +++-- 4 files changed, 220 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 42463c9d7cb4b..89aaad68043b7 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -59,9 +59,9 @@ public class NRTReplicationEngine extends Engine { private final CompletionStatsCache completionStatsCache; private final LocalCheckpointTracker localCheckpointTracker; private final WriteOnlyTranslogManager translogManager; - private final ReplicaFileTracker replicaFileTracker; + protected final ReplicaFileTracker replicaFileTracker; - private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED; + private volatile long lastReceivedPrimaryGen = SequenceNumbers.NO_OPS_PERFORMED; private static final int SI_COUNTER_INCREMENT = 10; @@ -77,7 +77,6 @@ public NRTReplicationEngine(EngineConfig engineConfig) { replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true)); // cleanup anything not referenced by the latest infos. cleanUnreferencedFiles(); - this.lastReceivedGen = lastCommittedSegmentInfos.getGeneration(); readerManager = buildReaderManager(); final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( this.lastCommittedSegmentInfos.getUserData().entrySet() @@ -156,15 +155,16 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO)); final long incomingGeneration = infos.getGeneration(); readerManager.updateSegments(infos); - - // Commit and roll the translog when we receive a different generation than what was last received. - // lower/higher gens are possible from a new primary that was just elected. - if (incomingGeneration != lastReceivedGen) { + // Ensure that we commit and clear the local translog if a new commit has been made on the primary. + // We do not compare against the last local commit gen here because it is possible to receive + // a lower gen from a newly elected primary shard that is behind this shard's last commit gen. + // In that case we still commit into the next local generation. 
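+ // e.g. a replica whose last local commit is segments_5 that receives gen 3 from a new primary commits into segments_6.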
+ if (incomingGeneration != this.lastReceivedPrimaryGen) { commitSegmentInfos(); translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } - lastReceivedGen = incomingGeneration; + this.lastReceivedPrimaryGen = incomingGeneration; localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo); } } diff --git a/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java b/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java index 4b076b98c1d29..49600d26df453 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java +++ b/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java @@ -10,12 +10,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.index.IndexFileNames; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.BiConsumer; @@ -43,6 +43,10 @@ public synchronized void incRef(Collection fileNames) { } } + public synchronized int refCount(String file) { + return Optional.ofNullable(refCounts.get(file)).orElse(0); + } + public synchronized void decRef(Collection fileNames) { Set toDelete = new HashSet<>(); for (String fileName : fileNames) { @@ -70,18 +74,8 @@ public void deleteUnreferencedFiles(Collection toDelete) { } private synchronized void delete(Collection toDelete) { - // First pass: delete any segments_N files. We do these first to be certain stale commit points - // are removed - // before we remove any files they reference, in case we crash right now: - for (String fileName : toDelete) { - if (fileName.startsWith(IndexFileNames.SEGMENTS)) { - delete(fileName); - } - } for (String fileName : toDelete) { - if (fileName.startsWith(IndexFileNames.SEGMENTS) == false) { - delete(fileName); - } + delete(fileName); } } diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index c84fd6af07769..2ea4f13749e56 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -144,8 +144,12 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { + assertEquals(5, nrtEngine.getLatestSegmentInfos().getVersion()); nrtEngine.getLatestSegmentInfos().changed(); nrtEngine.getLatestSegmentInfos().changed(); + assertEquals(7, nrtEngine.getLatestSegmentInfos().getVersion()); + assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + // commit the infos to push us to segments_3. nrtEngine.commitSegmentInfos(); assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); @@ -154,14 +158,15 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti // update the replica with segments_2 from the primary. 
final SegmentInfos primaryInfos = engine.getLatestSegmentInfos(); assertEquals(2, primaryInfos.getGeneration()); + assertEquals(5, primaryInfos.getVersion()); nrtEngine.updateSegments(primaryInfos); - assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); - assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration()); + assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(4, nrtEngine.getLatestSegmentInfos().getGeneration()); assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); assertEquals(primaryInfos.getVersion(), nrtEngine.getLastCommittedSegmentInfos().getVersion()); nrtEngine.close(); - assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(5, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); } } @@ -197,7 +202,7 @@ public void testSimultaneousEngineCloseAndCommit() throws IOException, Interrupt } } - public void testUpdateSegments_replicaStartsAtCorrectGen() throws IOException { + public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( @@ -206,7 +211,6 @@ public void testUpdateSegments_replicaStartsAtCorrectGen() throws IOException { ) { assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); - assertEquals(nrtEngine.getLastCommittedSegmentInfos().version, nrtEngine.getLatestSegmentInfos().getVersion()); // bump the latest infos version a couple of times so that we can assert the correct version after commit. engine.getLatestSegmentInfos().changed(); engine.getLatestSegmentInfos().changed(); @@ -218,7 +222,7 @@ public void testUpdateSegments_replicaStartsAtCorrectGen() throws IOException { nrtEngine.updateSegments(primaryInfos); final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); - assertEquals("Commit gen is not increased", 2, lastCommittedSegmentInfos.getGeneration()); + assertEquals(primaryInfos.getVersion(), lastCommittedSegmentInfos.getVersion()); } } @@ -402,4 +406,170 @@ public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Excep assertTrue(replicaFiles.containsAll(nrtEngine.getLatestSegmentInfos().files(false))); } } + + public void testRemoveExtraFilesOnStartup() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + List operations = generateHistoryOnReplica(2, randomBoolean(), randomBoolean(), randomBoolean()); + for (Engine.Operation op : operations) { + applyOperation(engine, op); + // refresh to create a lot of segments. 
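+ // each refresh creates segment files that the replica's initial commit will not reference, so they are wiped on engine startup.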
+ engine.refresh("test"); + } + try (final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());) { + final Collection extraSegments = engine.getLatestSegmentInfos().files(false); + for (String file : extraSegments) { + nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT); + } + List replicaFiles = List.of(nrtEngineStore.directory().listAll()); + for (String file : extraSegments) { + assertTrue(replicaFiles.contains(file)); + } + assertTrue(storeContainsAll(nrtEngineStore, extraSegments)); + try (NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS)) { + replicaFiles = List.of(nrtEngineStore.directory().listAll()); + for (String file : extraSegments) { + assertFalse(replicaFiles.contains(file)); + } + } + } + } + + public void testPreserveLatestCommit() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS) + ) { + final int docCount = 4; + List operations = generateHistoryOnReplica(docCount, randomBoolean(), randomBoolean(), randomBoolean()); + indexOperations(nrtEngine, operations.subList(0, 2)); + // wipe the nrt directory initially so we can sync with primary. + cleanAndCopySegmentsFromPrimary(nrtEngine); + SegmentInfos primaryInfos; + + final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); + final Collection lastCommittedFiles = lastCommittedSegmentInfos.files(true); + assertTrue(storeContainsAll(nrtEngineStore, lastCommittedFiles)); + + // ensure segments and commit file are incref'd: + assertEquals( + "Segments_N is incref'd once", + 1, + nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName()) + ); + // segments are incref'd twice because they are loaded on the reader. + assertRefCount(nrtEngine, lastCommittedSegmentInfos.files(false), 2); + + // get and close a snapshot - this will decref files when closed. + final GatedCloseable segmentInfosSnapshot = nrtEngine.getSegmentInfosSnapshot(); + segmentInfosSnapshot.close(); + assertTrue(storeContainsAll(nrtEngineStore, lastCommittedFiles)); + + // index more docs and refresh the reader - this will incref/decref files again + indexOperations(nrtEngine, operations.subList(2, 4)); + primaryInfos = engine.getLatestSegmentInfos(); + copySegments(primaryInfos.files(false), nrtEngine); + nrtEngine.updateSegments(primaryInfos); + + // get the additional segments that are only on the reader - not part of a commit. + final Collection readerOnlySegments = primaryInfos.files(false); + readerOnlySegments.removeAll(lastCommittedFiles); + assertRefCount(nrtEngine, readerOnlySegments, 1); + + assertTrue(storeContainsAll(nrtEngineStore, lastCommittedFiles)); + assertEquals( + "Segments_N is incref'd once", + 1, + nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName()) + ); + assertRefCount(nrtEngine, lastCommittedSegmentInfos.files(false), 2); + + // flush the primary + engine.flush(true, true); + copySegments(engine.getLatestSegmentInfos().files(false), nrtEngine); + nrtEngine.updateSegments(engine.getLatestSegmentInfos()); + // after flush our segment_n is removed. 
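+ // committing during updateSegments incref'd the new segments_N and decref'd the previous commit's file to zero.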
+ assertEquals( + "Segments_N is removed", + 0, + nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName()) + ); + assertFalse(storeContainsAll(nrtEngineStore, lastCommittedFiles)); + // close the engine - ensure we preserved the last commit + final SegmentInfos infosBeforeClose = nrtEngine.getLatestSegmentInfos(); + nrtEngine.close(); + assertTrue(storeContainsAll(nrtEngineStore, infosBeforeClose.files(false))); + assertEquals(store.readLastCommittedSegmentsInfo().files(false), infosBeforeClose.files(false)); + } + } + + private void assertRefCount(NRTReplicationEngine nrtEngine, Collection files, int count) { + for (String file : files) { + // refCount for our segments is 2 because they are still active on the reader + assertEquals(count, nrtEngine.replicaFileTracker.refCount(file)); + } + } + + private boolean storeContainsAll(Store nrtEngineStore, Collection lastCommittedFiles) throws IOException { + return List.of(nrtEngineStore.directory().listAll()).containsAll(lastCommittedFiles); + } + + private void cleanAndCopySegmentsFromPrimary(NRTReplicationEngine nrtEngine) throws IOException { + Lucene.cleanLuceneIndex(nrtEngine.store.directory()); + assertFalse( + Arrays.stream(nrtEngine.store.directory().listAll()) + .anyMatch(file -> file.equals("write.lock") == false && file.equals("extra0") == false) + ); + SegmentInfos primaryInfos = engine.getLatestSegmentInfos(); + copySegments(primaryInfos.files(false), nrtEngine); + nrtEngine.updateSegments(primaryInfos); + } + + private void indexOperations(NRTReplicationEngine nrtEngine, List operations) throws IOException { + for (Engine.Operation op : operations) { + applyOperation(engine, op); + applyOperation(nrtEngine, op); + engine.refresh("test"); + } + } + + public void testDecrefToZeroRemovesFile() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS) + ) { + Lucene.cleanLuceneIndex(nrtEngineStore.directory()); + copySegments(engine.getLatestSegmentInfos().files(true), nrtEngine); + nrtEngine.updateSegments(engine.getLatestSegmentInfos()); + final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); + assertEquals( + "Segments_N is incref'd to 1", + 1, + nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName()) + ); + // create a new commit and update infos + engine.flush(true, true); + nrtEngine.updateSegments(engine.getLatestSegmentInfos()); + assertEquals( + "Segments_N is removed", + 0, + nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName()) + ); + assertFalse(List.of(nrtEngineStore.directory().listAll()).contains(lastCommittedSegmentInfos.getSegmentsFileName())); + } + } + + private void copySegments(Collection latestPrimaryFiles, Engine nrtEngine) throws IOException { + final Store store = nrtEngine.store; + final List replicaFiles = List.of(store.directory().listAll()); + // copy new segments in and load reader. 
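+ // skip any files the replica store already contains.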
+ for (String file : latestPrimaryFiles) { + if (replicaFiles.contains(file) == false) { + store.directory().copyFrom(this.store.directory(), file, file, IOContext.DEFAULT); + } + } + } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java index 1df7c72cbc8a8..0359a9b926cd9 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java @@ -432,45 +432,57 @@ public void testTemporaryFilesNotCleanup() throws Exception { } } - // Todo: Move this test to SegmentReplicationIndexShardTests so that it runs for both node-node & remote store public void testReplicaReceivesLowerGeneration() throws Exception { // when a replica gets incoming segments that are lower than what it currently has on disk. - - // start 3 nodes Gens: P [2], R [2], R[2] - // index some docs and flush twice, push to only 1 replica. - // State Gens: P [4], R-1 [3], R-2 [2] - // Promote R-2 as the new primary and demote the old primary. - // State Gens: R[4], R-1 [3], P [4] - *commit on close of NRTEngine, xlog replayed and commit made. - // index docs on new primary and flush - // replicate to all. - // Expected result: State Gens: P[4], R-1 [4], R-2 [4] + // this can happen when a replica is promoted that is further behind the other replicas. try (ReplicationGroup shards = createGroup(2, getIndexSettings(), new NRTReplicationEngineFactory())) { shards.startAll(); final IndexShard primary = shards.getPrimary(); - final IndexShard replica_1 = shards.getReplicas().get(0); + final IndexShard behindReplicaBeforeRestart = shards.getReplicas().get(0); final IndexShard replica_2 = shards.getReplicas().get(1); int numDocs = randomIntBetween(10, 100); + int totalDocs = numDocs; shards.indexDocs(numDocs); - flushShard(primary, false); - replicateSegments(primary, List.of(replica_1)); + flushShard(primary, true); + replicateSegments(primary, List.of(behindReplicaBeforeRestart)); numDocs = randomIntBetween(numDocs + 1, numDocs + 10); + totalDocs += numDocs; shards.indexDocs(numDocs); - flushShard(primary, false); - replicateSegments(primary, List.of(replica_1)); + flushShard(primary, true); + flushShard(primary, true); + flushShard(primary, true); + replicateSegments(primary, List.of(behindReplicaBeforeRestart)); + + // close behindReplicaBeforeRestart - we will re-open it after replica_2 is promoted as new primary. 
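+ // before restarting, verify the primary and the behind replica are on the same commit point with all docs.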
+ assertEqualCommittedSegments(primary, behindReplicaBeforeRestart); - assertEqualCommittedSegments(primary, replica_1); + assertDocCount(behindReplicaBeforeRestart, totalDocs); + assertDocCount(replica_2, 0); shards.promoteReplicaToPrimary(replica_2).get(); - primary.close("demoted", false, false); + primary.close("demoted", randomBoolean(), false); primary.store().close(); IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); shards.recoverReplica(oldPrimary); + behindReplicaBeforeRestart.close("restart", false, false); + behindReplicaBeforeRestart.store().close(); + shards.removeReplica(behindReplicaBeforeRestart); + final IndexShard behindReplicaAfterRestart = shards.addReplicaWithExistingPath( + behindReplicaBeforeRestart.shardPath(), + behindReplicaBeforeRestart.routingEntry().currentNodeId() + ); + shards.recoverReplica(behindReplicaAfterRestart); + numDocs = randomIntBetween(numDocs + 1, numDocs + 10); + totalDocs += numDocs; shards.indexDocs(numDocs); flushShard(replica_2, false); replicateSegments(replica_2, shards.getReplicas()); - assertEqualCommittedSegments(replica_2, oldPrimary, replica_1); + assertEqualCommittedSegments(replica_2, oldPrimary, behindReplicaAfterRestart); + assertDocCount(replica_2, totalDocs); + assertDocCount(oldPrimary, totalDocs); + assertDocCount(behindReplicaAfterRestart, totalDocs); } } From fc1d75673f7dd875f2bff481204bb09961df2ca1 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 8 Aug 2023 00:41:52 -0700 Subject: [PATCH 3/8] Ensure old commit files are wiped on remote store sync before we commit a new segmentInfos. Signed-off-by: Marc Handalian --- .../org/opensearch/index/shard/IndexShard.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d3c6a9b955130..16559eeabfd9b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4659,8 +4659,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re .getSegmentsUploadedToRemoteStore() .entrySet() .stream() - // ignore any segments_n uploaded to the store, we will commit the infos bytes locally. - .filter(entry -> entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false) + // if this is a refresh level sync, ignore any segments_n uploaded to the store, we will commit the received infos bytes + // locally. + .filter(entry -> refreshLevelSegmentSync && entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); store.incRef(); remoteStore.incRef(); @@ -4687,6 +4688,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re remoteSegmentMetadata.getGeneration() ); long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); + // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes. + // Extra segments will be wiped on engine open. 
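+ // drop every existing segments_N so the commit created below is the only commit point on disk.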
+ for (String file : List.of(store.directory().listAll())) { + if (file.startsWith(IndexFileNames.SEGMENTS)) { + store.deleteQuiet(file); + } + } + assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() + : "There should not be any segments file in the dir"; store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } } catch (IOException e) { From a8106ca0291eab8929f3ea8e324fd9dd4010289c Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 8 Aug 2023 22:53:16 -0700 Subject: [PATCH 4/8] Add more shard level tests. Signed-off-by: Marc Handalian --- .../SegmentReplicationUsingRemoteStoreIT.java | 20 +++ .../index/shard/RemoteIndexShardTests.java | 134 ++++++++++++++++++ ...enSearchIndexLevelReplicationTestCase.java | 4 +- .../index/shard/IndexShardTestCase.java | 16 ++- 4 files changed, 170 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java index 6f76c21cc0411..91c180f1f1ce0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java @@ -10,6 +10,7 @@ import org.junit.After; import org.junit.Before; +import org.opensearch.action.support.WriteRequest; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.indices.replication.SegmentReplicationIT; @@ -67,4 +68,23 @@ public void teardown() { public void testPressureServiceStats() throws Exception { super.testPressureServiceStats(); } + + public void testRestartPrimary_NoReplicas() throws Exception { + final String primary = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureYellow(INDEX_NAME); + + assertEquals(getNodeContainingPrimaryShard().getName(), primary); + + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + if (randomBoolean()) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + + internalCluster().restartNode(primary); + ensureYellow(INDEX_NAME); + assertDocCounts(1, primary); + } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index a01169480de0b..b5fab9b8d4a3d 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -8,19 +8,29 @@ package org.opensearch.index.shard; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.util.Version; +import org.hamcrest.MatcherAssert; import org.junit.Before; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; 
+import java.nio.file.Path; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; @@ -172,4 +182,128 @@ public void testNoDuplicateSeqNo() throws Exception { } } } + + public void testReplicaCommitsInfosBytesOnRecovery() throws Exception { + final Path remotePath = createTempDir(); + try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { + shards.startAll(); + // ensure primary has uploaded something + shards.indexDocs(10); + shards.refresh("test"); + + final IndexShard primary = shards.getPrimary(); + final Engine primaryEngine = getEngine(primary); + assertNotNull(primaryEngine); + final SegmentInfos latestCommit = SegmentInfos.readLatestCommit(primary.store().directory()); + assertEquals("On-disk commit references no segment files", Set.of("segments_3"), latestCommit.files(true)); + assertEquals( + "Latest remote commit references no segment files", + Set.of("segments_3"), + primary.remoteStore().readLastCommittedSegmentsInfo().files(true) + ); + MatcherAssert.assertThat( + "Segments are referenced in memory only", + primaryEngine.getSegmentInfosSnapshot().get().files(false), + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") + ); + + final IndexShard replica = shards.addReplica(remotePath); + replica.store().createEmpty(Version.LATEST); + assertEquals( + "Replica starts at an empty segments_1 commit", + Set.of("segments_1"), + replica.store().readLastCommittedSegmentsInfo().files(true) + ); + // commit replica infos so it has a conflicting commit with remote.
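The generation arithmetic the next lines rely on can be reproduced with plain Lucene; a minimal standalone sketch (not part of the patch, class name hypothetical):

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public final class GenerationSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory()) {
            new IndexWriter(dir, new IndexWriterConfig()).close(); // initial commit writes segments_1
            SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
            infos.commit(dir); // writes segments_2
            infos.commit(dir); // writes segments_3
            assert SegmentInfos.readLatestCommit(dir).getGeneration() == 3;
        }
    }
}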
+ final SegmentInfos segmentCommitInfos = replica.store().readLastCommittedSegmentsInfo(); + segmentCommitInfos.commit(replica.store().directory()); + segmentCommitInfos.commit(replica.store().directory()); + assertEquals( + "Replica starts recovery at an empty segments_3 commit", + Set.of("segments_3"), + replica.store().readLastCommittedSegmentsInfo().files(true) + ); + + shards.recoverReplica(replica); + + final Engine replicaEngine = getEngine(replica); + assertNotNull(replicaEngine); + final SegmentInfos latestReplicaCommit = SegmentInfos.readLatestCommit(replica.store().directory()); + logger.info(List.of(replica.store().directory().listAll())); + MatcherAssert.assertThat( + "Replica commits infos bytes referencing latest refresh point", + latestReplicaCommit.files(true), + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs", "segments_5") + ); + MatcherAssert.assertThat( + "Segments are referenced in memory", + replicaEngine.getSegmentInfosSnapshot().get().files(false), + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") + ); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( + primary.getSegmentMetadataMap(), + replica.getSegmentMetadataMap() + ); + assertTrue(recoveryDiff.missing.isEmpty()); + assertTrue(recoveryDiff.different.isEmpty()); + } + } + + public void testPrimaryRestart() throws Exception { + final Path remotePath = createTempDir(); + try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { + shards.startAll(); + // ensure primary has uploaded something + shards.indexDocs(10); + IndexShard primary = shards.getPrimary(); + if (randomBoolean()) { + flushShard(primary); + } else { + primary.refresh("test"); + } + assertDocCount(primary, 10); + // get a metadata map - we'll use segrep diff to ensure segments on reader are identical after restart. + final Map<String, StoreFileMetadata> metadataBeforeRestart = primary.getSegmentMetadataMap(); + // restart the primary + shards.reinitPrimaryShard(remotePath); + // the store is open at this point but the shard has not yet run through recovery + primary = shards.getPrimary(); + shards.startPrimary(); + assertDocCount(primary, 10); + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(metadataBeforeRestart, primary.getSegmentMetadataMap()); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + } + } + + public void testPrimaryRestart_PrimaryHasExtraCommits() throws Exception { + final Path remotePath = createTempDir(); + try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { + shards.startAll(); + // ensure primary has uploaded something + shards.indexDocs(10); + IndexShard primary = shards.getPrimary(); + if (randomBoolean()) { + flushShard(primary); + } else { + primary.refresh("test"); + } + assertDocCount(primary, 10); + // get a metadata map - we'll use segrep diff to ensure segments on reader are identical after restart.
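As a rough model of the comparison Store.segmentReplicationDiff performs below (a hedged sketch; the real implementation compares StoreFileMetadata entries, including checksums and lengths, rather than bare strings):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SegRepDiffSketch {
    // Partition the source (primary) files into identical / different / missing
    // relative to the target (replica) metadata; the tests assert that both
    // "missing" and "different" come back empty.
    static Map<String, List<String>> diff(Map<String, String> source, Map<String, String> target) {
        final List<String> identical = new ArrayList<>();
        final List<String> different = new ArrayList<>();
        final List<String> missing = new ArrayList<>();
        source.forEach((file, checksum) -> {
            if (target.containsKey(file) == false) {
                missing.add(file);
            } else if (checksum.equals(target.get(file))) {
                identical.add(file);
            } else {
                different.add(file);
            }
        });
        return Map.of("identical", identical, "different", different, "missing", missing);
    }
}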
+ final Map<String, StoreFileMetadata> metadataBeforeRestart = primary.getSegmentMetadataMap(); + // restart the primary + shards.reinitPrimaryShard(remotePath); + // the store is open at this point but the shard has not yet run through recovery + primary = shards.getPrimary(); + SegmentInfos latestPrimaryCommit = SegmentInfos.readLatestCommit(primary.store().directory()); + latestPrimaryCommit.commit(primary.store().directory()); + latestPrimaryCommit = SegmentInfos.readLatestCommit(primary.store().directory()); + latestPrimaryCommit.commit(primary.store().directory()); + shards.startPrimary(); + assertDocCount(primary, 10); + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(metadataBeforeRestart, primary.getSegmentMetadataMap()); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + } + } } diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index a0d5be240d552..53d2d4b8b40ab 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -622,8 +622,8 @@ public synchronized IndexShard getPrimary() { return primary; } - public synchronized void reinitPrimaryShard() throws IOException { - primary = reinitShard(primary); + public synchronized void reinitPrimaryShard(Path remotePath) throws IOException { + primary = reinitShard(primary, remotePath); computeReplicationTargets(); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ebe43fc846899..a880aa25a4b4e 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -793,12 +793,18 @@ protected BlobContainer getBlobContainer(Path f) throws IOException { return new FsBlobContainer(fsBlobStore, blobPath, f); } + protected IndexShard reinitShard(IndexShard current, IndexingOperationListener... listeners) throws IOException { + return reinitShard(current, (Path) null, listeners); + } + /** * Takes an existing shard, closes it and starts a new initializing shard at the same location * + * @param current The current shard to reinit + * @param remotePath Remote path to recover from if remote storage is used * @param listeners new listeners to use for the newly created shard */ - protected IndexShard reinitShard(IndexShard current, IndexingOperationListener... listeners) throws IOException { + protected IndexShard reinitShard(IndexShard current, Path remotePath, IndexingOperationListener... listeners) throws IOException { final ShardRouting shardRouting = current.routingEntry(); return reinitShard( current, @@ -806,6 +812,7 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener.. shardRouting, shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE ), + remotePath, listeners ); } @@ -817,13 +824,18 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener.. * @param listeners new listeners to use for the newly created shard */ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener...
listeners) throws IOException { + return reinitShard(current, routing, null, listeners); + } + + protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Path remotePath, IndexingOperationListener... listeners) + throws IOException { return reinitShard( current, routing, current.indexSettings.getIndexMetadata(), current.engineFactory, current.engineConfigFactory, - null, + remotePath, listeners ); } From 7362c2033f02987f7eb9e4caa2df845dfd108b49 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 8 Aug 2023 23:16:40 -0700 Subject: [PATCH 5/8] Add test ensuring commits are cleaned up on replicas. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationTarget.java | 5 +-- .../index/shard/RemoteIndexShardTests.java | 43 +++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index bae00bc4e7a75..376dae40cda90 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -169,7 +169,7 @@ public void startReplication(ActionListener<Void> listener) { }, listener::onFailure); getFilesListener.whenComplete(response -> { - finalizeReplication(checkpointInfoListener.result(), getFilesListener.result()); + finalizeReplication(checkpointInfoListener.result()); listener.onResponse(null); }, listener::onFailure); } @@ -200,8 +200,7 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) return diff.missing; } - private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, GetSegmentFilesResponse getSegmentFilesResponse) - throws OpenSearchCorruptionException { + private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); // Handle empty SegmentInfos bytes for recovering replicas diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index b5fab9b8d4a3d..6cfe5e504eef2 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.shard; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.Version; import org.hamcrest.MatcherAssert; @@ -25,10 +26,12 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -306,4 +309,44 @@ public void testPrimaryRestart_PrimaryHasExtraCommits() throws Exception { assertTrue(diff.different.isEmpty()); } } + + public void testReplicaCleansUpOldCommitsWhenReceivingNew() throws Exception { + final Path remotePath = createTempDir(); + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final
IndexShard replica = shards.getReplicas().get(0); + shards.indexDocs(1); + flushShard(primary); + replicateSegments(primary, shards.getReplicas()); + assertDocCount(primary, 1); + assertDocCount(replica, 1); + assertEquals("segments_4", replica.store().readLastCommittedSegmentsInfo().getSegmentsFileName()); + assertSingleSegmentFile(replica, "segments_4"); + + shards.indexDocs(1); + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + assertDocCount(replica, 2); + assertSingleSegmentFile(replica, "segments_4"); + + shards.indexDocs(1); + flushShard(primary); + replicateSegments(primary, shards.getReplicas()); + assertDocCount(replica, 3); + assertSingleSegmentFile(replica, "segments_5"); + + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap()); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + } + } + + private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException { + final Set<String> segmentsFileNames = Arrays.stream(shard.store().directory().listAll()) + .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) + .collect(Collectors.toSet()); + assertEquals("Expected a single segments_N file", 1, segmentsFileNames.size()); + assertEquals(fileName, segmentsFileNames.stream().findFirst().get()); + } } From b555a26e4c878c7892c171c38ea3148ad7cbea1b Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 9 Aug 2023 09:20:12 -0700 Subject: [PATCH 6/8] Self review. Signed-off-by: Marc Handalian --- .../org/opensearch/index/engine/NRTReplicationEngine.java | 6 +----- .../org/opensearch/index/engine/ReplicaFileTracker.java | 5 +++-- .../opensearch/index/shard/RemoteStoreRefreshListener.java | 1 + .../opensearch/index/engine/NRTReplicationEngineTests.java | 4 +++- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 89aaad68043b7..014d161ef8f66 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -40,8 +40,6 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.function.BiFunction; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; @@ -130,9 +128,7 @@ public void onAfterTranslogSync() { } public void cleanUnreferencedFiles() throws IOException { - replicaFileTracker.deleteUnreferencedFiles( - Stream.of(store.directory().listAll()).filter(f -> f.startsWith("write.lock") == false).collect(Collectors.toSet()) - ); + replicaFileTracker.deleteUnreferencedFiles(store.directory().listAll()); } private NRTReplicationReaderManager buildReaderManager() throws IOException { diff --git a/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java b/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java index 49600d26df453..2e8bd6409c2f6 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java +++ b/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java @@ -32,6 +32,7 @@ final class ReplicaFileTracker { public static final Logger logger = LogManager.getLogger(ReplicaFileTracker.class); private final Map<String, Integer> refCounts = new HashMap<>(); private final BiConsumer<String, String> fileDeleter; + private final
Set<String> EXCLUDE_FILES = Set.of("write.lock"); public ReplicaFileTracker(BiConsumer<String, String> fileDeleter) { this.fileDeleter = fileDeleter; @@ -65,7 +66,7 @@ public synchronized void decRef(Collection<String> fileNames) { } } - public void deleteUnreferencedFiles(Collection<String> toDelete) { + public void deleteUnreferencedFiles(String... toDelete) { for (String file : toDelete) { if (canDelete(file)) { delete(file); @@ -85,7 +86,7 @@ private synchronized void delete(String fileName) { } private synchronized boolean canDelete(String fileName) { - return refCounts.containsKey(fileName) == false && fileName.startsWith("write.lock") == false; + return EXCLUDE_FILES.contains(fileName) == false && refCounts.containsKey(fileName) == false; } } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 99e901aa6e4f9..9fbc3748f9383 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -222,6 +222,7 @@ private boolean syncSegments() { // move. long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); Collection<String> localSegmentsPostRefresh = segmentInfos.files(true); + // Create a map of file name to size and update the refresh segment tracker updateLocalSizeMapAndTracker(localSegmentsPostRefresh); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 2ea4f13749e56..0aa96e38ebf3e 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.Version; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; import org.opensearch.common.concurrent.GatedCloseable; @@ -407,7 +408,7 @@ public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Excep } } - public void testRemoveExtraFilesOnStartup() throws Exception { + public void testRemoveExtraSegmentsOnStartup() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); List<Engine.Operation> operations = generateHistoryOnReplica(2, randomBoolean(), randomBoolean(), randomBoolean()); for (Engine.Operation op : operations) { @@ -416,6 +417,7 @@ public void testRemoveExtraFilesOnStartup() throws Exception { engine.refresh("test"); } try (final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());) { + nrtEngineStore.createEmpty(Version.LATEST); final Collection<String> extraSegments = engine.getLatestSegmentInfos().files(false); for (String file : extraSegments) { nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT); From 078b35b8d92953f4489fbfa9efad80d5a4e73f86 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 9 Aug 2023 22:07:24 -0700 Subject: [PATCH 7/8] Use refresh level sync before recovery Signed-off-by: Marc Handalian --- .../java/org/opensearch/remotestore/RemoteStoreIT.java | 8 ++------ .../indices/recovery/PeerRecoveryTargetService.java | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff
--git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index b7bb0cc6608d0..21151747fd776 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -87,12 +87,8 @@ private void testPeerRecovery(int numberOfIterations, boolean invokeFlush) throw .filter(rs -> rs.getRecoverySource().getType() == RecoverySource.Type.PEER) .findFirst(); assertFalse(recoverySource.isEmpty()); - if (numberOfIterations == 1 && invokeFlush) { - // segments_N file is copied to new replica - assertEquals(1, recoverySource.get().getIndex().recoveredFileCount()); - } else { - assertEquals(0, recoverySource.get().getIndex().recoveredFileCount()); - } + // segments_N file is copied to new replica + assertEquals(1, recoverySource.get().getIndex().recoveredFileCount()); IndexResponse response = indexSingleDoc(INDEX_NAME); assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo()); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index d3e62282d6b5c..d216721d702cc 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false, false); + indexShard.syncSegmentsFromRemoteSegmentStore(false, true); } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); From 1989a2e91a28b480ea2b6c63b5bb217e2772e361 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 10 Aug 2023 12:25:17 -0700 Subject: [PATCH 8/8] PR feedback. 
Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 19 +++++++++++++ .../SegmentReplicationUsingRemoteStoreIT.java | 20 ------------- .../index/engine/NRTReplicationEngine.java | 2 ++ .../RemoteStoreReplicationSource.java | 5 ++-- .../index/shard/RemoteIndexShardTests.java | 27 ------------------ .../SegmentReplicationIndexShardTests.java | 28 +++++++++++++++++++ 6 files changed, 52 insertions(+), 49 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 3a8c732611a1e..b14c59869ccbc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -1419,4 +1419,23 @@ public void testIndexWhileRecoveringReplica() throws Exception { .get(); assertNoFailures(response); } + + public void testRestartPrimary_NoReplicas() throws Exception { + final String primary = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureYellow(INDEX_NAME); + + assertEquals(getNodeContainingPrimaryShard().getName(), primary); + + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + if (randomBoolean()) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + + internalCluster().restartNode(primary); + ensureYellow(INDEX_NAME); + assertDocCounts(1, primary); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java index 91c180f1f1ce0..6f76c21cc0411 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java @@ -10,7 +10,6 @@ import org.junit.After; import org.junit.Before; -import org.opensearch.action.support.WriteRequest; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.indices.replication.SegmentReplicationIT; @@ -68,23 +67,4 @@ public void teardown() { public void testPressureServiceStats() throws Exception { super.testPressureServiceStats(); } - - public void testRestartPrimary_NoReplicas() throws Exception { - final String primary = internalCluster().startDataOnlyNode(); - createIndex(INDEX_NAME); - ensureYellow(INDEX_NAME); - - assertEquals(getNodeContainingPrimaryShard().getName(), primary); - - client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - if (randomBoolean()) { - flush(INDEX_NAME); - } else { - refresh(INDEX_NAME); - } - - internalCluster().restartNode(primary); - ensureYellow(INDEX_NAME); - assertDocCounts(1, primary); - } } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 014d161ef8f66..d545d9c1f6f45 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -394,6 +394,8 @@ protected final void closeNoLock(String reason, CountDownLatch 
closedLatch) { This is a workaround solution which decreases the chances of conflict on replica nodes when the same file is copied from two different primaries during failover. Increasing the counter helps in avoiding this conflict as the counter is used to generate new segment file names. The ideal solution is to identify the counter from the previous primary. + This is not required for remote store implementations since, on failover, the replica re-syncs with the store + during promotion. */ if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) { latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 725cab8351a8c..c463e6abd3df2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -61,8 +61,9 @@ public void getCheckpointMetadata( // TODO: Need to figure out a way to pass this information for segment metadata via remote store. try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = indexShard.getSegmentInfosSnapshot()) { final Version version = segmentInfosSnapshot.get().getCommitLuceneVersion(); - RemoteSegmentMetadata mdFile = remoteDirectory.init(); // During initial recovery flow, the remote store might not - // have metadata as primary hasn't uploaded anything yet. + RemoteSegmentMetadata mdFile = remoteDirectory.init(); + // During initial recovery flow, the remote store might not + // have metadata as primary hasn't uploaded anything yet. if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) { listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.emptyMap(), null)); return; diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 6cfe5e504eef2..c22fbdd8850ff 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -252,33 +252,6 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception { } } - public void testPrimaryRestart() throws Exception { - final Path remotePath = createTempDir(); - try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { - shards.startAll(); - // ensure primary has uploaded something - shards.indexDocs(10); - IndexShard primary = shards.getPrimary(); - if (randomBoolean()) { - flushShard(primary); - } else { - primary.refresh("test"); - } - assertDocCount(primary, 10); - // get a metadata map - we'll use segrep diff to ensure segments on reader are identical after restart.
- final Map<String, StoreFileMetadata> metadataBeforeRestart = primary.getSegmentMetadataMap(); - // restart the primary - shards.reinitPrimaryShard(remotePath); - // the store is open at this point but the shard has not yet run through recovery - primary = shards.getPrimary(); - shards.startPrimary(); - assertDocCount(primary, 10); - final Store.RecoveryDiff diff = Store.segmentReplicationDiff(metadataBeforeRestart, primary.getSegmentMetadataMap()); - assertTrue(diff.missing.isEmpty()); - assertTrue(diff.different.isEmpty()); - } - } - public void testPrimaryRestart_PrimaryHasExtraCommits() throws Exception { final Path remotePath = createTempDir(); try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 070bcae1b4a4b..88fcae17bf091 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -62,6 +62,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -773,6 +774,33 @@ public void testNoDuplicateSeqNo() throws Exception { } } + public void testPrimaryRestart() throws Exception { + final Path remotePath = createTempDir(); + try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { + shards.startAll(); + // ensure primary has uploaded something + shards.indexDocs(10); + IndexShard primary = shards.getPrimary(); + if (randomBoolean()) { + flushShard(primary); + } else { + primary.refresh("test"); + } + assertDocCount(primary, 10); + // get a metadata map - we'll use segrep diff to ensure segments on reader are identical after restart. + final Map<String, StoreFileMetadata> metadataBeforeRestart = primary.getSegmentMetadataMap(); + // restart the primary + shards.reinitPrimaryShard(remotePath); + // the store is open at this point but the shard has not yet run through recovery + primary = shards.getPrimary(); + shards.startPrimary(); + assertDocCount(primary, 10); + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(metadataBeforeRestart, primary.getSegmentMetadataMap()); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + } + } + /** * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because * it asserts point in time seqNos are relative to the doc counts.