Fix flaky test SegmentReplicationIT.testScrollWithOngoingSegmentReplication (opensearch-project#7572)

This test fails intermittently for two reasons:
1. It lists the temporary files on disk starting with ".replication" before the replica has had time to
flush received chunks to disk. Fixed by wrapping the assertion that a tmp file exists in assertBusy.
2. It asserts that the count of tmp files is exactly the same before and after a scroll request is cleared. However,
additional tmp files may be written to disk concurrently, causing a count mismatch. Fixed
by removing the size assertion. For this test, extra files do not matter as long as the tmp files
originally fetched remain after the scroll query is cleared.
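
The first fix relies on retrying an assertion until it passes. The following is a minimal sketch of that pattern, not the OpenSearch test framework's assertBusy implementation: the `Check` interface, the `retryUntilPasses` helper, the timeout, and the file name are all hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class BusyAssertSketch {

    /** A check that may throw AssertionError until the condition becomes true. */
    @FunctionalInterface
    interface Check {
        void run() throws Exception;
    }

    /**
     * Hypothetical stand-in for an assertBusy-style helper: re-runs the check
     * until it stops throwing AssertionError or the timeout elapses.
     */
    static void retryUntilPasses(Check check, long timeout, TimeUnit unit) throws Exception {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        long sleepMillis = 1;
        while (true) {
            try {
                check.run();
                return; // the assertion held
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // give up and surface the last failure
                }
            }
            Thread.sleep(sleepMillis);
            sleepMillis = Math.min(sleepMillis * 2, 100); // capped backoff
        }
    }

    /** Simulates a replica that writes a temporary file after a short delay. */
    static List<String> demo() throws Exception {
        List<String> disk = new CopyOnWriteArrayList<>();
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            disk.add(".replication_tmp_0"); // hypothetical file name
        });
        writer.start();

        List<String> temporaryFiles = new CopyOnWriteArrayList<>();
        retryUntilPasses(() -> {
            temporaryFiles.clear(); // avoid accumulating entries across retries
            disk.stream()
                .filter(name -> name.startsWith(".replication"))
                .forEach(temporaryFiles::add);
            if (temporaryFiles.isEmpty()) {
                throw new AssertionError("no temporary files yet");
            }
        }, 5, TimeUnit.SECONDS);
        writer.join();
        return temporaryFiles;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

An immediate, one-shot listing in `demo()` would usually see an empty directory because the writer thread has not run yet; the retry loop absorbs that timing gap. The sketch clears the list on each attempt, which is one way to keep a retried check from collecting duplicate entries.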

Signed-off-by: Marc Handalian <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
mch2 authored and shiv0408 committed Apr 25, 2024
1 parent 4532977 commit fe3a5b6
Showing 1 changed file with 13 additions and 9 deletions.
@@ -1045,19 +1045,24 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
         // wait for segrep to start and copy temporary files
         waitForFileCopy.await();
 
-        // verify replica contains temporary files
-        IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
-        List<String> temporaryFiles = Arrays.stream(replicaShard.store().directory().listAll())
-            .filter(fileName -> fileName.startsWith(REPLICATION_PREFIX))
-            .collect(Collectors.toList());
-        logger.info("--> temporaryFiles {}", temporaryFiles);
-        assertTrue(temporaryFiles.size() > 0);
+        final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
+        // Wait until replica has written a tmp file to disk.
+        List<String> temporaryFiles = new ArrayList<>();
+        assertBusy(() -> {
+            // verify replica contains temporary files
+            temporaryFiles.addAll(
+                Arrays.stream(replicaShard.store().directory().listAll())
+                    .filter(fileName -> fileName.startsWith(REPLICATION_PREFIX))
+                    .collect(Collectors.toList())
+            );
+            logger.info("--> temporaryFiles {}", temporaryFiles);
+            assertTrue(temporaryFiles.size() > 0);
+        });
 
         // Clear scroll query, this should clean up files on replica
         client(replica).prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
 
         // verify temporary files still exist
-        replicaShard = getIndexShard(replica, INDEX_NAME);
         List<String> temporaryFilesPostClear = Arrays.stream(replicaShard.store().directory().listAll())
             .filter(fileName -> fileName.startsWith(REPLICATION_PREFIX))
             .collect(Collectors.toList());
@@ -1066,7 +1071,6 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
         // Unblock segment replication
         blockFileCopy.countDown();
 
-        assertEquals(temporaryFiles.size(), temporaryFilesPostClear.size());
         assertTrue(temporaryFilesPostClear.containsAll(temporaryFiles));
 
         // wait for replica to catch up and verify doc count
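
The second fix keeps the containsAll check and drops the size comparison. A small sketch of why the two differ when files are written concurrently is shown here; the class name, the `originalFilesRemain` helper, and the file names are hypothetical and not part of the test.

```java
import java.util.List;

public class SubsetAssertionSketch {

    /** True when every file seen before the clear is still present afterwards. */
    static boolean originalFilesRemain(List<String> before, List<String> after) {
        return after.containsAll(before);
    }

    public static void main(String[] args) {
        // Hypothetical listings: a second tmp file arrives between the two reads.
        List<String> before = List.of(".replication_a");
        List<String> after = List.of(".replication_a", ".replication_b");

        // A strict size comparison fails even though nothing was deleted.
        System.out.println(before.size() == after.size());

        // The subset check only requires that the original files survived.
        System.out.println(originalFilesRemain(before, after));
    }
}
```

The subset check still fails if any originally observed file disappears, which is the property the test cares about.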