Parallelize stale index deletion #100316
Conversation
After deleting a snapshot today we clean up all the now-dangling indices sequentially, which can be rather slow. With this commit we parallelize the work across the whole `SNAPSHOT` pool on the master node.

Closes elastic#61513

Co-authored-by: Piyush Daftary <[email protected]>
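For orientation, here is a minimal, self-contained sketch of the general pattern the PR introduces (plain `java.util.concurrent`, with a hypothetical `deleteIndexFolder` helper standing in for the blob-store delete; this is not the actual `BlobStoreRepository` code): fan each stale-index cleanup out across the pool and wait for all of it to finish, instead of deleting the indices one at a time on a single thread.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelCleanupSketch {
    static void deleteStaleIndices(List<String> staleIndexIds, ExecutorService snapshotPool)
        throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(staleIndexIds.size());
        for (String indexId : staleIndexIds) {
            snapshotPool.execute(() -> {
                try {
                    deleteIndexFolder(indexId); // hypothetical stand-in for the per-index blob deletion
                } finally {
                    done.countDown();
                }
            });
        }
        done.await(); // previously this work ran sequentially on one thread
    }

    private static void deleteIndexFolder(String indexId) {
        // placeholder for the real delete
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4); // stands in for the SNAPSHOT pool
        deleteStaleIndices(List.of("idx-1", "idx-2", "idx-3"), pool);
        pool.shutdown();
    }
}
```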
Pinging @elastic/es-distributed (Team:Distributed)
Hi @DaveCTurner, I've created a changelog YAML for you.
@elasticmachine please run elasticsearch-ci/part-1
I'm slightly concerned about the lack of backpressure in this area: we could in theory end up with an ever-increasing pile of delete work in the queue. Previously that would eventually have blocked all the snapshot threads, but with this change even that doesn't happen any more. I've raised this point for discussion with the team.
One possible solution would be to preserve today's behaviour of always blocking at least one `SNAPSHOT` thread, something like this:

diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index fa699a76fadd..f7f9ed22efe4 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -147,6 +147,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -1194,6 +1195,28 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 }));
             }
         }
+
+        // dedicate a single SNAPSHOT thread for this work, so that if we fall too far behind with deletes then eventually we stop taking
+        // snapshots too
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
+            @Override
+            protected void doRun() {
+                final AtomicBoolean isDone = new AtomicBoolean(true);
+                final Releasable ref = () -> isDone.set(true);
+                ActionListener<Releasable> nextTask;
+                while ((nextTask = staleBlobDeleteRunner.takeNextTask()) != null) {
+                    isDone.set(false);
+                    nextTask.onResponse(ref);
+                    assert isDone.get();
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.error("unexpected failure while processing deletes on dedicated snapshot thread", e);
+                assert false : e;
+            }
+        });
     }

     /**
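Reduced to its essence, the idea in that draft looks like the following minimal sketch (plain Java with hypothetical names, not the Elasticsearch implementation): delete tasks still go into an unbounded queue, but one pool thread is dedicated to draining that queue, so a growing backlog of deletes keeps occupying the pool and eventually pushes back on new snapshot work.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DedicatedDrainerSketch {
    private final Queue<Runnable> deleteQueue = new ConcurrentLinkedQueue<>();
    private final ExecutorService snapshotPool = Executors.newFixedThreadPool(4); // stands in for the SNAPSHOT pool

    void enqueueDelete(Runnable deleteTask) {
        deleteQueue.add(deleteTask);
    }

    // one pool thread loops over the queue until it is empty, so delete work ties up
    // at least one thread of the pool for as long as there is a backlog
    void drainOnDedicatedThread() {
        snapshotPool.execute(() -> {
            Runnable next;
            while ((next = deleteQueue.poll()) != null) {
                next.run();
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        DedicatedDrainerSketch sketch = new DedicatedDrainerSketch();
        for (int i = 0; i < 10; i++) {
            final int n = i;
            sketch.enqueueDelete(() -> System.out.println("deleting stale index " + n));
        }
        sketch.drainOnDedicatedThread();
        sketch.snapshotPool.shutdown();
        sketch.snapshotPool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```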
@elasticmachine please run elasticsearch-ci/part-2
This PR has surprisingly rich context and interesting technical details. Thanks for the chance to review. My comments are mostly for my own education. Thanks!
final var survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
for (final var indexEntry : foundIndices.entrySet()) {
    final var indexSnId = indexEntry.getKey();
    if (survivingIndexIds.contains(indexSnId)) {
        continue;
    }
    staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
        try (ref) {
            logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
            final var deleteResult = indexEntry.getValue().delete(OperationPurpose.SNAPSHOT);
            blobsDeleted.addAndGet(deleteResult.blobsDeleted());
            bytesDeleted.addAndGet(deleteResult.bytesDeleted());
            logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
        } catch (IOException e) {
            logger.warn(() -> format("""
                [%s] index %s is no longer part of any snapshot in the repository, \
                but failed to clean up its index folder""", metadata.name(), indexSnId), e);
        }
    }));
Do we want to keep this and the code above in their own separate methods, as they are now? Fewer nesting levels could be helpful?
I inlined these because we ended up having to pass really quite a lot of parameters in, and it wasn't really even a coherent set of parameters so much as just "the things needed to run this method". It's still less than one screenful (on my screen anyway) and nicely exposes the execution pattern, so tbh I prefer it as it is now.
This is actually kind of a code smell throughout this class (and the snapshot codebase more generally). I have been working on a refactoring that should help simplify things in this area and will open a followup in the next few days.
@@ -295,4 +297,58 @@ public void testRepositoryConflict() throws Exception {
        logger.info("--> wait until snapshot deletion is finished");
        assertAcked(future.actionGet());
    }

    public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exception {
This is a cool test and I learnt a few tips and tricks from it. But it does not test the new parallelization change. Do we care?
This test is largely from the original contributor, but I think it's a reasonable test to write. I've played around with a few ideas for testing the new threading more precisely but it seems pretty tricky, and kinda doesn't matter so much as long as we do actually do the work somehow. I think I had an idea for a test tho.
Ok my idea seems to work, see testCleanupStaleBlobsConcurrency added in b56455c (sorry for the force-push).
if (isDone.get() == false) {
    logger.error("runSyncTasksEagerly() was called on a queue [{}] containing an async task: [{}]", taskRunnerName, task);
    assert false;
I think this is a best-effort check, right? A misbehaving task can still fork but release the reference before returning?
Yes indeed, but that doesn't matter to us really. The ref is just so that the AbstractThrottledTaskRunner can track the relevant activities to completion. If we did fork an untracked task then presumably we're tracking it somewhere else.
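To make the point concrete, here is a hedged sketch (plain Java with hypothetical names, not the actual AbstractThrottledTaskRunner API) of such a "misbehaving" task: it releases its ref before returning, so the synchronous-task check passes even though the real work continues on another thread, untracked by the runner.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class BestEffortCheckSketch {
    // stand-in for the runner's per-task ref; releasing it tells the runner the task is "done"
    interface Ref extends AutoCloseable {
        @Override
        void close();
    }

    public static void main(String[] args) {
        ExecutorService background = Executors.newSingleThreadExecutor();
        AtomicBoolean isDone = new AtomicBoolean(true);
        Ref ref = () -> isDone.set(true);

        // a "misbehaving" task: it closes the ref up front and then forks, so the
        // isDone assertion in the eager-drain loop would still pass
        Runnable misbehavingTask = () -> {
            try (Ref r = ref) {
                // nothing tracked by the runner happens here
            }
            background.execute(() -> System.out.println("untracked work running after the ref was released"));
        };

        isDone.set(false);
        misbehavingTask.run();
        assert isDone.get(); // passes even though work is still in flight elsewhere
        background.shutdown();
    }
}
```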
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
    @Override
    protected void doRun() {
        staleBlobDeleteRunner.runSyncTasksEagerly();
    }

    @Override
    public void onFailure(Exception e) {
        logger.error("unexpected failure while processing deletes on dedicated snapshot thread", e);
        assert false : e;
    }

    @Override
    public void onRejection(Exception e) {
        if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
            return;
        }
        super.onRejection(e);
    }
});
Can we have this managed inside the task runner? It would be helpful for reuse.
Yes that's a good point. With only one caller it's hard to know where to put the abstraction boundary, but I can see value in allowing callers to pass in an Executor. They can always use EsExecutors.DIRECT_EXECUTOR_SERVICE if they really don't want to fork.
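As a side note, the "forking executor vs direct executor" distinction can be illustrated with a tiny self-contained sketch (plain Java, hypothetical names; `Runnable::run` plays the role that `EsExecutors.DIRECT_EXECUTOR_SERVICE` plays in Elasticsearch): the method takes an Executor, and the caller decides whether the eager drain forks onto the pool or runs on the calling thread.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorChoiceSketch {
    // analogue of runSyncTasksEagerly(Executor): the caller decides where the drain runs
    static void runEagerly(Executor executor) {
        executor.execute(() -> System.out.println("draining queue on " + Thread.currentThread().getName()));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2); // stands in for the SNAPSHOT pool
        runEagerly(pool);          // fork onto the pool, as the PR does
        runEagerly(Runnable::run); // direct executor: run on the calling thread, no fork
        pool.shutdown();
    }
}
```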
assertTrue(queue.isEmpty());
assertNoRunningTasks(taskRunner);
For my learning:
- Why do we need to assert that the queue is empty here? We already asserted it has 0 size four lines above, and I don't see how the queue size can increase after all tasks are enqueued.
- In the existing assertNoRunningTasks, why is it necessary to spawn a batch of Runnables before verifying that the runningTasks size is 0? We already verified that the queue is empty, so it seems to me that just verifying runningTasks is sufficient. Also, running the extra Runnables does not guarantee that they actually exercise every thread in the pool, so I'm not sure of the purpose here.
Really we're just doing the same as the other tests here, verifying that the task runner is completely finished at the end of the test.

In assertNoRunningTasks() we need to wait for all the tasks spawned by the AbstractThrottledTaskRunner to completely finish before we can be sure the running task count is zero. That's because we call runningTasks.decrementAndGet() after completing each task, so when e.g. executedCountDown completes, the count will not have reached zero yet. In order to make sure that the thread pool is completely free from all its current work, we enqueue N barrier tasks, which must therefore all be waiting on the barrier at the same time.

Put differently, if you remove that and run the tests in a loop then you should see them fail occasionally.

I'll add a comment.
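The flushing trick described above can be shown with a minimal, self-contained sketch (plain Java, hypothetical names, not the actual test code): to be sure a fixed-size pool has finished all previously submitted work, submit exactly N barrier tasks; they can only all be waiting on the barrier at once after every prior task has completed.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FlushThreadPoolSketch {
    // Blocks until all `threads` workers are parked on the barrier together,
    // which implies every task submitted before this call has completed.
    static void flushThreadPool(ExecutorService pool, int threads) throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(threads + 1); // N workers + the test thread
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new AssertionError(e);
                }
            });
        }
        barrier.await(); // returns only once all N workers are waiting on the barrier
    }

    public static void main(String[] args) throws Exception {
        int threads = 4;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < 100; i++) {
            pool.execute(() -> { /* earlier work, e.g. tasks that decrement a running-task counter */ });
        }
        flushThreadPool(pool, threads);
        System.out.println("all previously submitted tasks have finished");
        pool.shutdown();
    }
}
```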
Sorry for dragging on this point, but I think this thread-flushing behaviour is not necessary for this test, because the releasable is used in a try-with-resources block and should be released before executedCountDown finishes. This behaviour is needed for testEnqueueSpawnsNewTasksUpToMax because it explicitly closes the releasable after the countDown. If I flip that order, it also runs successfully 10K times without the thread-flushing logic.
Ah hm I see what you mean now. Yes I think you're right we could simplify this if we made sure to release all the task refs before allowing that test to proceed.
Force-pushed from 0aef608 to b56455c.
LGTM
The new concurrency test is pretty awesome 👍
 * Run a single task on the given executor which eagerly pulls tasks from the queue and executes them. This must only be used if the
 * tasks in the queue are all synchronous, i.e. they release their ref before returning from {@code onResponse()}.
 */
public void runSyncTasksEagerly(Executor executor) {
Why not have an overloaded version of runSyncTasksEagerly() that just uses the TaskRunner's own executor for the eager run as well?
Yes that could work too. I'm going to wait and see on that idea tho; I'd rather do one or the other, and the choice depends on whether we have other users that want to do something else or whether everyone just forks another task on AbstractThrottledTaskRunner#executor.
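For what it's worth, the overload being discussed would be a one-liner along these lines (a hypothetical sketch of the suggestion, not code from this PR; it assumes the runner keeps its constructor-supplied executor in a field named `executor`):

```java
// hypothetical convenience overload: drain eagerly on the runner's own executor
public void runSyncTasksEagerly() {
    runSyncTasksEagerly(executor);
}
```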
LGTM.
Thanks both!