Skip to content

Commit

Permalink
Add test ensuring commits are cleaned up on replicas.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Aug 9, 2023
1 parent de713f2 commit 4494c3a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public void getSegmentFiles(
listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
return;
}
logger.trace("Downloading segments files from remote store {}", filesToFetch);

RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile();
List<StoreFileMetadata> downloadedSegments = new ArrayList<>();
Collection<String> directoryFiles = List.of(indexShard.store().directory().listAll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void startReplication(ActionListener<Void> listener) {
}, listener::onFailure);

getFilesListener.whenComplete(response -> {
finalizeReplication(checkpointInfoListener.result(), getFilesListener.result());
finalizeReplication(checkpointInfoListener.result());
listener.onResponse(null);
}, listener::onFailure);
}
Expand Down Expand Up @@ -200,8 +200,7 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
return diff.missing;
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, GetSegmentFilesResponse getSegmentFilesResponse)
throws OpenSearchCorruptionException {
private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
// Handle empty SegmentInfos bytes for recovering replicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.shard;

import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.Version;
import org.hamcrest.MatcherAssert;
Expand All @@ -25,10 +26,12 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -306,4 +309,44 @@ public void testPrimaryRestart_PrimaryHasExtraCommits() throws Exception {
assertTrue(diff.different.isEmpty());
}
}

public void testRepicaCleansUpOldCommitsWhenReceivingNew() throws Exception {
final Path remotePath = createTempDir();
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
shards.indexDocs(1);
flushShard(primary);
replicateSegments(primary, shards.getReplicas());
assertDocCount(primary, 1);
assertDocCount(replica, 1);
assertEquals("segments_4", replica.store().readLastCommittedSegmentsInfo().getSegmentsFileName());
assertSingleSegmentFile(replica, "segments_4");

shards.indexDocs(1);
primary.refresh("test");
replicateSegments(primary, shards.getReplicas());
assertDocCount(replica, 2);
assertSingleSegmentFile(replica, "segments_4");

shards.indexDocs(1);
flushShard(primary);
replicateSegments(primary, shards.getReplicas());
assertDocCount(replica, 3);
assertSingleSegmentFile(replica, "segments_5");

final Store.RecoveryDiff diff = Store.segmentReplicationDiff(primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap());
assertTrue(diff.missing.isEmpty());
assertTrue(diff.different.isEmpty());
}
}

private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException {
final Set<String> segmentsFileNames = Arrays.stream(shard.store().directory().listAll())
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.collect(Collectors.toSet());
assertEquals("Expected a single segment file", 1, segmentsFileNames.size());
assertEquals(segmentsFileNames.stream().findFirst().get(), fileName);
}
}

0 comments on commit 4494c3a

Please sign in to comment.