Update recoveryDiff logic to ignore missing files causing exception on replica during copy

Signed-off-by: Suraj Singh <[email protected]>
dreamer-89 committed Aug 10, 2022
1 parent b5cffe9 commit fe8ad1e
Showing 3 changed files with 160 additions and 21 deletions.
31 changes: 17 additions & 14 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -1106,9 +1106,9 @@ public Map<String, StoreFileMetadata> asMap() {
* Helper method used to group store files according to segment and commit.
*
* @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
* @see MetadataSnapshot#getFilesRecoveryDiff(MetadataSnapshot)
* @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot)
*/
public Object[] getGroupedFiles() {
private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() {
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
for (StoreFileMetadata meta : this) {
@@ -1123,7 +1123,7 @@ public Object[] getGroupedFiles() {
perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
}
}
return new Object[] { perSegment, perCommitStoreFiles };
return Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles));
}
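
For readers following the refactor: the old getGroupedFiles() returned an untyped Object[] that callers had to unchecked-cast back into a map and a list (see the casts removed below), while getGroupedFilesIterable() exposes the same grouping as a type-safe Iterable via Iterables.concat. A minimal standalone sketch of the grouping idea, assuming hypothetical file names and plain string parsing in place of Lucene's IndexFileNames helpers (generation-suffixed files such as _0_1.liv are ignored for simplicity):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class GroupedFilesSketch {
        // Bucket per-segment files ("_0.cfe", "_0.si", ...) by segment name and
        // collect commit files ("segments_N") into one shared per-commit bucket,
        // mirroring the shape returned by getGroupedFilesIterable().
        static Iterable<List<String>> groupFiles(Collection<String> files) {
            Map<String, List<String>> perSegment = new HashMap<>();
            List<String> perCommit = new ArrayList<>();
            for (String file : files) {
                if (file.startsWith("segments_")) {
                    perCommit.add(file); // commit-point metadata stays together
                } else {
                    String segmentId = file.substring(0, file.indexOf('.')); // e.g. "_0"
                    perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(file);
                }
            }
            List<List<String>> groups = new ArrayList<>(perSegment.values());
            groups.add(perCommit); // same effect as Iterables.concat(perSegment.values(), singleton(perCommit))
            return groups;
        }

        public static void main(String[] args) {
            for (List<String> group : groupFiles(List.of("_0.cfe", "_0.si", "_1.cfs", "segments_3"))) {
                System.out.println(group);
            }
        }
    }
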

/**
@@ -1163,11 +1163,8 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
Object[] groupedFiles = getGroupedFiles();
final Map<String, List<StoreFileMetadata>> perSegment = (Map<String, List<StoreFileMetadata>>) groupedFiles[0];
final List<StoreFileMetadata> perCommitStoreFiles = (List<StoreFileMetadata>) groupedFiles[1];
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
@@ -1203,24 +1200,28 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
}

/**
* Segment Replication method
* Returns a diff between the two snapshots that can be used to get the list of files to copy over to a replica for segment replication. The given snapshot is treated as the
* target and this snapshot as the source.
* target and this snapshot as the source. The returned diff will hold a list of files that are:
* <ul>
* <li>identical: they exist in both snapshots and they can be considered the same, i.e. they don't need to be recovered</li>
* <li>different: they exist in both snapshots but they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
*/
public RecoveryDiff getFilesRecoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
Object[] groupedFiles = getGroupedFiles();
final Map<String, List<StoreFileMetadata>> perSegment = (Map<String, List<StoreFileMetadata>>) groupedFiles[0];
final List<StoreFileMetadata> perCommitStoreFiles = (List<StoreFileMetadata>) groupedFiles[1];
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
if (storeFileMetadata == null) {
consistent = false;
// Do not consider missing files as inconsistent in SegRep, as replicas may lag while the primary updates
// documents and generates new files specific to a segment
missing.add(meta);
} else if (storeFileMetadata.isSame(meta) == false) {
consistent = false;
@@ -1231,6 +1232,8 @@ public RecoveryDiff getFilesRecoveryDiff(MetadataSnapshot recoveryTargetSnapshot
}
if (consistent) {
identical.addAll(identicalFiles);
} else {
different.addAll(identicalFiles);
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(
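
To make the behavioral difference concrete: recoveryDiff treats a file that is absent on the target as making the whole segment inconsistent, whereas segmentReplicationDiff only records it as missing, since a replica may simply lag a primary that keeps generating new per-segment files; a checksum mismatch, by contrast, still flips the segment to inconsistent, and the newly added else branch then demotes the already-matched files to different so the whole segment is re-copied. A hedged sketch of that per-segment decision, using simplified stand-in types (Java 16+ record) rather than the real Store internals:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    class SegRepDiffSketch {
        record FileMeta(String name, String checksum) {
            boolean isSame(FileMeta other) { return checksum.equals(other.checksum); }
        }

        // Segment-replication flavor of the per-segment walk: an absent file is
        // only "missing" (the replica may lag the primary), but a checksum
        // mismatch makes the segment inconsistent and demotes its matched files
        // to "different" so the whole segment gets re-copied.
        static void diffSegment(List<FileMeta> segmentFiles, Map<String, FileMeta> target,
                                List<FileMeta> identical, List<FileMeta> different, List<FileMeta> missing) {
            List<FileMeta> identicalSoFar = new ArrayList<>();
            boolean consistent = true;
            for (FileMeta meta : segmentFiles) {
                FileMeta onTarget = target.get(meta.name());
                if (onTarget == null) {
                    missing.add(meta);            // lagging replica: fetch it, don't fail
                } else if (!onTarget.isSame(meta)) {
                    consistent = false;           // write-once file changed: divergence
                    different.add(meta);
                } else {
                    identicalSoFar.add(meta);
                }
            }
            if (consistent) {
                identical.addAll(identicalSoFar);
            } else {
                different.addAll(identicalSoFar); // re-copy the whole segment
            }
        }

        public static void main(String[] args) {
            List<FileMeta> source = List.of(new FileMeta("_0.cfs", "abc"), new FileMeta("_0.si", "def"));
            Map<String, FileMeta> target = Map.of("_0.cfs", new FileMeta("_0.cfs", "abc")); // replica lags: no "_0.si"
            List<FileMeta> identical = new ArrayList<>(), different = new ArrayList<>(), missing = new ArrayList<>();
            diffSegment(source, target, identical, different, missing);
            System.out.println("identical=" + identical + " different=" + different + " missing=" + missing);
        }
    }

The sketch leaves _0.cfs in identical and reports _0.si only as missing; before this commit the lone missing file marked the whole segment inconsistent, which is the exception-prone behavior on the replica that the commit title describes.
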
server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -154,7 +154,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
throws IOException {
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
final Store.RecoveryDiff diff = snapshot.getFilesRecoveryDiff(localMetadata);
final Store.RecoveryDiff diff = snapshot.segmentReplicationDiff(localMetadata);
logger.debug("Replication diff {}", diff);
// Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming snapshot
// from
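
The comment above is truncated in this view, but together with the test below (whose onFailure handler expects an IllegalStateException) it indicates how the diff is consumed: segment files are write-once, so a same-named file that differs on the replica signals divergence, and the replication event is failed rather than the local file overwritten. A hedged sketch of that guard, with an illustrative message rather than the exact source text:

    // Illustrative only; RecoveryDiff exposes public identical/different/missing lists.
    if (diff.different.isEmpty() == false) {
        // Segment files are immutable: a same-named-but-different local file
        // cannot be reconciled by copying, so fail fast instead of overwriting.
        throw new IllegalStateException(
            "Replica has local segment files that differ from the primary: " + diff.different);
    }
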
server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -8,29 +8,51 @@

package org.opensearch.indices.replication;

import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.Version;
import org.junit.Assert;
import org.mockito.Mockito;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreTests;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.IndexSettingsModule;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Random;
import java.util.Arrays;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -69,16 +91,18 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {
0
);

private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
);

SegmentInfos testSegmentInfos;

@Override
public void setUp() throws Exception {

super.setUp();
Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
Settings indexSettings = getIndexSettings();

indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory());
spyIndexShard = spy(indexShard);
@@ -98,6 +122,13 @@ public void setUp() throws Exception {
);
}

private Settings getIndexSettings() {
return Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

public void testSuccessfulResponse_startReplication() {

SegmentReplicationSource segrepSource = new SegmentReplicationSource() {
@@ -361,6 +392,111 @@ public void onFailure(Exception e) {
});
}

/**
* This test ensures that new files generated on the primary (due to a delete operation) are not considered missing on the replica
* @throws IOException
*/
public void test_MissingFiles_NotCausingFailure() throws IOException {
int docCount = 1 + random().nextInt(10);
List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);

SegmentReplicationSource segrepSource = new SegmentReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
listener.onResponse(
new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1), buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))
);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
Store store,
ActionListener<GetSegmentFilesResponse> listener
) {
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
}
};
SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
SegmentReplicationTargetService.SegmentReplicationListener.class
);

segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener));
when(segrepTarget.getMetadataSnapshot()).thenReturn(storeMetadataSnapshots.get(0));
segrepTarget.startReplication(new ActionListener<Void>() {
@Override
public void onResponse(Void replicationResponse) {
logger.info("No error processing checkpoint info");
}

@Override
public void onFailure(Exception e) {
assert (e instanceof IllegalStateException);
}
});
}

/**
* Generates a list of Store.MetadataSnapshot with two elements, where the second snapshot has extra files created by a delete
* operation. Both snapshots are generated inside this method so that they share files with the same checksums, apart from the
* ones produced by the delete operation.
* @param docCount number of documents to index before taking the first snapshot
* @return a list of two metadata snapshots, taken before and after the delete
* @throws IOException
*/
List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
List<Document> docList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Document document = new Document();
String text = new String(new char[] { (char) (97 + i), (char) (97 + i) });
document.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
document.add(new TextField("str", text, Field.Store.YES));
docList.add(document);
}
long seed = random().nextLong();
Random random = new Random(seed);
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setUseCompoundFile(true);
final ShardId shardId = new ShardId("index", "_na_", 1);
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
for (Document d : docList) {
writer.addDocument(d);
}
writer.commit();
Store.MetadataSnapshot storeMetadata = store.getMetadata();
// Delete one document to generate .liv file
writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
writer.commit();
Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
deleteContent(store.directory());
writer.close();
store.close();
return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
}

public static void deleteContent(Directory directory) throws IOException {
final String[] files = directory.listAll();
final List<IOException> exceptions = new ArrayList<>();
for (String file : files) {
try {
directory.deleteFile(file);
} catch (NoSuchFileException | FileNotFoundException e) {
// ignore
} catch (IOException e) {
exceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

@Override
public void tearDown() throws Exception {
super.tearDown();
