Skip to content

Commit

Permalink
Speedup snapshot stale indices delete (opensearch-project#613)
Browse files Browse the repository at this point in the history
Instead of the snapshot delete of stale indices being a single-threaded operation, this commit makes
it a multithreaded operation that deletes multiple stale indices in parallel using the SNAPSHOT
threadpool's workers.

Signed-off-by: Piyush Daftary <[email protected]>
  • Loading branch information
piyushdaftary authored and nknize committed Apr 27, 2021
1 parent 0e9f74e commit 7255cbc
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryVerificationException;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
Expand Down Expand Up @@ -124,6 +126,63 @@ public void testRepositoryCreation() throws Exception {
assertThat(repositoriesResponse.repositories().size(), equalTo(0));
}

/**
 * Verifies that stale index folders left behind by a failed snapshot delete are cleaned up
 * by the next successful snapshot delete. The mock repository is made to throw on delete so
 * the first snapshot deletion leaves residual stale indices, then restored to normal so the
 * second deletion can remove both its own and the residual stale data.
 */
public void testResidualStaleIndicesAreDeletedByConsecutiveDelete() throws Exception {
    Client client = client();
    Path repositoryPath = randomRepoPath();
    final String repositoryName = "test-repo";
    final String snapshot1Name = "test-snap-1";
    final String snapshot2Name = "test-snap-2";

    logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
    createRepository(repositoryName, "mock", repositoryPath);

    // Baseline file count before any snapshots, used for the final assertion.
    int numberOfFiles = numberOfFiles(repositoryPath);

    logger.info("--> creating index-1 and ingest data");
    createIndex("test-idx-1");
    ensureGreen();
    for (int j = 0; j < 10; j++) {
        index("test-idx-1", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
    }
    refresh();

    logger.info("--> creating first snapshot");
    createFullSnapshot(repositoryName, snapshot1Name);

    logger.info("--> creating index-2 and ingest data");
    createIndex("test-idx-2");
    ensureGreen();
    for (int j = 0; j < 10; j++) {
        index("test-idx-2", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
    }
    refresh();

    logger.info("--> creating second snapshot");
    createFullSnapshot(repositoryName, snapshot2Name);

    // Make the repository throw an exception when trying to delete stale indices.
    // This ensures stale indices stay in the repository after the snapshot delete.
    String masterNode = internalCluster().getMasterName();
    ((MockRepository) internalCluster().getInstance(RepositoriesService.class, masterNode).repository(repositoryName))
        .setThrowExceptionWhileDelete(true);

    logger.info("--> delete the second snapshot");
    client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();

    // Make the repository work normally again.
    ((MockRepository) internalCluster().getInstance(RepositoriesService.class, masterNode).repository(repositoryName))
        .setThrowExceptionWhileDelete(false);

    // This snapshot delete should clean up the previous delete's residual stale indices as well.
    logger.info("--> delete snapshot one");
    client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();

    logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
    assertFileCount(repositoryPath, numberOfFiles + 2);

    logger.info("--> done");
}

private RepositoryMetadata findRepository(List<RepositoryMetadata> repositories, String name) {
for (RepositoryMetadata repository : repositories) {
if (repository.name().equals(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ private void cleanupStaleBlobs(Collection<SnapshotId> deletedSnapshots, Map<Stri
if (foundIndices.keySet().equals(survivingIndexIds)) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
cleanupStaleIndices(foundIndices, survivingIndexIds, groupedListener);
}
}

Expand Down Expand Up @@ -1053,31 +1053,64 @@ private List<String> cleanupStaleRootFiles(long previousGeneration, Collection<S
return Collections.emptyList();
}

private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds,
GroupedActionListener<DeleteResult> listener) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
for (DeleteResult result : deleteResults) {
deleteResult = deleteResult.add(result);
}
listener.onResponse(deleteResult);
}, listener::onFailure), foundIndices.size() - survivingIndexIds.size());

try {
final BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete = new LinkedBlockingQueue<>();
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = deleteResult.add(indexEntry.getValue().delete());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
if (survivingIndexIds.contains(indexEntry.getKey()) == false) {
staleIndicesToDelete.put(indexEntry);
}
}

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
foundIndices.size() - survivingIndexIds.size());
for (int i = 0; i < workers; ++i) {
executeOneStaleIndexDelete(staleIndicesToDelete, groupedListener);
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
}
return deleteResult;
}

/**
 * Takes one stale index off {@code staleIndicesToDelete} and, if one was available, schedules
 * its deletion on the SNAPSHOT thread pool. After a successful delete the worker recursively
 * schedules the next queued entry, so each worker keeps draining the queue until it is empty.
 *
 * Failures deleting a single index are logged and reported as {@link DeleteResult#ZERO} so the
 * grouped listener (sized to the number of stale indices) still completes.
 */
private void executeOneStaleIndexDelete(BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
                                        GroupedActionListener<DeleteResult> listener) throws InterruptedException {
    // Zero-timeout poll: non-blocking; returns null once the queue is drained and this worker exits.
    Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
    if (indexEntry == null) {
        return;
    } else {
        final String indexSnId = indexEntry.getKey();
        // supply() delivers the returned DeleteResult to the grouped listener (one slot per stale index).
        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
            try {
                DeleteResult staleIndexDeleteResult = indexEntry.getValue().delete();
                logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
                // Pick up the next queued stale index before reporting this one's result.
                // NOTE(review): an InterruptedException thrown here escapes the catch blocks below
                // and would surface via the listener's failure path — confirm that is intended.
                executeOneStaleIndexDelete(staleIndicesToDelete, listener);
                return staleIndexDeleteResult;
            } catch (IOException e) {
                // Best-effort cleanup: log and count this index as zero deletions so the group completes.
                logger.warn(() -> new ParameterizedMessage(
                    "[{}] index {} is no longer part of any snapshots in the repository, " +
                        "but failed to clean up their index folders", metadata.name(), indexSnId), e);
                return DeleteResult.ZERO;
            } catch (Exception e) {
                assert false : e;
                logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e);
                return DeleteResult.ZERO;
            }
        }));
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public long getFailureCount() {
private volatile boolean throwReadErrorAfterUnblock = false;

private volatile boolean blocked = false;
private volatile boolean setThrowExceptionWhileDelete;

public MockRepository(RepositoryMetadata metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService,
Expand Down Expand Up @@ -257,6 +258,10 @@ public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) {
this.failReadsAfterUnblock = failReadsAfterUnblock;
}

/**
 * Test hook: when {@code throwError} is {@code true}, {@code delete()} on this mock repository's
 * blob containers throws an {@link java.io.IOException} instead of deleting, simulating a
 * repository failure during stale-data cleanup.
 */
public void setThrowExceptionWhileDelete(boolean throwError) {
    setThrowExceptionWhileDelete = throwError;
}

public boolean blocked() {
return blocked;
}
Expand Down Expand Up @@ -425,6 +430,9 @@ public InputStream readBlob(String name, long position, long length) throws IOEx
@Override
public DeleteResult delete() throws IOException {
DeleteResult deleteResult = DeleteResult.ZERO;
if (setThrowExceptionWhileDelete) {
throw new IOException("Random exception");
}
for (BlobContainer child : children().values()) {
deleteResult = deleteResult.add(child.delete());
}
Expand Down

0 comments on commit 7255cbc

Please sign in to comment.