Parallelize stale index deletion #100316

Merged
6 changes: 6 additions & 0 deletions docs/changelog/100316.yaml
@@ -0,0 +1,6 @@
pr: 100316
summary: Parallelize stale index deletion
area: Snapshot/Restore
type: enhancement
issues:
- 61513
@@ -13,22 +13,38 @@
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SimpleBatchedExecutor;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryConflictException;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import java.util.function.ToLongFunction;

import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -295,4 +311,183 @@ public void testRepositoryConflict() throws Exception {
logger.info("--> wait until snapshot deletion is finished");
assertAcked(future.actionGet());
}

public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exception {
Member: This is a cool test and I learnt a few tips and tricks from it. But it does not test the new parallelization change. Do we care?

Contributor Author: This test is largely from the original contributor, but I think it's a reasonable test to write. I've played around with a few ideas for testing the new threading more precisely, but it seems pretty tricky, and it doesn't matter much as long as we do actually do the work somehow. I think I have an idea for a test, though.

Contributor Author: My idea seems to work; see testCleanupStaleBlobsConcurrency, added in b56455c (sorry for the force-push).

Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String snapshot1Name = "test-snap-1";
final String snapshot2Name = "test-snap-2";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
createRepository(repositoryName, "mock", repositoryPath);

logger.info("--> creating index-1 and ingest data");
createIndex("test-idx-1");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-1", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, snapshot1Name);

logger.info("--> creating index-2 and ingest data");
createIndex("test-idx-2");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-2", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, snapshot2Name);

// Make repository throw exceptions when trying to delete stale indices
// This will make sure stale indices stay in repository after snapshot delete
final var repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class)
.repository(repositoryName);
repository.setFailOnDeleteContainer(true);

logger.info("--> delete the second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();

// Make repository work normally
repository.setFailOnDeleteContainer(false);

// This snapshot should delete last snapshot's residual stale indices as well
logger.info("--> delete snapshot one");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();

logger.info("--> check no leftover files");
assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs

logger.info("--> done");
}

public void testCleanupStaleBlobsConcurrency() throws Exception {
// This test is verifying the detailed behaviour of cleanup tasks that are enqueued after a snapshot delete is committed to the
// repository, ensuring that we see exactly the right number of tasks enqueued at each stage to demonstrate that we do use all the
// threads available to us, but don't spam the threadpool queue with all the tasks at once, and that we submit one task that drains
// the queue eagerly to provide backpressure. That means this test is sensitive to changes in the breakdown of the cleanup work
// after a snapshot delete.

final var client = client();
final var repositoryPath = randomRepoPath();
final var repositoryName = "test-repo";
createRepository(repositoryName, "mock", repositoryPath);

final var threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
final var snapshotPoolSize = threadPool.info(ThreadPool.Names.SNAPSHOT).getMax();
final var indexCount = snapshotPoolSize * 3;

for (int i = 0; i < indexCount; i++) {
createIndex("test-idx-" + i);
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-" + i, Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
}

ensureGreen();

final var snapshotName = "test-snap";
createFullSnapshot(repositoryName, snapshotName);

final var executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final var barrier = new CyclicBarrier(snapshotPoolSize + 1);
final var keepBlocking = new AtomicBoolean(true);
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final ToLongFunction<ClusterState> repoGenFn = s -> RepositoriesMetadata.get(s).repository(repositoryName).generation();
final var repositoryGenerationBeforeDelete = repoGenFn.applyAsLong(clusterService.state());
final ClusterStateListener clusterStateListener = event -> {
if (repoGenFn.applyAsLong(event.previousState()) == repositoryGenerationBeforeDelete
&& repoGenFn.applyAsLong(event.state()) > repositoryGenerationBeforeDelete) {
// We are updating the safe repository generation which indicates that the snapshot delete is complete. Once this cluster
// state update completes we will enqueue all the cleanup work on the SNAPSHOT pool. So here we prepare for that by blocking
// all the SNAPSHOT threads:

// All but one of the threads just repeatedly block on the barrier without picking up any new tasks
for (int i = 0; i < snapshotPoolSize - 1; i++) {
executor.execute(() -> {
while (keepBlocking.get()) {
safeAwait(barrier);
safeAwait(barrier);
}
});
}

// The last thread runs a task which blocks on the barrier and then enqueues itself again, at the back of the queue,
// so that this thread will run everything _currently_ in the queue each time the barrier is released, in the order in which
// it was enqueued, and will then block on the barrier again.
new Runnable() {
@Override
public void run() {
executor.execute(() -> {
safeAwait(barrier);
safeAwait(barrier);
if (keepBlocking.get()) {
this.run();
}
});
}
}.run();
}
};
clusterService.addListener(clusterStateListener);

final var deleteFuture = new PlainActionFuture<AcknowledgedResponse>();
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).execute(deleteFuture);

safeAwait(barrier); // wait for all the snapshot threads to be blocked
clusterService.removeListener(clusterStateListener);

// We must wait for all the cleanup work to be enqueued (with the throttled runner at least) so we can be sure of exactly how it
// will execute. The cleanup work is enqueued by the master service thread on completion of the cluster state update which increases
// the root blob generation in the repo metadata, so it is sufficient to wait for another no-op task to run on the master service:
PlainActionFuture.get(fut -> clusterService.createTaskQueue("test", Priority.NORMAL, new SimpleBatchedExecutor<>() {
@Override
public Tuple<ClusterState, Object> executeTask(ClusterStateTaskListener clusterStateTaskListener, ClusterState clusterState) {
return Tuple.tuple(clusterState, null);
}

@Override
public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Object ignored) {
fut.onResponse(null);
}
}).submitTask("test", e -> fail(), null), 10, TimeUnit.SECONDS);

final IntSupplier queueLength = () -> threadPool.stats()
.stats()
.stream()
.filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT))
.findFirst()
.orElseThrow()
.queue();

// There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the
// throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows
// we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this
// cleanup.
assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize + 1));

safeAwait(barrier); // unblock the barrier thread and let it process the queue
safeAwait(barrier); // wait for the queue to be processed

// We first ran all the one-task actions, each of which completes and puts another one-task action into the queue. Then the eager
// runner runs all the remaining tasks.
assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize));

safeAwait(barrier); // unblock the barrier thread and let it process the queue
safeAwait(barrier); // wait for the queue to be processed

// Since the eager runner already ran all the remaining tasks, when the enqueued actions run they add no more work to the queue.
assertThat(queueLength.getAsInt(), equalTo(0));

assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs

keepBlocking.set(false);
safeAwait(barrier); // release the threads so they can exit
assertTrue(deleteFuture.get(10, TimeUnit.SECONDS).isAcknowledged());
}
}
@@ -38,4 +38,12 @@ public DeleteResult add(DeleteResult other) {
public DeleteResult add(long blobs, long bytes) {
return new DeleteResult(blobsDeleted + blobs, bytesDeleted + bytes);
}

public static DeleteResult of(long blobs, long bytes) {
if (blobs == 0 && bytes == 0) {
return ZERO;
} else {
return new DeleteResult(blobs, bytes);
}
}
}
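
For context, the new DeleteResult.of(long, long) factory returns the shared ZERO instance instead of allocating a fresh object for the common "nothing deleted" case. The self-contained mini-class below is an illustrative sketch of the same zero-singleton factory pattern (names are invented for this sketch; it is not the Elasticsearch class):

// Illustrative only: a tiny stand-in mirroring the DeleteResult.of(...) pattern.
record TotalsSketch(long blobs, long bytes) {
    static final TotalsSketch ZERO = new TotalsSketch(0, 0);

    static TotalsSketch of(long blobs, long bytes) {
        // Reuse the shared ZERO instance rather than allocating a new zero-valued object.
        return (blobs == 0 && bytes == 0) ? ZERO : new TotalsSketch(blobs, bytes);
    }

    TotalsSketch add(TotalsSketch other) {
        // Accumulating per-index results stays allocation-free while everything is zero.
        return of(blobs + other.blobs, bytes + other.bytes);
    }
}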
@@ -17,6 +17,7 @@

import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
@@ -155,4 +156,54 @@ int runningTasks() {
return runningTasks.get();
}

/**
* Run a single task on the given executor which eagerly pulls tasks from the queue and executes them. This must only be used if the
* tasks in the queue are all synchronous, i.e. they release their ref before returning from {@code onResponse()}.
*/
public void runSyncTasksEagerly(Executor executor) {
Member: Why not have an overloaded version of runSyncTasksEagerly() that just uses the task runner's own executor for the eager run as well?

DaveCTurner (Contributor Author), Oct 9, 2023: Yes, that could work too. I'm going to wait and see on that idea, though; I'd rather do one or the other, and the choice depends on whether we have other users that want to do something else, or whether everyone just forks another task on AbstractThrottledTaskRunner#executor.

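For illustration only, the suggested overload would look roughly like the sketch below, reusing the field referenced above as AbstractThrottledTaskRunner#executor instead of taking one as a parameter; this is not part of this PR.

    // Hypothetical overload (not in this PR): eagerly drain on the runner's own executor.
    public void runSyncTasksEagerly() {
        runSyncTasksEagerly(executor);
    }
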

executor.execute(new AbstractRunnable() {
@Override
protected void doRun() {
final AtomicBoolean isDone = new AtomicBoolean(true);
final Releasable ref = () -> isDone.set(true);
ActionListener<Releasable> task;
while ((task = tasks.poll()) != null) {
isDone.set(false);
try {
logger.trace("[{}] eagerly running task {}", taskRunnerName, task);
task.onResponse(ref);
} catch (Exception e) {
logger.error(Strings.format("[%s] task %s failed", taskRunnerName, task), e);
assert false : e;
task.onFailure(e);
return;
}
if (isDone.get() == false) {
logger.error(
"runSyncTasksEagerly() was called on a queue [{}] containing an async task: [{}]",
taskRunnerName,
task
);
assert false;
return;
}
}
}

@Override
public void onFailure(Exception e) {
logger.error("unexpected failure in runSyncTasksEagerly", e);
assert false : e;
}

@Override
public void onRejection(Exception e) {
if (e instanceof EsRejectedExecutionException) {
logger.debug("runSyncTasksEagerly was rejected", e);
} else {
onFailure(e);
}
}
});
}
}
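
The contract stated in the Javadoc above (every queued task must release its ref before returning from onResponse()) can be made concrete with a small self-contained sketch. This is plain Java illustrating the same "eager synchronous drain" check; it is not the Elasticsearch class, and all names here are invented for the sketch:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class EagerDrainSketch {
    // Each task receives a "ref" (here just a Runnable to release) and must release it
    // before returning, mirroring the contract documented on runSyncTasksEagerly().
    static void drainSynchronously(Queue<Consumer<Runnable>> tasks) {
        final AtomicBoolean released = new AtomicBoolean();
        final Runnable ref = () -> released.set(true);
        Consumer<Runnable> task;
        while ((task = tasks.poll()) != null) {
            released.set(false);
            task.accept(ref);
            if (released.get() == false) {
                // An async task slipped into a queue that must hold only synchronous tasks.
                throw new AssertionError("queue contained an async task: " + task);
            }
        }
    }

    public static void main(String[] args) {
        Queue<Consumer<Runnable>> queue = new ArrayDeque<>();
        queue.add(ref -> { System.out.println("sync task"); ref.run(); }); // releases before returning: ok
        drainSynchronously(queue);
        queue.add(ref -> System.out.println("forgot to release"));         // never releases the ref
        drainSynchronously(queue);                                          // throws AssertionError
    }
}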