Skip to content

Commit

Permalink
fix ITs
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Jul 19, 2023
1 parent 4dabc7d commit 4d3d7a0
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends AbstractSnapshotIntegTestCase {

protected static final String REPOSITORY_NAME = "my-segment-repo-1";
protected static final String TRANSLOG_REPOSITORY_NAME = "my-translog-repo-1";
protected static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
Expand Down Expand Up @@ -63,13 +64,15 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, TRANSLOG_REPOSITORY_NAME)
.build();
}

protected void deleteRepo() {
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
logger.info("--> Deleting the repository={}", TRANSLOG_REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(TRANSLOG_REPOSITORY_NAME));
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
Expand All @@ -89,6 +92,8 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExcep
.put("skip_exception_on_blobs", skipExceptionBlobList)
.put("max_failure_number", maxFailure)
);
logger.info("--> Creating repository={} at the path={}", TRANSLOG_REPOSITORY_NAME, repoLocation);
createRepository(TRANSLOG_REPOSITORY_NAME, "mock", Settings.builder().put("location", repoLocation));

String dataNodeName = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private Settings defaultIndexSettings() {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
Expand Down Expand Up @@ -95,10 +96,7 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFiel
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas, int numberOfShards) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards))
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
return Settings.builder().put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards)).build();
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,9 @@ private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity) + 1);
}

private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
private void testRestoreFlow(int numberOfIterations, boolean invokeFlush) throws IOException {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
} else {
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
}
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand All @@ -130,44 +126,40 @@ private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boo
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);

if (remoteTranslog) {
verifyRestoredData(indexStats, true);
} else {
verifyRestoredData(indexStats, false);
}
}

public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
testRestoreFlow(false, 1, true);
}

public void testRemoteSegmentStoreRestoreWithNoDataPostRefresh() throws IOException {
testRestoreFlow(false, 1, false);
}

public void testRemoteSegmentStoreRestoreWithRefreshedData() throws IOException {
testRestoreFlow(false, randomIntBetween(2, 5), false);
verifyRestoredData(indexStats, true);
}

public void testRemoteSegmentStoreRestoreWithCommittedData() throws IOException {
testRestoreFlow(false, randomIntBetween(2, 5), true);
}
// public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
// testRestoreFlow(false, 1, true);
// }
//
// public void testRemoteSegmentStoreRestoreWithNoDataPostRefresh() throws IOException {
// testRestoreFlow(false, 1, false);
// }
//
// public void testRemoteSegmentStoreRestoreWithRefreshedData() throws IOException {
// testRestoreFlow(false, randomIntBetween(2, 5), false);
// }
//
// public void testRemoteSegmentStoreRestoreWithCommittedData() throws IOException {
// testRestoreFlow(false, randomIntBetween(2, 5), true);
// }

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
public void testRemoteTranslogRestoreWithNoDataPostCommit() throws IOException {
testRestoreFlow(true, 1, true);
testRestoreFlow(1, true);
}

public void testRemoteTranslogRestoreWithNoDataPostRefresh() throws IOException {
testRestoreFlow(true, 1, false);
testRestoreFlow(1, false);
}

public void testRemoteTranslogRestoreWithRefreshedData() throws IOException {
testRestoreFlow(true, randomIntBetween(2, 5), false);
testRestoreFlow(randomIntBetween(2, 5), false);
}

public void testRemoteTranslogRestoreWithCommittedData() throws IOException {
testRestoreFlow(true, randomIntBetween(2, 5), true);
testRestoreFlow(randomIntBetween(2, 5), true);
}

private void testPeerRecovery(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws Exception {
Expand Down Expand Up @@ -223,22 +215,6 @@ private void testPeerRecovery(boolean remoteTranslog, int numberOfIterations, bo
);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataFlush() throws Exception {
testPeerRecovery(false, 1, true);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogFlush() throws Exception {
testPeerRecovery(false, randomIntBetween(2, 5), true);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataRefresh() throws Exception {
testPeerRecovery(false, 1, false);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogRefresh() throws Exception {
testPeerRecovery(false, randomIntBetween(2, 5), false);
}

public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception {
testPeerRecovery(true, 1, true);
}
Expand All @@ -255,13 +231,9 @@ public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exc
testPeerRecovery(true, randomIntBetween(2, 5), false);
}

private void verifyRemoteStoreCleanup(boolean remoteTranslog) throws Exception {
private void verifyRemoteStoreCleanup() throws Exception {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(1));
} else {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1));
}
createIndex(INDEX_NAME, remoteStoreIndexSettings(1));

indexData(5, randomBoolean());
String indexUUID = client().admin()
Expand All @@ -280,12 +252,8 @@ private void verifyRemoteStoreCleanup(boolean remoteTranslog) throws Exception {
}, 30, TimeUnit.SECONDS);
}

public void testRemoteSegmentCleanup() throws Exception {
verifyRemoteStoreCleanup(false);
}

public void testRemoteTranslogCleanup() throws Exception {
verifyRemoteStoreCleanup(true);
verifyRemoteStoreCleanup();
}

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public Settings indexSettings() {
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public Settings indexSettings() {
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4593,7 +4593,7 @@ public void close() throws IOException {
onSettingsChanged();
}

public void syncRemoteTranslogAndUpdateGlobalCheckpoint() throws IOException {
private void syncRemoteTranslogAndUpdateGlobalCheckpoint() throws IOException {
syncTranslogFilesFromRemoteTranslog();
loadGlobalCheckpointToReplicationTracker();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
Set<TransferFileSnapshot> toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount());
try {
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
// toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
if (toUpload.isEmpty()) {
logger.trace("Nothing to upload for transfer");
translogTransferListener.onUploadComplete(transferSnapshot);
Expand Down Expand Up @@ -363,29 +363,32 @@ public void onFailure(Exception e) {
}

public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
ActionListener<List<BlobMetadata>> al = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
onCompletion.run();
return;
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size());
logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete);
deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
onCompletion.run();
}
};

try {
transferService.listAllInSortedOrderAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, Integer.MAX_VALUE, al);
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteMetadataTransferPath,
Integer.MAX_VALUE,
new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
onCompletion.run();
return;
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size());
logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete);
deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
onCompletion.run();
}
}
);
} catch (Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
onCompletion.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private Settings getRemoteStoreBackedIndexSettings(String remoteStoreRepo) {
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, remoteStoreRepo)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStoreRepo + "translog")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStoreRepo)
.build();
}

Expand Down Expand Up @@ -366,7 +366,7 @@ public void testGetRemoteStoreShallowCopyShardMetadata() throws IOException {
.put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings()))
.build();
createRepository(client, remoteStoreRepositoryName, remoteStoreRepoSettings);
createRepository(client, remoteStoreRepositoryName + "translog", remoteStoreRepoSettings);
// createRepository(client, remoteStoreRepositoryName + "translog", remoteStoreRepoSettings);

logger.info("--> creating a remote store enabled index and indexing documents");
final String remoteStoreIndexName = "test-rs-idx";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ protected Settings getRemoteStoreBackedIndexSettings(String remoteStoreRepo) {
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, remoteStoreRepo)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStoreRepo)
.build();
}

Expand Down

0 comments on commit 4d3d7a0

Please sign in to comment.