Parallelize stale index deletion (#100316)
Today, after deleting a snapshot, we clean up all the now-dangling indices
sequentially, which can be rather slow. With this commit we parallelize
the work across the whole `SNAPSHOT` pool on the master node.

Closes #61513

Co-authored-by: Piyush Daftary <[email protected]>
DaveCTurner and piyushdaftary authored Oct 9, 2023
1 parent 8ff7dee commit cadcb9b
Showing 7 changed files with 403 additions and 84 deletions.
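
To illustrate the approach described in the commit message, here is a minimal stand-alone Java sketch. It is not the Elasticsearch implementation (which, as the diffs below show, drives the cleanup through a throttled runner on the `SNAPSHOT` pool); every name in it (ParallelStaleIndexCleanupSketch, staleIndexDirs, the pool size of 5) is invented for illustration.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class ParallelStaleIndexCleanupSketch {
    public static void main(String[] args) throws Exception {
        List<Path> staleIndexDirs = List.of();    // hypothetical: directories no surviving snapshot references
        int snapshotPoolSize = 5;                 // stands in for the SNAPSHOT thread pool size
        ExecutorService snapshotPool = Executors.newFixedThreadPool(snapshotPoolSize);
        AtomicLong blobsDeleted = new AtomicLong();
        CountDownLatch done = new CountDownLatch(staleIndexDirs.size());
        for (Path dir : staleIndexDirs) {
            snapshotPool.execute(() -> {          // one deletion task per stale index, run in parallel
                try (var paths = Files.walk(dir)) {
                    paths.sorted(Comparator.reverseOrder())   // delete children before parents
                        .forEach(p -> {
                            try {
                                if (Files.deleteIfExists(p)) {
                                    blobsDeleted.incrementAndGet();
                                }
                            } catch (IOException e) {
                                // best effort: a failed delete leaves the blob for a later cleanup pass
                            }
                        });
                } catch (IOException e) {
                    // best effort: skip this index and move on
                } finally {
                    done.countDown();
                }
            });
        }
        done.await();                             // wait for all deletion tasks to finish
        snapshotPool.shutdown();
        System.out.println("deleted " + blobsDeleted.get() + " blobs");
    }
}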
6 changes: 6 additions & 0 deletions docs/changelog/100316.yaml
@@ -0,0 +1,6 @@
pr: 100316
summary: Parallelize stale index deletion
area: Snapshot/Restore
type: enhancement
issues:
- 61513
@@ -13,22 +13,38 @@
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SimpleBatchedExecutor;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryConflictException;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import java.util.function.ToLongFunction;

import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -295,4 +311,183 @@ public void testRepositoryConflict() throws Exception {
logger.info("--> wait until snapshot deletion is finished");
assertAcked(future.actionGet());
}

public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exception {
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String snapshot1Name = "test-snap-1";
final String snapshot2Name = "test-snap-2";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
createRepository(repositoryName, "mock", repositoryPath);

logger.info("--> creating index-1 and ingest data");
createIndex("test-idx-1");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-1", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, snapshot1Name);

logger.info("--> creating index-2 and ingest data");
createIndex("test-idx-2");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-2", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, snapshot2Name);

// Make the repository throw exceptions when it tries to delete stale indices.
// This ensures the stale indices remain in the repository after the snapshot delete.
final var repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class)
.repository(repositoryName);
repository.setFailOnDeleteContainer(true);

logger.info("--> delete the second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();

// Make the repository work normally again
repository.setFailOnDeleteContainer(false);

// Deleting the first snapshot should also clean up the stale indices left over from the previous delete
logger.info("--> delete snapshot one");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();

logger.info("--> check no leftover files");
assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs

logger.info("--> done");
}

public void testCleanupStaleBlobsConcurrency() throws Exception {
// This test is verifying the detailed behaviour of cleanup tasks that are enqueued after a snapshot delete is committed to the
// repository, ensuring that we see exactly the right number of tasks enqueued at each stage to demonstrate that we do use all the
// threads available to us, but don't spam the threadpool queue with all the tasks at once, and that we submit one task that drains
// the queue eagerly to provide backpressure. That means this test is sensitive to changes in the breakdown of the cleanup work
// after a snapshot delete.

final var client = client();
final var repositoryPath = randomRepoPath();
final var repositoryName = "test-repo";
createRepository(repositoryName, "mock", repositoryPath);

final var threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
final var snapshotPoolSize = threadPool.info(ThreadPool.Names.SNAPSHOT).getMax();
final var indexCount = snapshotPoolSize * 3;

for (int i = 0; i < indexCount; i++) {
createIndex("test-idx-" + i);
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-" + i, Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
}

ensureGreen();

final var snapshotName = "test-snap";
createFullSnapshot(repositoryName, snapshotName);

final var executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final var barrier = new CyclicBarrier(snapshotPoolSize + 1);
final var keepBlocking = new AtomicBoolean(true);
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final ToLongFunction<ClusterState> repoGenFn = s -> RepositoriesMetadata.get(s).repository(repositoryName).generation();
final var repositoryGenerationBeforeDelete = repoGenFn.applyAsLong(clusterService.state());
final ClusterStateListener clusterStateListener = event -> {
if (repoGenFn.applyAsLong(event.previousState()) == repositoryGenerationBeforeDelete
&& repoGenFn.applyAsLong(event.state()) > repositoryGenerationBeforeDelete) {
// We are updating the safe repository generation which indicates that the snapshot delete is complete. Once this cluster
// state update completes we will enqueue all the cleanup work on the SNAPSHOT pool. So here we prepare for that by blocking
// all the SNAPSHOT threads:

// All but one of the threads just repeatedly block on the barrier without picking up any new tasks
for (int i = 0; i < snapshotPoolSize - 1; i++) {
executor.execute(() -> {
while (keepBlocking.get()) {
safeAwait(barrier);
safeAwait(barrier);
}
});
}

// The last thread runs a task which blocks on the barrier and then enqueues itself again, at the back of the queue,
// so that this thread will run everything _currently_ in the queue each time the barrier is released, in the order in which
// it was enqueued, and will then block on the barrier again.
new Runnable() {
@Override
public void run() {
executor.execute(() -> {
safeAwait(barrier);
safeAwait(barrier);
if (keepBlocking.get()) {
this.run();
}
});
}
}.run();
}
};
clusterService.addListener(clusterStateListener);

final var deleteFuture = new PlainActionFuture<AcknowledgedResponse>();
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).execute(deleteFuture);

safeAwait(barrier); // wait for all the snapshot threads to be blocked
clusterService.removeListener(clusterStateListener);

// We must wait for all the cleanup work to be enqueued (with the throttled runner at least) so we can be sure of exactly how it
// will execute. The cleanup work is enqueued by the master service thread on completion of the cluster state update which increases
// the root blob generation in the repo metadata, so it is sufficient to wait for another no-op task to run on the master service:
PlainActionFuture.get(fut -> clusterService.createTaskQueue("test", Priority.NORMAL, new SimpleBatchedExecutor<>() {
@Override
public Tuple<ClusterState, Object> executeTask(ClusterStateTaskListener clusterStateTaskListener, ClusterState clusterState) {
return Tuple.tuple(clusterState, null);
}

@Override
public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Object ignored) {
fut.onResponse(null);
}
}).submitTask("test", e -> fail(), null), 10, TimeUnit.SECONDS);

final IntSupplier queueLength = () -> threadPool.stats()
.stats()
.stream()
.filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT))
.findFirst()
.orElseThrow()
.queue();

// There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the
// throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows
// we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this
// cleanup.
assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize + 1));

safeAwait(barrier); // unblock the barrier thread and let it process the queue
safeAwait(barrier); // wait for the queue to be processed

// We first ran all the one-task actions, each of which completes and puts another one-task action into the queue. Then the eager
// runner runs all the remaining tasks.
assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize));

safeAwait(barrier); // unblock the barrier thread and let it process the queue
safeAwait(barrier); // wait for the queue to be processed

// Since the eager runner already ran all the remaining tasks, when the enqueued actions run they add no more work to the queue.
assertThat(queueLength.getAsInt(), equalTo(0));

assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs

keepBlocking.set(false);
safeAwait(barrier); // release the threads so they can exit
assertTrue(deleteFuture.get(10, TimeUnit.SECONDS).isAcknowledged());
}
}
@@ -38,4 +38,12 @@ public DeleteResult add(DeleteResult other) {
public DeleteResult add(long blobs, long bytes) {
return new DeleteResult(blobsDeleted + blobs, bytesDeleted + bytes);
}

public static DeleteResult of(long blobs, long bytes) {
if (blobs == 0 && bytes == 0) {
return ZERO;
} else {
return new DeleteResult(blobs, bytes);
}
}
}
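
A hedged usage sketch of the new of() factory together with the accumulators shown above; it assumes the ZERO constant referenced in this file is accessible from the call site, and the numeric values are invented:

// Hedged usage sketch: accumulate per-index deletion totals.
DeleteResult total = DeleteResult.ZERO;
total = total.add(DeleteResult.of(0, 0));        // of() hands back ZERO here, avoiding an allocation
total = total.add(DeleteResult.of(12, 34_567));  // 12 blobs / 34,567 bytes removed for one stale index
total = total.add(5, 1_024);                     // another index, via the add(long, long) overload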
@@ -17,6 +17,7 @@

import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
@@ -155,4 +156,54 @@ int runningTasks() {
return runningTasks.get();
}

/**
* Run a single task on the given executor which eagerly pulls tasks from the queue and executes them. This must only be used if the
* tasks in the queue are all synchronous, i.e. they release their ref before returning from {@code onResponse()}.
*/
public void runSyncTasksEagerly(Executor executor) {
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() {
final AtomicBoolean isDone = new AtomicBoolean(true);
final Releasable ref = () -> isDone.set(true);
ActionListener<Releasable> task;
while ((task = tasks.poll()) != null) {
isDone.set(false);
try {
logger.trace("[{}] eagerly running task {}", taskRunnerName, task);
task.onResponse(ref);
} catch (Exception e) {
logger.error(Strings.format("[%s] task %s failed", taskRunnerName, task), e);
assert false : e;
task.onFailure(e);
return;
}
if (isDone.get() == false) {
logger.error(
"runSyncTasksEagerly() was called on a queue [{}] containing an async task: [{}]",
taskRunnerName,
task
);
assert false;
return;
}
}
}

@Override
public void onFailure(Exception e) {
logger.error("unexpected failure in runSyncTasksEagerly", e);
assert false : e;
}

@Override
public void onRejection(Exception e) {
if (e instanceof EsRejectedExecutionException) {
logger.debug("runSyncTasksEagerly was rejected", e);
} else {
onFailure(e);
}
}
});
}
}
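
A hedged usage sketch of runSyncTasksEagerly. It assumes the enclosing class is ThrottledTaskRunner with its existing enqueueTask(ActionListener<Releasable>) entry point and a (name, maxRunningTasks, executor) constructor; deleteStaleIndex, staleIndexIds, snapshotExecutor and maxConcurrentTasks are hypothetical names, so treat this as an illustrative fragment rather than the project's actual call sites.

// Hedged sketch: enqueue synchronous cleanup tasks, then submit a single task that
// eagerly drains whatever is still queued, providing backpressure on the shared executor.
void cleanUpStaleIndices(List<String> staleIndexIds, Executor snapshotExecutor, int maxConcurrentTasks) {
    ThrottledTaskRunner runner = new ThrottledTaskRunner("stale-index-cleanup", maxConcurrentTasks, snapshotExecutor);
    for (String indexId : staleIndexIds) {
        runner.enqueueTask(new ActionListener<>() {
            @Override
            public void onResponse(Releasable releasable) {
                try (releasable) {
                    deleteStaleIndex(indexId);   // hypothetical synchronous helper
                }                                // ref released before returning, as the Javadoc requires
            }

            @Override
            public void onFailure(Exception e) {
                logger.warn("failed to delete stale index [" + indexId + "]", e);
            }
        });
    }
    runner.runSyncTasksEagerly(snapshotExecutor);   // one extra task that empties the remaining queue
}

Because every enqueued task releases its ref before onResponse returns, the final runSyncTasksEagerly call can safely run the remaining tasks back to back on a single borrowed thread.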