Speedup snapshot stale indices delete #64513

Closed · wants to merge 7 commits
@@ -30,8 +30,10 @@
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.file.Path;
@@ -225,4 +227,61 @@ public void testRepositoryVerification() {
assertThat(ExceptionsHelper.stackTrace(ex), containsString("is not shared"));
}
}

public void testResidualStaleIndicesAreDeletedByConsecutiveDelete() throws Exception {
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String snapshot1Name = "test-snap-1";
final String snapshot2Name = "test-snap-2";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
createRepository(repositoryName, "mock", repositoryPath);

int numberOfFiles = numberOfFiles(repositoryPath);

logger.info("--> creating index-1 and ingest data");
createIndex("test-idx-1");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-1", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, snapshot1Name);

logger.info("--> creating index-2 and ingest data");
createIndex("test-idx-2");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-2", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, snapshot2Name);

// Make the repository throw an exception when it tries to delete stale indices.
// This ensures the stale indices remain in the repository after the snapshot delete.
String masterNode = internalCluster().getMasterName();
((MockRepository) internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo"))
.setThrowExceptionWhileDelete(true);

logger.info("--> delete the second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();

// Make the repository work normally again
((MockRepository) internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo"))
.setThrowExceptionWhileDelete(false);

// Deleting this snapshot should also clean up the residual stale indices left by the previous delete
logger.info("--> delete snapshot one");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();

logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repositoryPath, numberOfFiles + 2);

logger.info("--> done");
}
}
@@ -931,7 +931,7 @@ private void cleanupStaleBlobs(Collection<SnapshotId> deletedSnapshots, Map<Stri
if (foundIndices.keySet().equals(survivingIndexIds)) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
cleanupStaleIndices(foundIndices, survivingIndexIds, groupedListener);
}
}

@@ -1039,31 +1039,64 @@ private List<String> cleanupStaleRootFiles(long previousGeneration, Collection<S
return Collections.emptyList();
}

private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds,
GroupedActionListener<DeleteResult> listener) {
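// Aggregate the per-index results: the group completes once every stale index (foundIndices minus the
// surviving ones) has reported a DeleteResult, and the summed total is handed to the caller's listener.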
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
for (DeleteResult result : deleteResults) {
deleteResult = deleteResult.add(result);
}
listener.onResponse(deleteResult);
}, listener::onFailure), foundIndices.size() - survivingIndexIds.size());

try {
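// Collect every stale index container into a work queue that the delete workers below will drain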
final BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete = new LinkedBlockingQueue<>();
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = deleteResult.add(indexEntry.getValue().delete());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
if (survivingIndexIds.contains(indexEntry.getKey()) == false) {
staleIndicesToDelete.put(indexEntry);
}
}

// Start at most as many workers as fit into the snapshot pool at once
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
foundIndices.size() - survivingIndexIds.size());
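// Kick off the initial workers; each worker deletes one entry and then schedules itself for the next one,
// so at most 'workers' deletes run on the snapshot pool concurrently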
for (int i = 0; i < workers; ++i) {
executeOneStaleIndexDelete(staleIndicesToDelete, groupedListener);
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
}
return deleteResult;
}

private void executeOneStaleIndexDelete(BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
GroupedActionListener<DeleteResult> listener) throws InterruptedException {
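// Poll one stale index entry from the queue; if none are left this worker's chain ends. Otherwise the entry
// is deleted on the SNAPSHOT thread pool and, on success, the worker schedules itself for the next entry,
// reporting each DeleteResult back to the grouped listener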
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
if (indexEntry == null) {
return;
} else {
final String indexSnId = indexEntry.getKey();
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
try {
DeleteResult staleIndexDeleteResult = indexEntry.getValue().delete();
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
executeOneStaleIndexDelete(staleIndicesToDelete, listener);
return staleIndexDeleteResult;
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
return DeleteResult.ZERO;
} catch (Exception e) {
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e);
return DeleteResult.ZERO;
}
}));
}
}

@Override
Expand Down
@@ -144,6 +144,7 @@ public long getFailureCount() {
private volatile boolean throwReadErrorAfterUnblock = false;

private volatile boolean blocked = false;
private volatile boolean setThrowExceptionWhileDelete;

public MockRepository(RepositoryMetadata metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService, BigArrays bigArrays,
@@ -247,6 +248,10 @@ public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) {
this.failReadsAfterUnblock = failReadsAfterUnblock;
}

public void setThrowExceptionWhileDelete(boolean throwError) {
setThrowExceptionWhileDelete = throwError;
}

public boolean blocked() {
return blocked;
}
@@ -415,6 +420,9 @@ public InputStream readBlob(String name, long position, long length) throws IOEx
@Override
public DeleteResult delete() throws IOException {
DeleteResult deleteResult = DeleteResult.ZERO;
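// Simulate a storage failure so that stale index folders are left behind for a later delete to clean up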
if (setThrowExceptionWhileDelete) {
throw new IOException("Random exception");
}
for (BlobContainer child : children().values()) {
deleteResult = deleteResult.add(child.delete());
}