From fe8ad1e33eccde04375e2c5c14c7599c84bedf7b Mon Sep 17 00:00:00 2001
From: Suraj Singh
Date: Wed, 10 Aug 2022 14:37:23 -0700
Subject: [PATCH] Update recoveryDiff logic to ignore missing files causing
 exception on replica during copy

Signed-off-by: Suraj Singh
---
 .../org/opensearch/index/store/Store.java     |  31 ++--
 .../replication/SegmentReplicationTarget.java |   2 +-
 .../SegmentReplicationTargetTests.java        | 148 +++++++++++++++++-
 3 files changed, 160 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java
index 9811f1ff3324c..163717ad94c2c 100644
--- a/server/src/main/java/org/opensearch/index/store/Store.java
+++ b/server/src/main/java/org/opensearch/index/store/Store.java
@@ -1106,9 +1106,9 @@ public Map<String, StoreFileMetadata> asMap() {
          * Helper method used to group store files according to segment and commit.
          *
          * @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
-         * @see MetadataSnapshot#getFilesRecoveryDiff(MetadataSnapshot)
+         * @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot)
          */
-        public Object[] getGroupedFiles() {
+        private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() {
             final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
             final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
             for (StoreFileMetadata meta : this) {
@@ -1123,7 +1123,7 @@ public Object[] getGroupedFiles() {
                     perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
                 }
             }
-            return new Object[] { perSegment, perCommitStoreFiles };
+            return Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles));
         }

         /**
@@ -1163,11 +1163,8 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
             final List<StoreFileMetadata> identical = new ArrayList<>();
             final List<StoreFileMetadata> different = new ArrayList<>();
             final List<StoreFileMetadata> missing = new ArrayList<>();
-            Object[] groupedFiles = getGroupedFiles();
-            final Map<String, List<StoreFileMetadata>> perSegment = (Map<String, List<StoreFileMetadata>>) groupedFiles[0];
-            final List<StoreFileMetadata> perCommitStoreFiles = (List<StoreFileMetadata>) groupedFiles[1];
             final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
-            for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
+            for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
                 identicalFiles.clear();
                 boolean consistent = true;
                 for (StoreFileMetadata meta : segmentFiles) {
@@ -1203,24 +1200,28 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
         }

         /**
+         * Segment Replication method
          * Returns a diff between the two snapshots that can be used for getting list of files to copy over to a replica for segment replication. The given snapshot is treated as the
-         * target and this snapshot as the source.
+         * target and this snapshot as the source. The returned diff will hold a list of files that are:
+         * <ul>
+         * <li>identical: they exist in both snapshots and they can be considered the same, i.e. they don't need to be recovered</li>
+         * <li>different: they exist in both snapshots but they are not identical</li>
+         * <li>missing: files that exist in the source but not in the target</li>
+         * </ul>
          */
-        public RecoveryDiff getFilesRecoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
+        public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) {
             final List<StoreFileMetadata> identical = new ArrayList<>();
             final List<StoreFileMetadata> different = new ArrayList<>();
             final List<StoreFileMetadata> missing = new ArrayList<>();
-            Object[] groupedFiles = getGroupedFiles();
-            final Map<String, List<StoreFileMetadata>> perSegment = (Map<String, List<StoreFileMetadata>>) groupedFiles[0];
-            final List<StoreFileMetadata> perCommitStoreFiles = (List<StoreFileMetadata>) groupedFiles[1];
             final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
-            for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
+            for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
                 identicalFiles.clear();
                 boolean consistent = true;
                 for (StoreFileMetadata meta : segmentFiles) {
                     StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
                     if (storeFileMetadata == null) {
-                        consistent = false;
+                        // Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates
+                        // documents and generate new files specific to a segment
                         missing.add(meta);
                     } else if (storeFileMetadata.isSame(meta) == false) {
                         consistent = false;
@@ -1231,6 +1232,8 @@ public RecoveryDiff getFilesRecoveryDiff(MetadataSnapshot recoveryTargetSnapshot
                 }
                 if (consistent) {
                     identical.addAll(identicalFiles);
+                } else {
+                    different.addAll(identicalFiles);
                 }
             }
             RecoveryDiff recoveryDiff = new RecoveryDiff(
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index 462a2433ee600..73d9a2f805d75 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -154,7 +154,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse>
[...]
+        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);
+
+        SegmentReplicationSource segrepSource = new SegmentReplicationSource() {
+            @Override
+            public void getCheckpointMetadata(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                ActionListener<CheckpointInfoResponse> listener
+            ) {
+                listener.onResponse(
+                    new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1), buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))
+                );
+            }
+
+            @Override
+            public void getSegmentFiles(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                List<StoreFileMetadata> filesToFetch,
+                Store store,
+                ActionListener<GetSegmentFilesResponse> listener
+            ) {
+                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+            }
+        };
+        SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
+            SegmentReplicationTargetService.SegmentReplicationListener.class
+        );
+
+        segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener));
+        when(segrepTarget.getMetadataSnapshot()).thenReturn(storeMetadataSnapshots.get(0));
+        segrepTarget.startReplication(new ActionListener<Void>() {
+            @Override
+            public void onResponse(Void replicationResponse) {
+                logger.info("No error processing checkpoint info");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assert (e instanceof IllegalStateException);
+            }
+        });
+    }
+
+    /**
+     * Generates a list of Store.MetadataSnapshot with two elements, where the second snapshot has extra files due to a delete
+     * operation. Two snapshots are generated inside this method so that both contain files with the same checksums, apart
+     * from the ones generated by the delete operation.
+     * @param docCount number of documents to index
+     * @return list of two Store.MetadataSnapshot, the first taken before and the second after the delete
+     * @throws IOException
+     */
+    List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
+        List<Document> docList = new ArrayList<>();
+        for (int i = 0; i < docCount; i++) {
+            Document document = new Document();
+            String text = new String(new char[] { (char) (97 + i), (char) (97 + i) });
+            document.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+            document.add(new TextField("str", text, Field.Store.YES));
+            docList.add(document);
+        }
+        long seed = random().nextLong();
+        Random random = new Random(seed);
+        IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
+        iwc.setMergePolicy(NoMergePolicy.INSTANCE);
+        iwc.setUseCompoundFile(true);
+        final ShardId shardId = new ShardId("index", "_na_", 1);
+        Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
+        IndexWriter writer = new IndexWriter(store.directory(), iwc);
+        for (Document d : docList) {
+            writer.addDocument(d);
+        }
+        writer.commit();
+        Store.MetadataSnapshot storeMetadata = store.getMetadata();
+        // Delete one document to generate .liv file
+        writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
+        writer.commit();
+        Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
+        deleteContent(store.directory());
+        writer.close();
+        store.close();
+        return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
+    }
+
+    public static void deleteContent(Directory directory) throws IOException {
+        final String[] files = directory.listAll();
+        final List<IOException> exceptions = new ArrayList<>();
+        for (String file : files) {
+            try {
+                directory.deleteFile(file);
+            } catch (NoSuchFileException | FileNotFoundException e) {
+                // ignore
+            } catch (IOException e) {
+                exceptions.add(e);
+            }
+        }
+        ExceptionsHelper.rethrowAndSuppress(exceptions);
+    }
+
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
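For readers of this patch, the following is a minimal sketch (not part of the change itself) of how a replica-side caller is expected to consume the new diff: files in missing are simply copied over from the primary, while any entry in different signals a diverged local store and fails the replication round, mirroring the check in SegmentReplicationTarget#getFiles. The class and method names ReplicationDiffConsumer and filesToFetch are illustrative only; Store.MetadataSnapshot, Store.RecoveryDiff, and segmentReplicationDiff are the pieces this patch touches.

import java.util.List;

import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;

final class ReplicationDiffConsumer {

    /**
     * Illustrative only: returns the files a replica must copy from the primary.
     * Missing files are expected when the replica lags the primary and are fetched;
     * checksum mismatches mean the local store diverged, so the round is aborted.
     */
    static List<StoreFileMetadata> filesToFetch(Store.MetadataSnapshot primarySnapshot, Store.MetadataSnapshot localSnapshot) {
        // Source = the primary's snapshot, target = the replica's local snapshot.
        final Store.RecoveryDiff diff = primarySnapshot.segmentReplicationDiff(localSnapshot);
        if (diff.different.isEmpty() == false) {
            throw new IllegalStateException("Local segment files differ from the primary: " + diff.different);
        }
        return diff.missing;
    }
}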