From b5cffe9be5cc27ee59c656ac63ea58f9d091f81e Mon Sep 17 00:00:00 2001
From: Poojita Raj
Date: Tue, 26 Jul 2022 09:09:32 -0700
Subject: [PATCH 1/3] Update Store for segment replication diff

Signed-off-by: Poojita Raj
---
 .../replication/SegmentReplicationIT.java     | 52 ++++++++++++
 .../org/opensearch/index/store/Store.java     | 83 +++++++++++++++----
 .../replication/SegmentReplicationTarget.java |  2 +-
 3 files changed, 120 insertions(+), 17 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index dae2fa04a3a7e..96ac703b9837e 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -15,6 +15,7 @@
 import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
 import org.opensearch.action.admin.indices.segments.ShardSegments;
+import org.opensearch.action.support.WriteRequest;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.node.DiscoveryNode;
@@ -36,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -247,6 +249,56 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
         assertSegmentStats(REPLICA_COUNT);
     }
 
+    public void testDeleteOperations() throws Exception {
+        final String nodeA = internalCluster().startNode();
+        final String nodeB = internalCluster().startNode();
+
+        createIndex(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+        final int initialDocCount = scaledRandomIntBetween(0, 200);
+        try (
+            BackgroundIndexer indexer = new BackgroundIndexer(
+                INDEX_NAME,
+                "_doc",
+                client(),
+                -1,
+                RandomizedTest.scaledRandomIntBetween(2, 5),
+                false,
+                random()
+            )
+        ) {
+            indexer.start(initialDocCount);
+            waitForDocs(initialDocCount, indexer);
+            refresh(INDEX_NAME);
+            waitForReplicaUpdate();
+
+            // wait a short amount of time to give replication a chance to complete.
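+            // "_only_local" restricts each search to shards held on the queried node, so the primary copy on
+            // nodeA and the replica copy on nodeB are each verified independently below.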
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+
+            final int additionalDocCount = scaledRandomIntBetween(0, 200);
+            final int expectedHitCount = initialDocCount + additionalDocCount;
+            indexer.start(additionalDocCount);
+            waitForDocs(expectedHitCount, indexer);
+            waitForReplicaUpdate();
+
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+
+            ensureGreen(INDEX_NAME);
+
+            Set<String> ids = indexer.getIds();
+            String id = ids.toArray()[0].toString();
+            client(nodeA).prepareDelete(INDEX_NAME, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+
+            refresh(INDEX_NAME);
+            waitForReplicaUpdate();
+
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount - 1);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount - 1);
+        }
+    }
+
     private void assertSegmentStats(int numberOfReplicas) throws IOException {
         final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();

diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java
index 6828ab7d91b2c..9811f1ff3324c 100644
--- a/server/src/main/java/org/opensearch/index/store/Store.java
+++ b/server/src/main/java/org/opensearch/index/store/Store.java
@@ -110,8 +110,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1102,6 +1102,30 @@ public Map<String, StoreFileMetadata> asMap() {
     private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file
     private static final String SEGMENT_INFO_EXTENSION = "si";
 
+    /**
+     * Helper method used to group store files according to segment and commit.
+     *
+     * @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
+     * @see MetadataSnapshot#getFilesRecoveryDiff(MetadataSnapshot)
+     */
+    public Object[] getGroupedFiles() {
+        final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
+        final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
+        for (StoreFileMetadata meta : this) {
+            final String segmentId = IndexFileNames.parseSegmentName(meta.name());
+            final String extension = IndexFileNames.getExtension(meta.name());
+            if (IndexFileNames.SEGMENTS.equals(segmentId)
+                || DEL_FILE_EXTENSION.equals(extension)
+                || LIV_FILE_EXTENSION.equals(extension)) {
+                // only treat del files as per-commit files fnm files are generational but only for upgradable DV
+                perCommitStoreFiles.add(meta);
+            } else {
+                perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
+            }
+        }
+        return new Object[] { perSegment, perCommitStoreFiles };
+    }
+
     /**
      * Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the
      * recovery target and this snapshot as the source. The returned diff will hold a list of files that are:
@@ -1139,21 +1163,9 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
         final List<StoreFileMetadata> identical = new ArrayList<>();
         final List<StoreFileMetadata> different = new ArrayList<>();
         final List<StoreFileMetadata> missing = new ArrayList<>();
-        final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
-        final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
-
-        for (StoreFileMetadata meta : this) {
-            final String segmentId = IndexFileNames.parseSegmentName(meta.name());
-            final String extension = IndexFileNames.getExtension(meta.name());
-            if (IndexFileNames.SEGMENTS.equals(segmentId)
-                || DEL_FILE_EXTENSION.equals(extension)
-                || LIV_FILE_EXTENSION.equals(extension)) {
-                // only treat del files as per-commit files fnm files are generational but only for upgradable DV
-                perCommitStoreFiles.add(meta);
-            } else {
-                perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
-            }
-        }
+        Object[] groupedFiles = getGroupedFiles();
+        final Map<String, List<StoreFileMetadata>> perSegment = (Map<String, List<StoreFileMetadata>>) groupedFiles[0];
+        final List<StoreFileMetadata> perCommitStoreFiles = (List<StoreFileMetadata>) groupedFiles[1];
         final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
         for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
             identicalFiles.clear();
@@ -1190,6 +1202,45 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
         return recoveryDiff;
     }
 
+    /**
+     * Returns a diff between the two snapshots that can be used to get the list of files to copy over to a replica for segment replication. The given snapshot is treated as the
+     * target and this snapshot as the source.
+     */
+    public RecoveryDiff getFilesRecoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
+        final List<StoreFileMetadata> identical = new ArrayList<>();
+        final List<StoreFileMetadata> different = new ArrayList<>();
+        final List<StoreFileMetadata> missing = new ArrayList<>();
+        Object[] groupedFiles = getGroupedFiles();
+        final Map<String, List<StoreFileMetadata>> perSegment = (Map<String, List<StoreFileMetadata>>) groupedFiles[0];
+        final List<StoreFileMetadata> perCommitStoreFiles = (List<StoreFileMetadata>) groupedFiles[1];
+        final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
+        for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
+            identicalFiles.clear();
+            boolean consistent = true;
+            for (StoreFileMetadata meta : segmentFiles) {
+                StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
+                if (storeFileMetadata == null) {
+                    consistent = false;
+                    missing.add(meta);
+                } else if (storeFileMetadata.isSame(meta) == false) {
+                    consistent = false;
+                    different.add(meta);
+                } else {
+                    identicalFiles.add(meta);
+                }
+            }
+            if (consistent) {
+                identical.addAll(identicalFiles);
+            }
+        }
+        RecoveryDiff recoveryDiff = new RecoveryDiff(
+            Collections.unmodifiableList(identical),
+            Collections.unmodifiableList(different),
+            Collections.unmodifiableList(missing)
+        );
+        return recoveryDiff;
+    }
+
     /**
      * Returns the number of files in this snapshot
      */

diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index 516cfa91a787b..462a2433ee600 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -154,7 +154,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener
Date: Wed, 10 Aug 2022 14:37:23 -0700
Subject: [PATCH 2/3] Update recoveryDiff logic to ignore missing files causing exception on replica during copy
Signed-off-by: Suraj Singh
---
 .../org/opensearch/index/store/Store.java     | 31 ++--
 .../replication/SegmentReplicationTarget.java |  2 +-
 .../SegmentReplicationTargetTests.java        | 148 +++++++++++++++++-
 3 files changed, 160 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java
index 9811f1ff3324c..163717ad94c2c 100644
--- a/server/src/main/java/org/opensearch/index/store/Store.java
+++ b/server/src/main/java/org/opensearch/index/store/Store.java
@@ -1106,9 +1106,9 @@ public Map<String, StoreFileMetadata> asMap() {
      * Helper method used to group store files according to segment and commit.
      *
      * @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
-     * @see MetadataSnapshot#getFilesRecoveryDiff(MetadataSnapshot)
+     * @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot)
      */
-    public Object[] getGroupedFiles() {
+    private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() {
         final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
         final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
         for (StoreFileMetadata meta : this) {
@@ -1123,7 +1123,7 @@ public Object[] getGroupedFiles() {
                 perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
             }
         }
-        return new Object[] { perSegment, perCommitStoreFiles };
+        return Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles));
     }
 
@@ -1163,11 +1163,8 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
         final List<StoreFileMetadata> identical = new ArrayList<>();
         final List<StoreFileMetadata> different = new ArrayList<>();
         final List<StoreFileMetadata> missing = new ArrayList<>();
-        Object[] groupedFiles = getGroupedFiles();
-        final Map<String, List<StoreFileMetadata>> perSegment = (Map<String, List<StoreFileMetadata>>) groupedFiles[0];
-        final List<StoreFileMetadata> perCommitStoreFiles = (List<StoreFileMetadata>) groupedFiles[1];
         final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
-        for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
+        for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
             identicalFiles.clear();
             boolean consistent = true;
             for (StoreFileMetadata meta : segmentFiles) {
@@ -1203,24 +1200,28 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
     }
 
     /**
+     * Segment Replication method
      * Returns a diff between the two snapshots that can be used to get the list of files to copy over to a replica for segment replication. The given snapshot is treated as the
-     * target and this snapshot as the source.
+     * target and this snapshot as the source. The returned diff will hold a list of files that are:
+     * <ul>
+     * <li>identical: they exist in both snapshots and they can be considered the same, i.e. they don't need to be recovered</li>
+     * <li>different: they exist in both snapshots but they are not identical</li>
+     * <li>missing: files that exist in the source but not in the target</li>
+     * </ul>
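+     * <p>
+     * Unlike {@link #recoveryDiff(MetadataSnapshot)}, a file that is missing on the target does not mark the whole
+     * segment as inconsistent: replicas may simply lag the primary, so missing files are returned in {@code missing}
+     * to be copied over, while the files the target already has can remain {@code identical}.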
      */
-    public RecoveryDiff getFilesRecoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
+    public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) {
         final List<StoreFileMetadata> identical = new ArrayList<>();
         final List<StoreFileMetadata> different = new ArrayList<>();
         final List<StoreFileMetadata> missing = new ArrayList<>();
-        Object[] groupedFiles = getGroupedFiles();
-        final Map<String, List<StoreFileMetadata>> perSegment = (Map<String, List<StoreFileMetadata>>) groupedFiles[0];
-        final List<StoreFileMetadata> perCommitStoreFiles = (List<StoreFileMetadata>) groupedFiles[1];
         final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
-        for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
+        for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
             identicalFiles.clear();
             boolean consistent = true;
             for (StoreFileMetadata meta : segmentFiles) {
                 StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
                 if (storeFileMetadata == null) {
-                    consistent = false;
+                    // Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates
+                    // documents and generate new files specific to a segment
                     missing.add(meta);
                 } else if (storeFileMetadata.isSame(meta) == false) {
                     consistent = false;
                     different.add(meta);
@@ -1231,6 +1232,8 @@ public RecoveryDiff getFilesRecoveryDiff(MetadataSnapshot recoveryTargetSnapshot
             }
             if (consistent) {
                 identical.addAll(identicalFiles);
+            } else {
+                different.addAll(identicalFiles);
             }
         }
         RecoveryDiff recoveryDiff = new RecoveryDiff(

diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index 462a2433ee600..73d9a2f805d75 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -154,7 +154,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);
+
+        SegmentReplicationSource segrepSource = new SegmentReplicationSource() {
+            @Override
+            public void getCheckpointMetadata(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                ActionListener<CheckpointInfoResponse> listener
+            ) {
+                listener.onResponse(
+                    new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1), buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))
+                );
+            }
+
+            @Override
+            public void getSegmentFiles(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                List<StoreFileMetadata> filesToFetch,
+                Store store,
+                ActionListener<GetSegmentFilesResponse> listener
+            ) {
+                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+            }
+        };
+        SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
+            SegmentReplicationTargetService.SegmentReplicationListener.class
+        );
+
+        segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener));
+        when(segrepTarget.getMetadataSnapshot()).thenReturn(storeMetadataSnapshots.get(0));
+        segrepTarget.startReplication(new ActionListener<Void>() {
+            @Override
+            public void onResponse(Void replicationResponse) {
+                logger.info("No error processing checkpoint info");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assert (e instanceof IllegalStateException);
+            }
+        });
+    }
+
+    /**
+     * Generates a list of Store.MetadataSnapshot with two elements where the second snapshot has extra files due to delete
+     * operation. Two snapshots are generated inside this method to ensure they have files with the same checksum
+     * other than the ones generated due to delete operations
+     * @param docCount
+     * @return
+     * @throws IOException
+     */
+    List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
+        List<Document> docList = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            Document document = new Document();
+            String text = new String(new char[] { (char) (97 + i), (char) (97 + i) });
+            document.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+            document.add(new TextField("str", text, Field.Store.YES));
+            docList.add(document);
+        }
+        long seed = random().nextLong();
+        Random random = new Random(seed);
+        IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
+        iwc.setMergePolicy(NoMergePolicy.INSTANCE);
+        iwc.setUseCompoundFile(true);
+        final ShardId shardId = new ShardId("index", "_na_", 1);
+        Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
+        IndexWriter writer = new IndexWriter(store.directory(), iwc);
+        for (Document d : docList) {
+            writer.addDocument(d);
+        }
+        writer.commit();
+        Store.MetadataSnapshot storeMetadata = store.getMetadata();
+        // Delete one document to generate .liv file
+        writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
+        writer.commit();
+        Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
+        deleteContent(store.directory());
+        writer.close();
+        store.close();
+        return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
+    }
+
+    public static void deleteContent(Directory directory) throws IOException {
+        final String[] files = directory.listAll();
+        final List<IOException> exceptions = new ArrayList<>();
+        for (String file : files) {
+            try {
+                directory.deleteFile(file);
+            } catch (NoSuchFileException | FileNotFoundException e) {
+                // ignore
+            } catch (IOException e) {
+                exceptions.add(e);
+            }
+        }
+        ExceptionsHelper.rethrowAndSuppress(exceptions);
+    }
+
     @Override
     public void tearDown() throws Exception {
         super.tearDown();

From 9350ba62633864576b7d7e5634401d98017ef554 Mon Sep 17 00:00:00 2001
From: Suraj Singh
Date: Thu, 11 Aug 2022 21:34:19 -0700
Subject: [PATCH 3/3] Address review comments

Signed-off-by: Suraj Singh
---
 .../SegmentReplicationTargetTests.java        | 20 +++++++++----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index ef2d39c67e861..1157c463785ac 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -102,7 +102,10 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {
     public void setUp() throws Exception {
         super.setUp();
 
-        Settings indexSettings = getIndexSettings();
+        Settings indexSettings = Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .build();
 
         indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory());
         spyIndexShard = spy(indexShard);
@@ -122,13 +125,6 @@ public void setUp() throws Exception {
         );
     }
 
-    private Settings getIndexSettings() {
-        return Settings.builder()
-            .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
-            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
-            .build();
-    }
-
     public void testSuccessfulResponse_startReplication() {
 
         SegmentReplicationSource segrepSource = new SegmentReplicationSource() {
@@ -398,6 +394,9 @@ public void onFailure(Exception e) {
      */
     public void test_MissingFiles_NotCausingFailure() throws IOException {
         int docCount = 1 + random().nextInt(10);
+        // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
+        // generated due to delete operations. These two snapshots can then be used in the test to mock the primary
+        // shard snapshot (2nd element, which contains delete operations) and the replica's existing snapshot (1st element).
         List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);
 
         SegmentReplicationSource segrepSource = new SegmentReplicationSource() {
@@ -444,15 +443,14 @@ public void onFailure(Exception e) {
 
     /**
      * Generates a list of Store.MetadataSnapshot with two elements where the second snapshot has extra files due to delete
-     * operation. Two snapshots are generated inside this method to ensure they have files with the same checksum
-     * other than the ones generated due to delete operations
+     * operation. A list of snapshots is returned so that identical files have the same checksum.
      * @param docCount
      * @return
      * @throws IOException
      */
     List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
         List<Document> docList = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < docCount; i++) {
             Document document = new Document();
             String text = new String(new char[] { (char) (97 + i), (char) (97 + i) });
             document.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
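
A minimal usage sketch (an editor's illustration, not part of the patch series) contrasting the two diff methods this series touches. It assumes `primaryStore` and `replicaStore` are open Store instances for the same shard, with the replica one refresh behind the primary; the file names in the comments are hypothetical.

    // recoveryDiff: a file group with any member missing on the target is treated as
    // inconsistent, so none of that group's files are reported as identical. For a lagging
    // replica this is what caused the copy-time exception that patch 2 fixes.
    Store.MetadataSnapshot primarySnapshot = primaryStore.getMetadata();
    Store.MetadataSnapshot replicaSnapshot = replicaStore.getMetadata();
    Store.RecoveryDiff recoveryDiff = primarySnapshot.recoveryDiff(replicaSnapshot);

    // segmentReplicationDiff: a file the replica lacks (for example a new .liv file written
    // after a delete on the primary) lands in `missing` and is simply copied over; files the
    // replica already has stay in `identical`, and only a checksum mismatch moves a
    // segment's files into `different`.
    Store.RecoveryDiff segRepDiff = primarySnapshot.segmentReplicationDiff(replicaSnapshot);
    for (StoreFileMetadata file : segRepDiff.missing) {
        // fetch `file` from the primary; everything in segRepDiff.identical is reused as-is
    }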