From 67867cec2273fc193df525c7023b5c75f823ec77 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 05:11:34 +0100 Subject: [PATCH 01/13] Parallelize stale index deletion After deleting a snapshot today we clean up all the now-dangling indices sequentially, which can be rather slow. With this commit we parallelize the work across the whole `SNAPSHOT` pool on the master node. Closes #61513 Co-authored-by: Piyush Daftary --- .../snapshots/RepositoriesIT.java | 56 +++++++ .../common/blobstore/DeleteResult.java | 8 + .../blobstore/BlobStoreRepository.java | 156 ++++++++---------- .../snapshots/mockstore/MockRepository.java | 12 ++ 4 files changed, 147 insertions(+), 85 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java index ffa45e3136b51..3facfa6319766 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -21,10 +21,12 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryConflictException; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; import java.nio.file.Path; @@ -295,4 +297,58 @@ public void testRepositoryConflict() throws Exception { logger.info("--> wait until snapshot deletion is finished"); assertAcked(future.actionGet()); } + + public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exception { + Client client = client(); + Path repositoryPath = randomRepoPath(); + final String repositoryName = "test-repo"; + final String snapshot1Name = "test-snap-1"; + final String snapshot2Name = "test-snap-2"; + + logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath()); + createRepository(repositoryName, "mock", repositoryPath); + + logger.info("--> creating index-1 and ingest data"); + createIndex("test-idx-1"); + ensureGreen(); + for (int j = 0; j < 10; j++) { + indexDoc("test-idx-1", Integer.toString(10 + j), "foo", "bar" + 10 + j); + } + refresh(); + + logger.info("--> creating first snapshot"); + createFullSnapshot(repositoryName, snapshot1Name); + + logger.info("--> creating index-2 and ingest data"); + createIndex("test-idx-2"); + ensureGreen(); + for (int j = 0; j < 10; j++) { + indexDoc("test-idx-2", Integer.toString(10 + j), "foo", "bar" + 10 + j); + } + refresh(); + + logger.info("--> creating second snapshot"); + createFullSnapshot(repositoryName, snapshot2Name); + + // Make repository throw exceptions when trying to delete stale indices + // This will make sure stale indices stay in repository after snapshot delete + final var repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class) + .repository(repositoryName); + repository.setFailOnDelete(true); + + logger.info("--> delete the second snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get(); + + // Make repository work normally + repository.setFailOnDelete(false); + + // This snapshot 
should delete last snapshot's residual stale indices as well + logger.info("--> delete snapshot one"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get(); + + logger.info("--> check no leftover files"); + assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs + + logger.info("--> done"); + } } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java b/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java index c12685db77696..8630ed4cc768f 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java @@ -38,4 +38,12 @@ public DeleteResult add(DeleteResult other) { public DeleteResult add(long blobs, long bytes) { return new DeleteResult(blobsDeleted + blobs, bytesDeleted + bytes); } + + public static DeleteResult of(long blobs, long bytes) { + if (blobs == 0 && bytes == 0) { + return ZERO; + } else { + return new DeleteResult(blobs, bytes); + } + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 33e682354c9cc..fa699a76fadd7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -72,6 +72,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.core.CheckedConsumer; @@ -390,6 +391,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final ShardSnapshotTaskRunner shardSnapshotTaskRunner; + private final ThrottledTaskRunner staleBlobDeleteRunner; + /** * Constructs new BlobStoreRepository * @param metadata The metadata for this repository including name and settings @@ -430,6 +433,11 @@ protected BlobStoreRepository( this::doSnapshotShard, this::snapshotFile ); + staleBlobDeleteRunner = new ThrottledTaskRunner( + "cleanupStaleBlobs", + threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), + threadPool.executor(ThreadPool.Names.SNAPSHOT) + ); } @Override @@ -1138,30 +1146,53 @@ private void cleanupStaleBlobs( RepositoryData newRepoData, ActionListener listener ) { - final GroupedActionListener groupedListener = new GroupedActionListener<>(2, ActionListener.wrap(deleteResults -> { - DeleteResult deleteResult = DeleteResult.ZERO; - for (DeleteResult result : deleteResults) { - deleteResult = deleteResult.add(result); + final var blobsDeleted = new AtomicLong(); + final var bytesDeleted = new AtomicLong(); + try (var listeners = new RefCountingListener(listener.map(ignored -> DeleteResult.of(blobsDeleted.get(), bytesDeleted.get())))) { + + final List staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet()); + if (staleRootBlobs.isEmpty() == false) { + staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> { + try (ref) { + logStaleRootLevelBlobs(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs); + deleteFromContainer(blobContainer(), staleRootBlobs.iterator()); + for (final var 
staleRootBlob : staleRootBlobs) { + bytesDeleted.addAndGet(rootBlobs.get(staleRootBlob).length()); + } + blobsDeleted.addAndGet(staleRootBlobs.size()); + } catch (Exception e) { + logger.warn( + () -> format( + "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them", + metadata.name(), + staleRootBlobs + ), + e + ); + } + })); } - listener.onResponse(deleteResult); - }, listener::onFailure)); - - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final List staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet()); - if (staleRootBlobs.isEmpty()) { - groupedListener.onResponse(DeleteResult.ZERO); - } else { - executor.execute(ActionRunnable.supply(groupedListener, () -> { - List deletedBlobs = cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs); - return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum()); - })); - } - final Set survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet()); - if (foundIndices.keySet().equals(survivingIndexIds)) { - groupedListener.onResponse(DeleteResult.ZERO); - } else { - executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds))); + final var survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet()); + for (final var indexEntry : foundIndices.entrySet()) { + final var indexSnId = indexEntry.getKey(); + if (survivingIndexIds.contains(indexSnId)) { + continue; + } + staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> { + try (ref) { + logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); + final var deleteResult = indexEntry.getValue().delete(OperationPurpose.SNAPSHOT); + blobsDeleted.addAndGet(deleteResult.blobsDeleted()); + bytesDeleted.addAndGet(deleteResult.bytesDeleted()); + logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); + } catch (IOException e) { + logger.warn(() -> format(""" + [%s] index %s is no longer part of any snapshot in the repository, \ + but failed to clean up its index folder""", metadata.name(), indexSnId), e); + } + })); + } } } @@ -1171,8 +1202,8 @@ private void cleanupStaleBlobs( * TODO: Add shard level cleanups * TODO: Add unreferenced index metadata cleanup *
      * <ul>
-     *     <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
-     *     <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
+     *     <li>Deleting stale indices</li>
+     *     <li>Deleting unreferenced root level blobs</li>
      * </ul>
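+     * <p>As of this change, the cleanup work listed above is spread in parallel across the {@code SNAPSHOT} pool rather than
+     * running sequentially.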
* @param repositoryStateId Current repository state id * @param repositoryMetaVersion version of the updated repository metadata to write @@ -1250,70 +1281,25 @@ private static List staleRootBlobs(RepositoryData repositoryData, Set cleanupStaleRootFiles( - long previousGeneration, - Collection deletedSnapshots, - List blobsToDelete - ) { - if (blobsToDelete.isEmpty()) { - return blobsToDelete; - } - try { - if (logger.isInfoEnabled()) { - // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata - // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot - // delete would also log a confusing INFO message about "stale blobs". - final Set blobNamesToIgnore = deletedSnapshots.stream() - .flatMap( - snapshotId -> Stream.of( - GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()), - SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()), - INDEX_FILE_PREFIX + previousGeneration - ) + private void logStaleRootLevelBlobs(long previousGeneration, Collection deletedSnapshots, List blobsToDelete) { + if (logger.isInfoEnabled()) { + // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata + // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot + // delete would also log a confusing INFO message about "stale blobs". + final Set blobNamesToIgnore = deletedSnapshots.stream() + .flatMap( + snapshotId -> Stream.of( + GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()), + SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()), + INDEX_FILE_PREFIX + previousGeneration ) - .collect(Collectors.toSet()); - final List blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList(); - if (blobsToLog.isEmpty() == false) { - logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog); - } - } - deleteFromContainer(blobContainer(), blobsToDelete.iterator()); - return blobsToDelete; - } catch (Exception e) { - logger.warn( - () -> format( - "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them", - metadata.name(), - blobsToDelete - ), - e - ); - } - return Collections.emptyList(); - } - - private DeleteResult cleanupStaleIndices(Map foundIndices, Set survivingIndexIds) { - DeleteResult deleteResult = DeleteResult.ZERO; - for (Map.Entry indexEntry : foundIndices.entrySet()) { - final String indexSnId = indexEntry.getKey(); - try { - if (survivingIndexIds.contains(indexSnId) == false) { - logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); - deleteResult = deleteResult.add(indexEntry.getValue().delete(OperationPurpose.SNAPSHOT)); - logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); - } - } catch (Exception e) { - logger.warn( - () -> format( - "[%s] index %s is no longer part of any snapshot in the repository, " + "but failed to clean up its index folder", - metadata.name(), - indexSnId - ), - e - ); + ) + .collect(Collectors.toSet()); + final List blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList(); + if (blobsToLog.isEmpty() == false) { + logger.info("[{}] Found stale root level blobs {}. 
Cleaning them up", metadata.name(), blobsToLog); } } - return deleteResult; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index d5b59ef3274ea..182526ea0a176 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -170,6 +170,8 @@ public long getFailureCount() { private volatile boolean blocked = false; + private volatile boolean failOnDelete = false; + public MockRepository( RepositoryMetadata metadata, Environment environment, @@ -352,6 +354,13 @@ public void setBlockOnceOnReadSnapshotInfoIfAlreadyBlocked() { blockOnceOnReadSnapshotInfo.set(true); } + /** + * Sets the fail-on-delete flag, which if {@code true} throws an exception when deleting a blob. + */ + public void setFailOnDelete(boolean failOnDelete) { + this.failOnDelete = failOnDelete; + } + public boolean blocked() { return blocked; } @@ -550,6 +559,9 @@ public InputStream readBlob(OperationPurpose purpose, String name, long position @Override public DeleteResult delete(OperationPurpose purpose) throws IOException { + if (failOnDelete) { + throw new IOException("simulated delete failure"); + } DeleteResult deleteResult = DeleteResult.ZERO; for (BlobContainer child : children(purpose).values()) { deleteResult = deleteResult.add(child.delete(purpose)); From e13f1fb10d9de86d2cc8f617de912b5a38ec36a4 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 06:05:14 +0100 Subject: [PATCH 02/13] Update docs/changelog/100316.yaml --- docs/changelog/100316.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/100316.yaml diff --git a/docs/changelog/100316.yaml b/docs/changelog/100316.yaml new file mode 100644 index 0000000000000..9efb64a332dc1 --- /dev/null +++ b/docs/changelog/100316.yaml @@ -0,0 +1,6 @@ +pr: 100316 +summary: Parallelize stale index deletion +area: Snapshot/Restore +type: enhancement +issues: + - 61513 From 462af510940aa5f985c4cc2871250d9186f703c0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 06:50:28 +0100 Subject: [PATCH 03/13] Rename --- .../org/elasticsearch/snapshots/RepositoriesIT.java | 4 ++-- .../snapshots/mockstore/MockRepository.java | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java index 3facfa6319766..13c30d6264ef2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -334,13 +334,13 @@ public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exceptio // This will make sure stale indices stay in repository after snapshot delete final var repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class) .repository(repositoryName); - repository.setFailOnDelete(true); + repository.setFailOnDeleteContainer(true); logger.info("--> delete the second snapshot"); client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get(); // Make repository work normally - repository.setFailOnDelete(false); + repository.setFailOnDeleteContainer(false); // This snapshot should 
delete last snapshot's residual stale indices as well logger.info("--> delete snapshot one"); diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 182526ea0a176..1e4c328e9b1ac 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -170,7 +170,7 @@ public long getFailureCount() { private volatile boolean blocked = false; - private volatile boolean failOnDelete = false; + private volatile boolean failOnDeleteContainer = false; public MockRepository( RepositoryMetadata metadata, @@ -355,10 +355,10 @@ public void setBlockOnceOnReadSnapshotInfoIfAlreadyBlocked() { } /** - * Sets the fail-on-delete flag, which if {@code true} throws an exception when deleting a blob. + * Sets the fail-on-delete-container flag, which if {@code true} throws an exception when deleting a {@link BlobContainer}. */ - public void setFailOnDelete(boolean failOnDelete) { - this.failOnDelete = failOnDelete; + public void setFailOnDeleteContainer(boolean failOnDeleteContainer) { + this.failOnDeleteContainer = failOnDeleteContainer; } public boolean blocked() { @@ -559,8 +559,8 @@ public InputStream readBlob(OperationPurpose purpose, String name, long position @Override public DeleteResult delete(OperationPurpose purpose) throws IOException { - if (failOnDelete) { - throw new IOException("simulated delete failure"); + if (failOnDeleteContainer) { + throw new IOException("simulated delete-container failure"); } DeleteResult deleteResult = DeleteResult.ZERO; for (BlobContainer child : children(purpose).values()) { From 5062861783639f24c3c75db0ce14e893a028ab5e Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 09:45:22 +0100 Subject: [PATCH 04/13] Introduce runSyncTasksEagerly() --- .../AbstractThrottledTaskRunner.java | 27 +++++++++++ .../AbstractThrottledTaskRunnerTests.java | 46 +++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java index ea37dad5ba218..5995955f942c4 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java @@ -17,6 +17,7 @@ import java.util.Queue; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -155,4 +156,30 @@ int runningTasks() { return runningTasks.get(); } + /** + * Eagerly pull tasks from the queue and execute them on this thread. This must only be used if the tasks in the queue are all + * synchronous, i.e. they release their ref before returning from {@code onResponse()}. 
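+     * A task which fails to release its ref before returning trips an assertion here and stops the eager draining of the queue.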
+ */ + public void runSyncTasksEagerly() { + final AtomicBoolean isDone = new AtomicBoolean(true); + final Releasable ref = () -> isDone.set(true); + ActionListener task; + while ((task = tasks.poll()) != null) { + isDone.set(false); + try { + logger.trace("[{}] eagerly running task {}", taskRunnerName, task); + task.onResponse(ref); + } catch (Exception e) { + logger.error(Strings.format("[%s] task %s failed", taskRunnerName, task), e); + assert false : e; + task.onFailure(e); + return; + } + if (isDone.get() == false) { + logger.error("runSyncTasksEagerly() was called on a queue [{}] containing an async task: [{}]", taskRunnerName, task); + assert false; + return; + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java index 8134b39672cc3..d943a236a9297 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java @@ -143,6 +143,52 @@ public void onResponse(Releasable releasable) { assertNoRunningTasks(taskRunner); } + public void testRunSyncTasksEagerly() { + final int maxTasks = randomIntBetween(1, maxThreads); + final int taskCount = between(maxTasks, maxTasks * 2); + final var barrier = new CyclicBarrier(maxTasks + 1); + final var executedCountDown = new CountDownLatch(taskCount); + final var testThread = Thread.currentThread(); + + class TestTask implements ActionListener { + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + public void onResponse(Releasable releasable) { + try (releasable) { + if (Thread.currentThread() != testThread) { + safeAwait(barrier); + safeAwait(barrier); + } + } finally { + executedCountDown.countDown(); + } + } + } + + final BlockingQueue queue = ConcurrentCollections.newBlockingQueue(); + final AbstractThrottledTaskRunner taskRunner = new AbstractThrottledTaskRunner<>("test", maxTasks, executor, queue); + for (int i = 0; i < taskCount; i++) { + taskRunner.enqueueTask(new TestTask()); + } + + safeAwait(barrier); + assertEquals(taskCount - maxTasks, queue.size()); + assertThat(taskRunner.runningTasks(), equalTo(maxTasks)); + + taskRunner.runSyncTasksEagerly(); + assertEquals(0, queue.size()); + + safeAwait(barrier); + safeAwait(executedCountDown); + assertTrue(queue.isEmpty()); + assertNoRunningTasks(taskRunner); + } + public void testFailsTasksOnRejectionOrShutdown() throws Exception { final var executor = randomBoolean() ? 
EsExecutors.newScaling("test", maxThreads, maxThreads, 0, TimeUnit.MILLISECONDS, true, threadFactory, threadContext) From 07f759b5b6a8b508c579aa80d2f791776824ba3d Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 09:45:44 +0100 Subject: [PATCH 05/13] Reinstate backpressure --- .../blobstore/BlobStoreRepository.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index fa699a76fadd7..465c061d13298 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1194,6 +1194,28 @@ private void cleanupStaleBlobs( })); } } + + // If we did the cleanup of stale indices purely using a throttled executor then there would be no backpressure to prevent us from + // falling arbitrarily far behind. But nor do we want to dedicate all the SNAPSHOT threads to stale index cleanups because that + // would slow down other snapshot operations in situations that do not need backpressure. + // + // The solution is to dedicate one SNAPSHOT thread to doing the cleanups eagerly, alongside the throttled executor which spreads + // the rest of the work across the other threads if they are free. If the eager cleanup loop doesn't finish before the next one + // starts then we dedicate another SNAPSHOT thread to the deletions, and so on, until eventually either we catch up or the SNAPSHOT + // pool is fully occupied with blob deletions, which pushes back on other snapshot operations. + + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { + @Override + protected void doRun() { + staleBlobDeleteRunner.runSyncTasksEagerly(); + } + + @Override + public void onFailure(Exception e) { + logger.error("unexpected failure while processing deletes on dedicated snapshot thread", e); + assert false : e; + } + }); } /** From f607f48a297c337ec785fbaebf8b5443e9f2e9d7 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 12:44:10 +0100 Subject: [PATCH 06/13] Rejections on shutdown are legit --- .../repositories/blobstore/BlobStoreRepository.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 465c061d13298..28c70c243b006 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -70,6 +70,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; @@ -1215,6 +1216,14 @@ public void onFailure(Exception e) { logger.error("unexpected failure while processing deletes on dedicated snapshot thread", e); assert false : e; } + + @Override + public void onRejection(Exception e) { + if (e instanceof EsRejectedExecutionException 
esre && esre.isExecutorShutdown()) { + return; + } + super.onRejection(e); + } }); } From 2be9141569dc416964f8b1cd79c107c981628952 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 14:31:08 +0100 Subject: [PATCH 07/13] Add Executor parameter to runSyncTasksEagerly --- .../AbstractThrottledTaskRunner.java | 64 +++++++++++++------ .../blobstore/BlobStoreRepository.java | 22 +------ .../AbstractThrottledTaskRunnerTests.java | 12 ++-- 3 files changed, 53 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java index 5995955f942c4..898f73abbd1e9 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunner.java @@ -157,29 +157,53 @@ int runningTasks() { } /** - * Eagerly pull tasks from the queue and execute them on this thread. This must only be used if the tasks in the queue are all - * synchronous, i.e. they release their ref before returning from {@code onResponse()}. + * Run a single task on the given executor which eagerly pulls tasks from the queue and executes them. This must only be used if the + * tasks in the queue are all synchronous, i.e. they release their ref before returning from {@code onResponse()}. */ - public void runSyncTasksEagerly() { - final AtomicBoolean isDone = new AtomicBoolean(true); - final Releasable ref = () -> isDone.set(true); - ActionListener task; - while ((task = tasks.poll()) != null) { - isDone.set(false); - try { - logger.trace("[{}] eagerly running task {}", taskRunnerName, task); - task.onResponse(ref); - } catch (Exception e) { - logger.error(Strings.format("[%s] task %s failed", taskRunnerName, task), e); + public void runSyncTasksEagerly(Executor executor) { + executor.execute(new AbstractRunnable() { + @Override + protected void doRun() { + final AtomicBoolean isDone = new AtomicBoolean(true); + final Releasable ref = () -> isDone.set(true); + ActionListener task; + while ((task = tasks.poll()) != null) { + isDone.set(false); + try { + logger.trace("[{}] eagerly running task {}", taskRunnerName, task); + task.onResponse(ref); + } catch (Exception e) { + logger.error(Strings.format("[%s] task %s failed", taskRunnerName, task), e); + assert false : e; + task.onFailure(e); + return; + } + if (isDone.get() == false) { + logger.error( + "runSyncTasksEagerly() was called on a queue [{}] containing an async task: [{}]", + taskRunnerName, + task + ); + assert false; + return; + } + } + } + + @Override + public void onFailure(Exception e) { + logger.error("unexpected failure in runSyncTasksEagerly", e); assert false : e; - task.onFailure(e); - return; } - if (isDone.get() == false) { - logger.error("runSyncTasksEagerly() was called on a queue [{}] containing an async task: [{}]", taskRunnerName, task); - assert false; - return; + + @Override + public void onRejection(Exception e) { + if (e instanceof EsRejectedExecutionException) { + logger.debug("runSyncTasksEagerly was rejected", e); + } else { + onFailure(e); + } } - } + }); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 28c70c243b006..676a6a6126d02 100644 --- 
a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -70,7 +70,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; @@ -1205,26 +1204,7 @@ private void cleanupStaleBlobs( // starts then we dedicate another SNAPSHOT thread to the deletions, and so on, until eventually either we catch up or the SNAPSHOT // pool is fully occupied with blob deletions, which pushes back on other snapshot operations. - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { - @Override - protected void doRun() { - staleBlobDeleteRunner.runSyncTasksEagerly(); - } - - @Override - public void onFailure(Exception e) { - logger.error("unexpected failure while processing deletes on dedicated snapshot thread", e); - assert false : e; - } - - @Override - public void onRejection(Exception e) { - if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) { - return; - } - super.onRejection(e); - } - }); + staleBlobDeleteRunner.runSyncTasksEagerly(threadPool.executor(ThreadPool.Names.SNAPSHOT)); } /** diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java index d7e9e5ed08657..b7524b0ad215e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java @@ -21,6 +21,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -175,11 +176,14 @@ public void onResponse(Releasable releasable) { } safeAwait(barrier); - assertEquals(taskCount - maxTasks, queue.size()); - assertThat(taskRunner.runningTasks(), equalTo(maxTasks)); + assertThat(taskRunner.runningTasks(), equalTo(maxTasks)); // maxTasks tasks are running now + assertEquals(taskCount - maxTasks, queue.size()); // the remainder are enqueued - taskRunner.runSyncTasksEagerly(); - assertEquals(0, queue.size()); + final var capturedTask = new AtomicReference(); + taskRunner.runSyncTasksEagerly(t -> assertTrue(capturedTask.compareAndSet(null, t))); + assertEquals(taskCount - maxTasks, queue.size()); // hasn't run any tasks yet + capturedTask.get().run(); + assertTrue(queue.isEmpty()); safeAwait(barrier); safeAwait(executedCountDown); From a804b22ea845bafc2215d9ed5dc1a463081a02eb Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 14:46:50 +0100 Subject: [PATCH 08/13] Comment about assertNoRunningTasks --- .../util/concurrent/AbstractThrottledTaskRunnerTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java 
b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java index b7524b0ad215e..8fef292f4347f 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java @@ -248,6 +248,10 @@ public void onResponse(Releasable releasable) { } private void assertNoRunningTasks(AbstractThrottledTaskRunner taskRunner) { + // We decrement the running task count after the end of each task, although on the thread that ran the task, so there is no + // guarantee that it has reached zero yet. First we must flush the executor by running another task on every one of its threads + // which ensures that those threads have all finished doing AbstractThrottledTaskRunner-related work and in particular that the + // running tasks count is now accurate. final var barrier = new CyclicBarrier(maxThreads + 1); for (int i = 0; i < maxThreads; i++) { executor.execute(() -> safeAwait(barrier)); From 97ed99e6474b809454c7ad6bcf854d2e86e5ae30 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 16:30:27 +0100 Subject: [PATCH 09/13] Comment is not necessary --- .../util/concurrent/AbstractThrottledTaskRunnerTests.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java index 8fef292f4347f..b7524b0ad215e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java @@ -248,10 +248,6 @@ public void onResponse(Releasable releasable) { } private void assertNoRunningTasks(AbstractThrottledTaskRunner taskRunner) { - // We decrement the running task count after the end of each task, although on the thread that ran the task, so there is no - // guarantee that it has reached zero yet. First we must flush the executor by running another task on every one of its threads - // which ensures that those threads have all finished doing AbstractThrottledTaskRunner-related work and in particular that the - // running tasks count is now accurate. 
final var barrier = new CyclicBarrier(maxThreads + 1); for (int i = 0; i < maxThreads; i++) { executor.execute(() -> safeAwait(barrier)); From b56455c12cc845958808978b6627d2778558037e Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 16:46:24 +0100 Subject: [PATCH 10/13] Add testCleanupStaleBlobsConcurrency --- .../snapshots/RepositoriesIT.java | 129 ++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java index 13c30d6264ef2..4a52c0183b92d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -13,14 +13,22 @@ import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.SimpleBatchedExecutor; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.core.Tuple; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryConflictException; import org.elasticsearch.repositories.RepositoryException; @@ -28,9 +36,14 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.ToLongFunction; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -351,4 +364,120 @@ public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exceptio logger.info("--> done"); } + + public void testCleanupStaleBlobsConcurrency() throws Exception { + // This test is verifying the detailed behaviour of cleanup tasks that are enqueued after a snapshot delete is committed to the + // repository, ensuring that we see exactly the right number of tasks enqueued at each stage to demonstrate that we do use all the + // threads available to us, but don't spam the threadpool queue with all the tasks at once, and that we submit one task that drains + // the queue eagerly to provide backpressure. That means this test is sensitive to changes in the breakdown of the cleanup work + // after a snapshot delete. 
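+        // Sizing note: indexCount is 3 * snapshotPoolSize, so there are enough stale-index cleanup tasks to keep every SNAPSHOT
+        // thread busy for several rounds, which is what makes the queue-length assertions below observable.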
+ + final var client = client(); + final var repositoryPath = randomRepoPath(); + final var repositoryName = "test-repo"; + createRepository(repositoryName, "mock", repositoryPath); + + final var threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class); + final var snapshotPoolSize = threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(); + final var indexCount = snapshotPoolSize * 3; + + for (int i = 0; i < indexCount; i++) { + createIndex("test-idx-" + i); + for (int j = 0; j < 10; j++) { + indexDoc("test-idx-" + i, Integer.toString(10 + j), "foo", "bar" + 10 + j); + } + } + + ensureGreen(); + + final var snapshotName = "test-snap"; + createFullSnapshot(repositoryName, snapshotName); + + final var executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final var barrier = new CyclicBarrier(snapshotPoolSize + 1); + final var keepBlocking = new AtomicBoolean(true); + final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final ToLongFunction repoGenFn = s -> RepositoriesMetadata.get(s).repository(repositoryName).generation(); + final var repositoryGenerationBeforeDelete = repoGenFn.applyAsLong(clusterService.state()); + final ClusterStateListener clusterStateListener = event -> { + if (repoGenFn.applyAsLong(event.previousState()) == repositoryGenerationBeforeDelete + && repoGenFn.applyAsLong(event.state()) > repositoryGenerationBeforeDelete) { + + for (int i = 0; i < snapshotPoolSize - 1; i++) { + executor.execute(() -> { + while (keepBlocking.get()) { + safeAwait(barrier); + safeAwait(barrier); + } + }); + } + + new Runnable() { + @Override + public void run() { + executor.execute(() -> { + safeAwait(barrier); + safeAwait(barrier); + if (keepBlocking.get()) { + this.run(); + } + }); + } + }.run(); + } + }; + clusterService.addListener(clusterStateListener); + + final var deleteFuture = new PlainActionFuture(); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).execute(deleteFuture); + + safeAwait(barrier); // wait for all the snapshot threads to be blocked + clusterService.removeListener(clusterStateListener); + + // flush the cluster applier thread by running another task, so we can be sure that all the snapshot delete tasks are enqueued + PlainActionFuture.get(fut -> clusterService.createTaskQueue("test", Priority.NORMAL, new SimpleBatchedExecutor<>() { + @Override + public Tuple executeTask(ClusterStateTaskListener clusterStateTaskListener, ClusterState clusterState) { + return Tuple.tuple(clusterState, null); + } + + @Override + public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Object ignored) { + fut.onResponse(null); + } + }).submitTask("test", e -> fail(), null), 10, TimeUnit.SECONDS); + + // Throttled runner enqueued one task per worker, and the eager runner added another one + assertThat( + threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT)).findFirst().orElseThrow().queue(), + equalTo(snapshotPoolSize + 1) + ); + + safeAwait(barrier); // unblock the barrier thread and let it process the queue + safeAwait(barrier); // wait for the queue to be processed + + // Each task completed by the throttled runner will have enqueued another one, but the eager runner drained the rest of the queue + assertThat( + threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT)).findFirst().orElseThrow().queue(), + equalTo(snapshotPoolSize) + ); + + safeAwait(barrier); // unblock the barrier thread and let it 
process the queue + safeAwait(barrier); // wait for the queue to be processed + + // There was no more work to process + assertThat( + threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT)).findFirst().orElseThrow().queue(), + equalTo(0) + ); + + try { + assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs + } finally { + keepBlocking.set(false); + safeAwait(barrier); + } + + assertTrue(deleteFuture.get(10, TimeUnit.SECONDS).isAcknowledged()); + } } From 488e4bba259524013074f9c0a55426ae40e00928 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 16:53:30 +0100 Subject: [PATCH 11/13] Minor test cleanup --- .../snapshots/RepositoriesIT.java | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java index 4a52c0183b92d..59d788aa44d7b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -43,6 +43,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; import java.util.function.ToLongFunction; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY; @@ -447,37 +448,33 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj } }).submitTask("test", e -> fail(), null), 10, TimeUnit.SECONDS); + final LongSupplier queueLength = () -> threadPool.stats() + .stats() + .stream() + .filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT)) + .findFirst() + .orElseThrow() + .queue(); + // Throttled runner enqueued one task per worker, and the eager runner added another one - assertThat( - threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT)).findFirst().orElseThrow().queue(), - equalTo(snapshotPoolSize + 1) - ); + assertThat(queueLength.getAsLong(), equalTo(snapshotPoolSize + 1)); safeAwait(barrier); // unblock the barrier thread and let it process the queue safeAwait(barrier); // wait for the queue to be processed // Each task completed by the throttled runner will have enqueued another one, but the eager runner drained the rest of the queue - assertThat( - threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT)).findFirst().orElseThrow().queue(), - equalTo(snapshotPoolSize) - ); + assertThat(queueLength.getAsLong(), equalTo(snapshotPoolSize)); safeAwait(barrier); // unblock the barrier thread and let it process the queue safeAwait(barrier); // wait for the queue to be processed // There was no more work to process - assertThat( - threadPool.stats().stats().stream().filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT)).findFirst().orElseThrow().queue(), - equalTo(0) - ); + assertThat(queueLength.getAsLong(), equalTo(0)); - try { - assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs - } finally { - keepBlocking.set(false); - safeAwait(barrier); - } + assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs + keepBlocking.set(false); + safeAwait(barrier); // release the threads so they can exit assertTrue(deleteFuture.get(10, TimeUnit.SECONDS).isAcknowledged()); } } From 
bdff6e2e7b9c4ed229f11c42272e8212fbe2cded Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 16:56:36 +0100 Subject: [PATCH 12/13] Numbers is numbers --- .../org/elasticsearch/snapshots/RepositoriesIT.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java index 59d788aa44d7b..d2da27c59c3c7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -43,7 +43,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongSupplier; +import java.util.function.IntSupplier; import java.util.function.ToLongFunction; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY; @@ -448,7 +448,7 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj } }).submitTask("test", e -> fail(), null), 10, TimeUnit.SECONDS); - final LongSupplier queueLength = () -> threadPool.stats() + final IntSupplier queueLength = () -> threadPool.stats() .stats() .stream() .filter(s -> s.name().equals(ThreadPool.Names.SNAPSHOT)) @@ -457,19 +457,19 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj .queue(); // Throttled runner enqueued one task per worker, and the eager runner added another one - assertThat(queueLength.getAsLong(), equalTo(snapshotPoolSize + 1)); + assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize + 1)); safeAwait(barrier); // unblock the barrier thread and let it process the queue safeAwait(barrier); // wait for the queue to be processed // Each task completed by the throttled runner will have enqueued another one, but the eager runner drained the rest of the queue - assertThat(queueLength.getAsLong(), equalTo(snapshotPoolSize)); + assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize)); safeAwait(barrier); // unblock the barrier thread and let it process the queue safeAwait(barrier); // wait for the queue to be processed // There was no more work to process - assertThat(queueLength.getAsLong(), equalTo(0)); + assertThat(queueLength.getAsInt(), equalTo(0)); assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs From 4592ad16117ba65e87206d0293be0ead549c85d8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 5 Oct 2023 19:26:31 +0100 Subject: [PATCH 13/13] More commentary --- .../snapshots/RepositoriesIT.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java index d2da27c59c3c7..5f4c270f69348 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -403,7 +403,11 @@ public void testCleanupStaleBlobsConcurrency() throws Exception { final ClusterStateListener clusterStateListener = event -> { if (repoGenFn.applyAsLong(event.previousState()) == repositoryGenerationBeforeDelete && repoGenFn.applyAsLong(event.state()) > repositoryGenerationBeforeDelete) { + // We are updating the safe repository 
generation which indicates that the snapshot delete is complete. Once this cluster + // state update completes we will enqueue all the cleanup work on the SNAPSHOT pool. So here we prepare for that by blocking + // all the SNAPSHOT threads: + // All but one of the threads just repeatedly block on the barrier without picking up any new tasks for (int i = 0; i < snapshotPoolSize - 1; i++) { executor.execute(() -> { while (keepBlocking.get()) { @@ -413,6 +417,9 @@ public void testCleanupStaleBlobsConcurrency() throws Exception { }); } + // The last thread runs a task which blocks on the barrier and then enqueues itself again, at the back of the queue, + // so that this thread will run everything _currently_ in the queue each time the barrier is released, in the order in which + // it was enqueued, and will then block on the barrier again. new Runnable() { @Override public void run() { @@ -435,7 +442,9 @@ public void run() { safeAwait(barrier); // wait for all the snapshot threads to be blocked clusterService.removeListener(clusterStateListener); - // flush the cluster applier thread by running another task, so we can be sure that all the snapshot delete tasks are enqueued + // We must wait for all the cleanup work to be enqueued (with the throttled runner at least) so we can be sure of exactly how it + // will execute. The cleanup work is enqueued by the master service thread on completion of the cluster state update which increases + // the root blob generation in the repo metadata, so it is sufficient to wait for another no-op task to run on the master service: PlainActionFuture.get(fut -> clusterService.createTaskQueue("test", Priority.NORMAL, new SimpleBatchedExecutor<>() { @Override public Tuple executeTask(ClusterStateTaskListener clusterStateTaskListener, ClusterState clusterState) { @@ -456,19 +465,23 @@ public void taskSucceeded(ClusterStateTaskListener clusterStateTaskListener, Obj .orElseThrow() .queue(); - // Throttled runner enqueued one task per worker, and the eager runner added another one + // There are indexCount (=3*snapshotPoolSize) index-deletion tasks, plus one for cleaning up the root metadata. However, the + // throttled runner only enqueues one task per SNAPSHOT thread to start with, and then the eager runner adds another one. This shows + // we are not spamming the threadpool with all the tasks at once, which means that other snapshot activities can run alongside this + // cleanup. assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize + 1)); safeAwait(barrier); // unblock the barrier thread and let it process the queue safeAwait(barrier); // wait for the queue to be processed - // Each task completed by the throttled runner will have enqueued another one, but the eager runner drained the rest of the queue + // We first ran all the one-task actions, each of which completes and puts another one-task action into the queue. Then the eager + // runner runs all the remaining tasks. assertThat(queueLength.getAsInt(), equalTo(snapshotPoolSize)); safeAwait(barrier); // unblock the barrier thread and let it process the queue safeAwait(barrier); // wait for the queue to be processed - // There was no more work to process + // Since the eager runner already ran all the remaining tasks, when the enqueued actions run they add no more work to the queue. assertThat(queueLength.getAsInt(), equalTo(0)); assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs