Skip to content

Commit

Permalink
fix ITs
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Jul 19, 2023
1 parent 4dabc7d commit 98c77fc
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends AbstractSnapshotIntegTestCase {

protected static final String REPOSITORY_NAME = "my-segment-repo-1";
protected static final String TRANSLOG_REPOSITORY_NAME = "my-translog-repo-1";
protected static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
Expand Down Expand Up @@ -63,13 +64,15 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, TRANSLOG_REPOSITORY_NAME)
.build();
}

protected void deleteRepo() {
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
logger.info("--> Deleting the repository={}", TRANSLOG_REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(TRANSLOG_REPOSITORY_NAME));
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
Expand All @@ -89,6 +92,8 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExcep
.put("skip_exception_on_blobs", skipExceptionBlobList)
.put("max_failure_number", maxFailure)
);
logger.info("--> Creating repository={} at the path={}", TRANSLOG_REPOSITORY_NAME, repoLocation);
createRepository(TRANSLOG_REPOSITORY_NAME, "mock", Settings.builder().put("location", repoLocation));

String dataNodeName = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private Settings defaultIndexSettings() {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
Expand Down Expand Up @@ -95,10 +96,7 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFiel
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas, int numberOfShards) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards))
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
return Settings.builder().put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards)).build();
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,9 @@ private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity) + 1);
}

private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
private void testRestoreFlow(int numberOfIterations, boolean invokeFlush) throws IOException {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
} else {
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
}
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand All @@ -130,44 +126,40 @@ private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boo
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);

if (remoteTranslog) {
verifyRestoredData(indexStats, true);
} else {
verifyRestoredData(indexStats, false);
}
}

public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
testRestoreFlow(false, 1, true);
}

public void testRemoteSegmentStoreRestoreWithNoDataPostRefresh() throws IOException {
testRestoreFlow(false, 1, false);
}

public void testRemoteSegmentStoreRestoreWithRefreshedData() throws IOException {
testRestoreFlow(false, randomIntBetween(2, 5), false);
verifyRestoredData(indexStats, true);
}

public void testRemoteSegmentStoreRestoreWithCommittedData() throws IOException {
testRestoreFlow(false, randomIntBetween(2, 5), true);
}
// public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
// testRestoreFlow(false, 1, true);
// }
//
// public void testRemoteSegmentStoreRestoreWithNoDataPostRefresh() throws IOException {
// testRestoreFlow(false, 1, false);
// }
//
// public void testRemoteSegmentStoreRestoreWithRefreshedData() throws IOException {
// testRestoreFlow(false, randomIntBetween(2, 5), false);
// }
//
// public void testRemoteSegmentStoreRestoreWithCommittedData() throws IOException {
// testRestoreFlow(false, randomIntBetween(2, 5), true);
// }

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
public void testRemoteTranslogRestoreWithNoDataPostCommit() throws IOException {
testRestoreFlow(true, 1, true);
testRestoreFlow(1, true);
}

public void testRemoteTranslogRestoreWithNoDataPostRefresh() throws IOException {
testRestoreFlow(true, 1, false);
testRestoreFlow(1, false);
}

public void testRemoteTranslogRestoreWithRefreshedData() throws IOException {
testRestoreFlow(true, randomIntBetween(2, 5), false);
testRestoreFlow(randomIntBetween(2, 5), false);
}

public void testRemoteTranslogRestoreWithCommittedData() throws IOException {
testRestoreFlow(true, randomIntBetween(2, 5), true);
testRestoreFlow(randomIntBetween(2, 5), true);
}

private void testPeerRecovery(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws Exception {
Expand Down Expand Up @@ -223,22 +215,6 @@ private void testPeerRecovery(boolean remoteTranslog, int numberOfIterations, bo
);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataFlush() throws Exception {
testPeerRecovery(false, 1, true);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogFlush() throws Exception {
testPeerRecovery(false, randomIntBetween(2, 5), true);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataRefresh() throws Exception {
testPeerRecovery(false, 1, false);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogRefresh() throws Exception {
testPeerRecovery(false, randomIntBetween(2, 5), false);
}

public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception {
testPeerRecovery(true, 1, true);
}
Expand All @@ -255,13 +231,9 @@ public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exc
testPeerRecovery(true, randomIntBetween(2, 5), false);
}

private void verifyRemoteStoreCleanup(boolean remoteTranslog) throws Exception {
private void verifyRemoteStoreCleanup() throws Exception {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(1));
} else {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1));
}
createIndex(INDEX_NAME, remoteStoreIndexSettings(1));

indexData(5, randomBoolean());
String indexUUID = client().admin()
Expand All @@ -280,12 +252,8 @@ private void verifyRemoteStoreCleanup(boolean remoteTranslog) throws Exception {
}, 30, TimeUnit.SECONDS);
}

public void testRemoteSegmentCleanup() throws Exception {
verifyRemoteStoreCleanup(false);
}

public void testRemoteTranslogCleanup() throws Exception {
verifyRemoteStoreCleanup(true);
verifyRemoteStoreCleanup();
}

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public Settings indexSettings() {
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public Settings indexSettings() {
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4593,7 +4593,7 @@ public void close() throws IOException {
onSettingsChanged();
}

public void syncRemoteTranslogAndUpdateGlobalCheckpoint() throws IOException {
private void syncRemoteTranslogAndUpdateGlobalCheckpoint() throws IOException {
syncTranslogFilesFromRemoteTranslog();
loadGlobalCheckpointToReplicationTracker();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
Set<TransferFileSnapshot> toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount());
try {
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
// toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
if (toUpload.isEmpty()) {
logger.trace("Nothing to upload for transfer");
translogTransferListener.onUploadComplete(transferSnapshot);
Expand Down Expand Up @@ -363,29 +363,32 @@ public void onFailure(Exception e) {
}

public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
ActionListener<List<BlobMetadata>> al = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
onCompletion.run();
return;
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size());
logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete);
deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
onCompletion.run();
}
};

try {
transferService.listAllInSortedOrderAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, Integer.MAX_VALUE, al);
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteMetadataTransferPath,
Integer.MAX_VALUE,
new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
onCompletion.run();
return;
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size());
logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete);
deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
onCompletion.run();
}
}
);
} catch (Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
onCompletion.run();
Expand Down
Loading

0 comments on commit 98c77fc

Please sign in to comment.