Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Sep 4, 2024
1 parent 8a29eb1 commit 54ad9af
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import static org.hamcrest.Matchers.lessThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DeleteSnapshotITV2 extends AbstractSnapshotIntegTestCase {
public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase {

private static final String REMOTE_REPO_NAME = "remote-store-repo-name";

Expand Down Expand Up @@ -276,9 +276,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");
Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");

// Get total segments remote store directory file count for deleted index and shard 0
int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath);
int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

Expand Down Expand Up @@ -312,6 +314,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1));
} catch (Exception e) {}
}, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1));
} catch (Exception e) {}
}, 60, TimeUnit.SECONDS);

}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.collect.Tuple;
Expand All @@ -35,8 +34,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -430,26 +427,14 @@ private static Long getMinPrimaryTermInRemote(
Logger logger
) {
if (minPrimaryTermInRemote.get() == Long.MAX_VALUE) {
CountDownLatch latch = new CountDownLatch(1);
translogTransferManager.listPrimaryTermsInRemoteAsync(new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Set<Long> primaryTermsInRemote) {
try {
Set<Long> primaryTermsInRemote = translogTransferManager.listPrimaryTermsInRemote();
if (primaryTermsInRemote.isEmpty() == false) {
Optional<Long> minPrimaryTerm = primaryTermsInRemote.stream().min(Long::compareTo);
minPrimaryTerm.ifPresent(minPrimaryTermInRemote::set);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception while fetching min primary term from remote translog", e);
}
}, latch));

try {
if (latch.await(5, TimeUnit.MINUTES) == false) {
logger.error("Timeout while fetching min primary term from remote translog");
}
} catch (InterruptedException e) {
logger.error("Exception while fetching min primary term from remote translog", e);
} catch (IOException e) {
logger.error("Exception while listing primary terms in remote translog", e);
}
}
return minPrimaryTermInRemote.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,23 +520,6 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
*/
public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) {
logger.info("Deleting primary terms from remote store lesser than {}", minPrimaryTermToKeep);
listPrimaryTermsInRemoteAsync(new ActionListener<>() {
@Override
public void onResponse(Set<Long> primaryTermsInRemote) {
Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
.filter(term -> term < minPrimaryTermToKeep)
.collect(Collectors.toSet());
primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term));
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while deleting primary terms from remote store", e);
}
});
}

public void listPrimaryTermsInRemoteAsync(ActionListener<Set<Long>> listener) {
transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() {
@Override
public void onResponse(Set<String> folders) {
Expand All @@ -549,17 +532,27 @@ public void onResponse(Set<String> folders) {
}
return false;
}).map(Long::parseLong).collect(Collectors.toSet());
listener.onResponse(primaryTermsInRemote);
Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
.filter(term -> term < minPrimaryTermToKeep)
.collect(Collectors.toSet());
primaryTermsToDelete.forEach(term -> deletePrimaryTermAsync(term));
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while getting primary terms from remote store", e);
listener.onFailure(e);
}
});
}

public Set<Long> listPrimaryTermsInRemote() throws IOException {
Set<String> primaryTermsStr = transferService.listFolders(remoteDataTransferPath);
if (primaryTermsStr != null) {
return primaryTermsStr.stream().map(Long::parseLong).collect(Collectors.toSet());
}
return new HashSet<>();
}

/**
* Handles deletion of all translog files associated with a primary term.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2219,7 +2219,7 @@ private void remoteTranslogCleanupAsync(
try {
RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager);
} catch (IOException e) {
//
logger.error("Exception while cleaning up remote translog for shard: " + shardId, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,34 @@ public void onFailure(Exception e) {
assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(malformedMdFileName));
}

public void testGetMinMaxPrimaryTermFromFilename() throws Exception {
// New format metadata file
String newFormatMetadataFile =
"metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1";
Tuple<Long, Long> minMaxPrimaryterm = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(newFormatMetadataFile);
Long minPrimaryTerm = 2L;
Long maxPrimaryTerm = 7L;
assertEquals(minPrimaryTerm, minMaxPrimaryterm.v1());
assertEquals(maxPrimaryTerm, minMaxPrimaryterm.v2());

// Old format metadata file
String oldFormatMdFilename = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__1";
assertNull(TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(oldFormatMdFilename));

// Node id containing separator
String nodeIdWithSeparator =
"metadata__9223372036854775800__9223372036854774799__9223370311919910393__node__1__9223372036438563958__2__1";
minMaxPrimaryterm = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(nodeIdWithSeparator);
minPrimaryTerm = 2L;
maxPrimaryTerm = 7L;
assertEquals(minPrimaryTerm, minMaxPrimaryterm.v1());
assertEquals(maxPrimaryTerm, minMaxPrimaryterm.v2());

// Malformed md filename
String malformedMdFileName = "metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__xyz__3qwe__1";
assertNull(TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(malformedMdFileName));
}

public void testIndexDeletionWithNoPinnedTimestampNoRecentMdFiles() throws Exception {
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
ArrayList<Translog.Operation> ops = new ArrayList<>();
Expand Down

0 comments on commit 54ad9af

Please sign in to comment.