From cadcb9b5fded6b836b6b0b5690df5fab91524167 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 9 Oct 2023 14:07:19 +0100 Subject: [PATCH] Parallelize stale index deletion (#100316) After deleting a snapshot today we clean up all the now-dangling indices sequentially, which can be rather slow. With this commit we parallelize the work across the whole `SNAPSHOT` pool on the master node. Closes #61513 Co-authored-by: Piyush Daftary --- docs/changelog/100316.yaml | 6 + .../snapshots/RepositoriesIT.java | 195 ++++++++++++++++++ .../common/blobstore/DeleteResult.java | 8 + .../AbstractThrottledTaskRunner.java | 51 +++++ .../blobstore/BlobStoreRepository.java | 165 ++++++++------- .../AbstractThrottledTaskRunnerTests.java | 50 +++++ .../snapshots/mockstore/MockRepository.java | 12 ++ 7 files changed, 403 insertions(+), 84 deletions(-) create mode 100644 docs/changelog/100316.yaml diff --git a/docs/changelog/100316.yaml b/docs/changelog/100316.yaml new file mode 100644 index 0000000000000..9efb64a332dc1 --- /dev/null +++ b/docs/changelog/100316.yaml @@ -0,0 +1,6 @@ +pr: 100316 +summary: Parallelize stale index deletion +area: Snapshot/Restore +type: enhancement +issues: + - 61513 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java index ffa45e3136b51..5f4c270f69348 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -13,22 +13,38 @@ import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.PlainActionFuture; import 
org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.SimpleBatchedExecutor; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryConflictException; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntSupplier; +import java.util.function.ToLongFunction; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -295,4 +311,183 @@ public void testRepositoryConflict() throws Exception { logger.info("--> wait until snapshot deletion is finished"); assertAcked(future.actionGet()); } + + public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exception { + Client client = client(); + 
Path repositoryPath = randomRepoPath(); + final String repositoryName = "test-repo"; + final String snapshot1Name = "test-snap-1"; + final String snapshot2Name = "test-snap-2"; + + logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath()); + createRepository(repositoryName, "mock", repositoryPath); + + logger.info("--> creating index-1 and ingest data"); + createIndex("test-idx-1"); + ensureGreen(); + for (int j = 0; j < 10; j++) { + indexDoc("test-idx-1", Integer.toString(10 + j), "foo", "bar" + 10 + j); + } + refresh(); + + logger.info("--> creating first snapshot"); + createFullSnapshot(repositoryName, snapshot1Name); + + logger.info("--> creating index-2 and ingest data"); + createIndex("test-idx-2"); + ensureGreen(); + for (int j = 0; j < 10; j++) { + indexDoc("test-idx-2", Integer.toString(10 + j), "foo", "bar" + 10 + j); + } + refresh(); + + logger.info("--> creating second snapshot"); + createFullSnapshot(repositoryName, snapshot2Name); + + // Make repository throw exceptions when trying to delete stale indices + // This will make sure stale indices stay in repository after snapshot delete + final var repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class) + .repository(repositoryName); + repository.setFailOnDeleteContainer(true); + + logger.info("--> delete the second snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get(); + + // Make repository work normally + repository.setFailOnDeleteContainer(false); + + // This snapshot should delete last snapshot's residual stale indices as well + logger.info("--> delete snapshot one"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get(); + + logger.info("--> check no leftover files"); + assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs + + logger.info("--> done"); + } + + public void testCleanupStaleBlobsConcurrency() throws Exception { + // 
This test is verifying the detailed behaviour of cleanup tasks that are enqueued after a snapshot delete is committed to the + // repository, ensuring that we see exactly the right number of tasks enqueued at each stage to demonstrate that we do use all the + // threads available to us, but don't spam the threadpool queue with all the tasks at once, and that we submit one task that drains + // the queue eagerly to provide backpressure. That means this test is sensitive to changes in the breakdown of the cleanup work + // after a snapshot delete. + + final var client = client(); + final var repositoryPath = randomRepoPath(); + final var repositoryName = "test-repo"; + createRepository(repositoryName, "mock", repositoryPath); + + final var threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class); + final var snapshotPoolSize = threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(); + final var indexCount = snapshotPoolSize * 3; + + for (int i = 0; i < indexCount; i++) { + createIndex("test-idx-" + i); + for (int j = 0; j < 10; j++) { + indexDoc("test-idx-" + i, Integer.toString(10 + j), "foo", "bar" + 10 + j); + } + } + + ensureGreen(); + + final var snapshotName = "test-snap"; + createFullSnapshot(repositoryName, snapshotName); + + final var executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final var barrier = new CyclicBarrier(snapshotPoolSize + 1); + final var keepBlocking = new AtomicBoolean(true); + final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final ToLongFunction repoGenFn = s -> RepositoriesMetadata.get(s).repository(repositoryName).generation(); + final var repositoryGenerationBeforeDelete = repoGenFn.applyAsLong(clusterService.state()); + final ClusterStateListener clusterStateListener = event -> { + if (repoGenFn.applyAsLong(event.previousState()) == repositoryGenerationBeforeDelete + && repoGenFn.applyAsLong(event.state()) > repositoryGenerationBeforeDelete) { + // We are 
updating the safe repository generation which indicates that the snapshot delete is complete. Once this cluster + // state update completes we will enqueue all the cleanup work on the SNAPSHOT pool. So here we prepare for that by blocking + // all the SNAPSHOT threads: + + // All but one of the threads just repeatedly block on the barrier without picking up any new tasks + for (int i = 0; i < snapshotPoolSize - 1; i++) { + executor.execute(() -> { + while (keepBlocking.get()) { + safeAwait(barrier); + safeAwait(barrier); + } + }); + } + + // The last thread runs a task which blocks on the barrier and then enqueues itself again, at the back of the queue, + // so that this thread will run everything _currently_ in the queue each time the barrier is released, in the order in which + // it was enqueued, and will then block on the barrier again. + new Runnable() { + @Override + public void run() { + executor.execute(() -> { + safeAwait(barrier); + safeAwait(barrier); + if (keepBlocking.get()) { + this.run(); + } + }); + } + }.run(); + } + }; + clusterService.addListener(clusterStateListener); + + final var deleteFuture = new PlainActionFuture(); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).execute(deleteFuture); + + safeAwait(barrier); // wait for all the snapshot threads to be blocked + clusterService.removeListener(clusterStateListener); + + // We must wait for all the cleanup work to be enqueued (with the throttled runner at least) so we can be sure of exactly how it + // will execute. 
The cleanup work is enqueued by the master service thread on completion of the cluster state update which increases + // the root blob generation in the repo metadata, so it is sufficient to wait for another no-op task to run on the master service: + PlainActionFuture.get(fut -> clusterService.createTaskQueue("test", Priority.NORMAL, new SimpleBatchedExecutor<>() { + @Override + public Tuple executeTask(ClusterStateTaskListener clusterStateTaskListener, ClusterState clusterState) { + return Tuple.tuple(clusterState, null); + } + + @Override + public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Object ignored) { + fut.onResponse(null); + } + }).submitTask("test", e -> fail(), null), 10, TimeUnit.SECONDS); + + final IntSupplier queueLength = () -> threadPool.stats() + .stats() + .stream() + .filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT)) + .findFirst() + .orElseThrow() + .queue(); + + // There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the + // throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows + // we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this + // cleanup. + assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize + 1)); + + safeAwait(barrier); // unblock the barrier thread and let it process the queue + safeAwait(barrier); // wait for the queue to be processed + + // We first ran all the one-task actions, each of which completes and puts another one-task action into the queue. Then the eager + // runner runs all the remaining tasks. 
+ assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize)); + + safeAwait(barrier); // unblock the barrier thread and let it process the queue + safeAwait(barrier); // wait for the queue to be processed + + // Since the eager runner already ran all the remaining tasks, when the enqueued actions run they add no more work to the queue. + assertThat(queueLength.getAsInt(), equalTo(0)); + + assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs + + keepBlocking.set(false); + safeAwait(barrier); // release the threads so they can exit + assertTrue(deleteFuture.get(10, TimeUnit.SECONDS).isAcknowledged()); + } } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java b/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java index c12685db77696..8630ed4cc768f 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java @@ -38,4 +38,12 @@ public DeleteResult add(DeleteResult other) { public DeleteResult add(long blobs, long bytes) { return new DeleteResult(blobsDeleted + blobs, bytesDeleted + bytes); } + + public static DeleteResult of(long blobs, long bytes) { + if (blobs == 0 && bytes == 0) { + return ZERO; + } else { + return new DeleteResult(blobs, bytes); + } + } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java index ea37dad5ba218..898f73abbd1e9 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java @@ -17,6 +17,7 @@ import java.util.Queue; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** 
@@ -155,4 +156,54 @@ int runningTasks() { return runningTasks.get(); } + /** + * Run a single task on the given executor which eagerly pulls tasks from the queue and executes them. This must only be used if the + * tasks in the queue are all synchronous, i.e. they release their ref before returning from {@code onResponse()}. + */ + public void runSyncTasksEagerly(Executor executor) { + executor.execute(new AbstractRunnable() { + @Override + protected void doRun() { + final AtomicBoolean isDone = new AtomicBoolean(true); + final Releasable ref = () -> isDone.set(true); + ActionListener task; + while ((task = tasks.poll()) != null) { + isDone.set(false); + try { + logger.trace("[{}] eagerly running task {}", taskRunnerName, task); + task.onResponse(ref); + } catch (Exception e) { + logger.error(Strings.format("[%s] task %s failed", taskRunnerName, task), e); + assert false : e; + task.onFailure(e); + return; + } + if (isDone.get() == false) { + logger.error( + "runSyncTasksEagerly() was called on a queue [{}] containing an async task: [{}]", + taskRunnerName, + task + ); + assert false; + return; + } + } + } + + @Override + public void onFailure(Exception e) { + logger.error("unexpected failure in runSyncTasksEagerly", e); + assert false : e; + } + + @Override + public void onRejection(Exception e) { + if (e instanceof EsRejectedExecutionException) { + logger.debug("runSyncTasksEagerly was rejected", e); + } else { + onFailure(e); + } + } + }); + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 0825337143f8e..82a38d74d25e1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -72,6 +72,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import 
org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.core.CheckedConsumer; @@ -390,6 +391,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final ShardSnapshotTaskRunner shardSnapshotTaskRunner; + private final ThrottledTaskRunner staleBlobDeleteRunner; + /** * Constructs new BlobStoreRepository * @param metadata The metadata for this repository including name and settings @@ -430,6 +433,11 @@ protected BlobStoreRepository( this::doSnapshotShard, this::snapshotFile ); + staleBlobDeleteRunner = new ThrottledTaskRunner( + "cleanupStaleBlobs", + threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), + threadPool.executor(ThreadPool.Names.SNAPSHOT) + ); } @Override @@ -1150,31 +1158,65 @@ private void cleanupStaleBlobs( RepositoryData newRepoData, ActionListener listener ) { - final GroupedActionListener groupedListener = new GroupedActionListener<>(2, ActionListener.wrap(deleteResults -> { - DeleteResult deleteResult = DeleteResult.ZERO; - for (DeleteResult result : deleteResults) { - deleteResult = deleteResult.add(result); + final var blobsDeleted = new AtomicLong(); + final var bytesDeleted = new AtomicLong(); + try (var listeners = new RefCountingListener(listener.map(ignored -> DeleteResult.of(blobsDeleted.get(), bytesDeleted.get())))) { + + final List staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet()); + if (staleRootBlobs.isEmpty() == false) { + staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> { + try (ref) { + logStaleRootLevelBlobs(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs); + deleteFromContainer(blobContainer(), staleRootBlobs.iterator()); + for (final var staleRootBlob : staleRootBlobs) { 
+ bytesDeleted.addAndGet(rootBlobs.get(staleRootBlob).length()); + } + blobsDeleted.addAndGet(staleRootBlobs.size()); + } catch (Exception e) { + logger.warn( + () -> format( + "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them", + metadata.name(), + staleRootBlobs + ), + e + ); + } + })); } - listener.onResponse(deleteResult); - }, listener::onFailure)); - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final List staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet()); - if (staleRootBlobs.isEmpty()) { - groupedListener.onResponse(DeleteResult.ZERO); - } else { - executor.execute(ActionRunnable.supply(groupedListener, () -> { - List deletedBlobs = cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs); - return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum()); - })); + final var survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet()); + for (final var indexEntry : foundIndices.entrySet()) { + final var indexSnId = indexEntry.getKey(); + if (survivingIndexIds.contains(indexSnId)) { + continue; + } + staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> { + try (ref) { + logger.debug("[{}] Found stale index [{}]. 
Cleaning it up", metadata.name(), indexSnId); + final var deleteResult = indexEntry.getValue().delete(OperationPurpose.SNAPSHOT); + blobsDeleted.addAndGet(deleteResult.blobsDeleted()); + bytesDeleted.addAndGet(deleteResult.bytesDeleted()); + logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); + } catch (IOException e) { + logger.warn(() -> format(""" + [%s] index %s is no longer part of any snapshot in the repository, \ + but failed to clean up its index folder""", metadata.name(), indexSnId), e); + } + })); + } } - final Set survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet()); - if (foundIndices.keySet().equals(survivingIndexIds)) { - groupedListener.onResponse(DeleteResult.ZERO); - } else { - executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds))); - } + // If we did the cleanup of stale indices purely using a throttled executor then there would be no backpressure to prevent us from + // falling arbitrarily far behind. But nor do we want to dedicate all the SNAPSHOT threads to stale index cleanups because that + // would slow down other snapshot operations in situations that do not need backpressure. + // + // The solution is to dedicate one SNAPSHOT thread to doing the cleanups eagerly, alongside the throttled executor which spreads + // the rest of the work across the other threads if they are free. If the eager cleanup loop doesn't finish before the next one + // starts then we dedicate another SNAPSHOT thread to the deletions, and so on, until eventually either we catch up or the SNAPSHOT + // pool is fully occupied with blob deletions, which pushes back on other snapshot operations. + + staleBlobDeleteRunner.runSyncTasksEagerly(threadPool.executor(ThreadPool.Names.SNAPSHOT)); } /** @@ -1183,8 +1225,8 @@ private void cleanupStaleBlobs( * TODO: Add shard level cleanups * TODO: Add unreferenced index metadata cleanup *
<ul>
- * <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
- * <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
+ * <li>Deleting stale indices</li>
+ * <li>Deleting unreferenced root level blobs</li>
 * </ul>
* @param repositoryStateId Current repository state id * @param repositoryMetaVersion version of the updated repository metadata to write @@ -1262,70 +1304,25 @@ private static List staleRootBlobs(RepositoryData repositoryData, Set cleanupStaleRootFiles( - long previousGeneration, - Collection deletedSnapshots, - List blobsToDelete - ) { - if (blobsToDelete.isEmpty()) { - return blobsToDelete; - } - try { - if (logger.isInfoEnabled()) { - // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata - // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot - // delete would also log a confusing INFO message about "stale blobs". - final Set blobNamesToIgnore = deletedSnapshots.stream() - .flatMap( - snapshotId -> Stream.of( - GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()), - SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()), - INDEX_FILE_PREFIX + previousGeneration - ) + private void logStaleRootLevelBlobs(long previousGeneration, Collection deletedSnapshots, List blobsToDelete) { + if (logger.isInfoEnabled()) { + // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata + // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot + // delete would also log a confusing INFO message about "stale blobs". + final Set blobNamesToIgnore = deletedSnapshots.stream() + .flatMap( + snapshotId -> Stream.of( + GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()), + SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()), + INDEX_FILE_PREFIX + previousGeneration ) - .collect(Collectors.toSet()); - final List blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList(); - if (blobsToLog.isEmpty() == false) { - logger.info("[{}] Found stale root level blobs {}. 
Cleaning them up", metadata.name(), blobsToLog); - } - } - deleteFromContainer(blobContainer(), blobsToDelete.iterator()); - return blobsToDelete; - } catch (Exception e) { - logger.warn( - () -> format( - "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them", - metadata.name(), - blobsToDelete - ), - e - ); - } - return Collections.emptyList(); - } - - private DeleteResult cleanupStaleIndices(Map foundIndices, Set survivingIndexIds) { - DeleteResult deleteResult = DeleteResult.ZERO; - for (Map.Entry indexEntry : foundIndices.entrySet()) { - final String indexSnId = indexEntry.getKey(); - try { - if (survivingIndexIds.contains(indexSnId) == false) { - logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); - deleteResult = deleteResult.add(indexEntry.getValue().delete(OperationPurpose.SNAPSHOT)); - logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); - } - } catch (Exception e) { - logger.warn( - () -> format( - "[%s] index %s is no longer part of any snapshot in the repository, " + "but failed to clean up its index folder", - metadata.name(), - indexSnId - ), - e - ); + ) + .collect(Collectors.toSet()); + final List blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList(); + if (blobsToLog.isEmpty() == false) { + logger.info("[{}] Found stale root level blobs {}. 
Cleaning them up", metadata.name(), blobsToLog); } } - return deleteResult; } @Override diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java index 7298512603b7a..b7524b0ad215e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java @@ -21,6 +21,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -141,6 +142,55 @@ public void onResponse(Releasable releasable) { assertNoRunningTasks(taskRunner); } + public void testRunSyncTasksEagerly() { + final int maxTasks = randomIntBetween(1, maxThreads); + final int taskCount = between(maxTasks, maxTasks * 2); + final var barrier = new CyclicBarrier(maxTasks + 1); + final var executedCountDown = new CountDownLatch(taskCount); + final var testThread = Thread.currentThread(); + + class TestTask implements ActionListener { + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + public void onResponse(Releasable releasable) { + try (releasable) { + if (Thread.currentThread() != testThread) { + safeAwait(barrier); + safeAwait(barrier); + } + } finally { + executedCountDown.countDown(); + } + } + } + + final BlockingQueue queue = ConcurrentCollections.newBlockingQueue(); + final AbstractThrottledTaskRunner taskRunner = new AbstractThrottledTaskRunner<>("test", maxTasks, executor, queue); + for (int i = 0; i < taskCount; i++) { + taskRunner.enqueueTask(new TestTask()); + } + + safeAwait(barrier); + assertThat(taskRunner.runningTasks(), 
equalTo(maxTasks)); // maxTasks tasks are running now + assertEquals(taskCount - maxTasks, queue.size()); // the remainder are enqueued + + final var capturedTask = new AtomicReference(); + taskRunner.runSyncTasksEagerly(t -> assertTrue(capturedTask.compareAndSet(null, t))); + assertEquals(taskCount - maxTasks, queue.size()); // hasn't run any tasks yet + capturedTask.get().run(); + assertTrue(queue.isEmpty()); + + safeAwait(barrier); + safeAwait(executedCountDown); + assertTrue(queue.isEmpty()); + assertNoRunningTasks(taskRunner); + } + public void testFailsTasksOnRejectionOrShutdown() throws Exception { final var executor = randomBoolean() ? EsExecutors.newScaling("test", maxThreads, maxThreads, 0, TimeUnit.MILLISECONDS, true, threadFactory, threadContext) diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index d5b59ef3274ea..1e4c328e9b1ac 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -170,6 +170,8 @@ public long getFailureCount() { private volatile boolean blocked = false; + private volatile boolean failOnDeleteContainer = false; + public MockRepository( RepositoryMetadata metadata, Environment environment, @@ -352,6 +354,13 @@ public void setBlockOnceOnReadSnapshotInfoIfAlreadyBlocked() { blockOnceOnReadSnapshotInfo.set(true); } + /** + * Sets the fail-on-delete-container flag, which if {@code true} throws an exception when deleting a {@link BlobContainer}. 
+ */ + public void setFailOnDeleteContainer(boolean failOnDeleteContainer) { + this.failOnDeleteContainer = failOnDeleteContainer; + } + public boolean blocked() { return blocked; } @@ -550,6 +559,9 @@ public InputStream readBlob(OperationPurpose purpose, String name, long position @Override public DeleteResult delete(OperationPurpose purpose) throws IOException { + if (failOnDeleteContainer) { + throw new IOException("simulated delete-container failure"); + } DeleteResult deleteResult = DeleteResult.ZERO; for (BlobContainer child : children(purpose).values()) { deleteResult = deleteResult.add(child.delete(purpose));