Decouple BulkProcessor from ThreadPool #26727
/**
 * Scheduler that allows to schedule one-shot and periodic commands.
 */
public class Scheduler extends AbstractComponent implements Closeable {
I was thinking of an interface that is part of the bulk processor like:
interface Scheduler {
    ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command);
    Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor);
    void shutdown();
}
That way core doesn't depend on bulk processor stuff, bulk processor stuff depends on core. Or at least can be adapted into core.
And when we want to work with a ThreadPool we could implement it like:
new Scheduler() {
    public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command) {
        return threadPool.schedule(delay, executor, command);
    }
    public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval) {
        ThreadPool.Cancellable c = threadPool.scheduleWithFixedDelay(threadPool.preserveContext(command), interval, SAME);
        return new Cancellable() {
            public void cancel() {c.cancel();}
            public boolean isCancelled() {return c.isCancelled();}
        };
    }
    public void shutdown() {
        // Do nothing because we don't want to shut down the shared ThreadPool.
    }
}
Is that crazy? It feels like that lets users plug it into whatever scheduling solution they want.
I would prefer if users didn't have to do anything. I did try the interface idea, but I quickly realized that I need the same code in both scheduler impls (the one used by ThreadPool and the one that BulkProcessor uses).
How would you implement scheduleWithFixedDelay in the bulk processor impl? This question led me to the current impl. Some of the reasons are in the description of this PR. From then on, I had to add some more methods to the base class, but in hindsight it kinda makes sense to have the scheduler bits extracted out and shared, I thought.
Something like:
class ThreadPoolExecutorScheduler implements Scheduler {
    private final ScheduledThreadPoolExecutor exec;
    public ScheduledFuture<?> schedule(TimeValue delay, Runnable command) {
        return exec.schedule(command, delay.nanos(), TimeUnit.NANOSECONDS);
    }
    public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval) {
        ScheduledFuture<?> c = exec.schedule(command, 0, interval.nanos(), TimeUnit.NANOSECONDS);
        return new Cancellable() {
            public void cancel() {c.cancel(false);}
            public boolean isCancelled() {return c.isCancelled();}
        };
    }
    public void shutdown() {
        exec.shutdown();
    }
}
I really don't want stuff like `Settings` and `AbstractComponent` to make their way too far into the high level rest client if we can help it. I mean, `ThreadPool` is even worse because it is all of those things, so the change as you have it is an improvement.
I think the user doesn't really have to do anything. If the BulkProcessor by default builds its own scheduler, creating a `ScheduledThreadPoolExecutor` on the fly, then great. If they call `builder.withScheduler(ScheduledThreadPoolExecutor)` then we can build a scheduler for them. And if they want ultimate control they can call `builder.withScheduler(Scheduler)`. And we can have a `builder.withSchedulerFromThreadPool(threadPool)` for transport client users.
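The builder options described in this comment could look roughly like the sketch below. Everything in it is hypothetical: the simplified `Scheduler` interface (millisecond delays instead of `TimeValue`), the `adapt` helper, and the ownership rule (only an executor the builder created itself is shut down) are assumptions made for illustration, not the API in this PR.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SchedulerBuilderSketch {

    /** Hypothetical, simplified scheduler abstraction (delay in milliseconds instead of TimeValue). */
    interface Scheduler {
        ScheduledFuture<?> schedule(Runnable command, long delayMillis);
        void shutdown();
    }

    /** Adapts an executor; shutdown() only stops the executor when this scheduler owns it (assumption). */
    static Scheduler adapt(ScheduledThreadPoolExecutor exec, boolean owned) {
        return new Scheduler() {
            @Override
            public ScheduledFuture<?> schedule(Runnable command, long delayMillis) {
                return exec.schedule(command, delayMillis, TimeUnit.MILLISECONDS);
            }

            @Override
            public void shutdown() {
                if (owned) {
                    exec.shutdown();
                }
            }
        };
    }

    /** Hypothetical builder fragment covering only the scheduler-related options. */
    static final class Builder {
        private Scheduler scheduler;

        /** Full control: the caller supplies the scheduler. */
        Builder withScheduler(Scheduler scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        /** Convenience: wrap a caller-owned executor, which is left running on shutdown(). */
        Builder withScheduler(ScheduledThreadPoolExecutor exec) {
            this.scheduler = adapt(exec, false);
            return this;
        }

        /** Default: build a private single-threaded scheduler that is shut down with its owner. */
        Scheduler buildScheduler() {
            return scheduler != null ? scheduler : adapt(new ScheduledThreadPoolExecutor(1), true);
        }
    }

    public static void main(String[] args) {
        Scheduler own = new Builder().buildScheduler();
        own.schedule(() -> System.out.println("flush"), 10);
        own.shutdown();
    }
}
```

With this shape, callers who pass nothing get a private scheduler, while callers who pass their own executor keep responsibility for shutting it down.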
Er, I was using bad names in the comment above. Edited.
Talked with @javanna - the trouble with what I propose is that `ScheduledThreadPoolExecutor` doesn't react the same way to failures as `ThreadPool`. Especially with regards to `scheduleWithFixedDelay` style stuff. In `ThreadPool` we retry even if it fails. `ScheduledThreadPoolExecutor` doesn't. We think.
I think you meant to call `exec.scheduleWithFixedDelay` from `scheduleWithFixedDelay`. The problem is that "If any execution of the task encounters an exception, subsequent executions are suppressed." In our own impl (currently in `ThreadPool`) a failure won't affect subsequent runs, which will still happen. That is the main reason why I didn't go for an interface, and instead moved some more methods to the base impl, which can be shared between ES core and the high-level client. I do see how `AbstractComponent` is annoying, together with `Settings`, but that is a problem that we may want to fix in ES core directly and probably just a small downside in the context of this change?
I dunno. I feel like it'd still be cleaner to go with the interface solution and catch I admit I don't like refactoring ThreadPool like this either and I can't really put my finger on why. I think it might be time to get another reviewer involved to get some fresh ideas.
4d25c01 to 1055498
@nik9000 this is ready for another round, I succeeded in doing what you suggested. I like it better too. The only downside is that we don't have logged failures from the REST client, as logging in the scheduler was really in the way. All the rest should work the same, without code duplication. Let me know what you think. Also @jasontedor this is ready for you too ;)
80e06e4 to 5d38e43
Looks like an improvement to me. I think it'd be nice to have some javadocs on the static methods.
@jasontedor should probably look at the `ThreadPool` changes as well.
I left some nits, otherwise this is so good, so so so good. Thank you for doing this.
        .setBulkActions(nbItems + 1)
        .build()) {

try(BulkProcessor processor = BulkProcessor.builder(hlClient::bulkAsync, listener)
Can we have a space between the `try` and `(`?
(e) -> {
    if (logger.isDebugEnabled()) {
        logger.debug((Supplier<?>) () -> new ParameterizedMessage("scheduled task [{}] was rejected on thread pool [{}]",
            command, executor), e);
This log statement does not have `toString` on the command yet the other warn does, can you make them consistent?
@@ -722,27 +715,29 @@ private static boolean awaitTermination(
      * Returns <code>true</code> if the given pool was terminated successfully. If the termination timed out,
      * the service is <code>null</code> this method will return <code>false</code>.
      */
-    public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) {
-        if (pool != null) {
+    public static boolean terminate(ThreadPool threadPool, long timeout, TimeUnit timeUnit) {
Can you revert all changes to this method, they are cosmetic?
sure
@@ -376,7 +366,7 @@ public void shutdown() {
         scheduler.shutdown();
         for (ExecutorHolder executor : executors.values()) {
             if (executor.executor() instanceof ThreadPoolExecutor) {
-                ((ThreadPoolExecutor) executor.executor()).shutdown();
+                executor.executor().shutdown();
These are cosmetic changes, let's revert them here.
this looks like a good change to me though. It resolves a warning in my IDE caused by the unnecessary cast. Why not keep it?
It’s fine @javanna, pull it in in this one. I would prefer we keep changes like this out of PRs, they can go in separately, I want less to think about when I review.
Such a class can be used from the BulkProcessor to schedule retries and the flush task. This allows to remove the ThreadPool dependency from BulkProcessor, which requires to provide settings that contain node.name. Instead, it needs now a Scheduler that is much lighter.
5d38e43 to 9de40e1
Introduce minimal thread scheduler as a base class for `ThreadPool`. Such a class can be used from the `BulkProcessor` to schedule retries and the flush task. This allows to remove the `ThreadPool` dependency from `BulkProcessor`, which requires to provide settings that contain `node.name` and also needed log4j for logging. Instead, it needs now a `Scheduler` that is much lighter and gets automatically created and shut down on close. Closes #26028
* master:
  Ignore .DS_Store files on macOS
  Docs: Fix ingest geoip config location (elastic#27110)
  Adjust SHA-512 supported format on plugin install
  Make ShardSearchTarget optional when parsing ShardSearchFailure (elastic#27078)
  [Docs] Clarify mapping `index` option default (elastic#27104)
  Decouple BulkProcessor from ThreadPool (elastic#26727)
  Stats to record how often the ClusterState diff mechanism is used successfully (elastic#26973)
  Tie-break shard path decision based on total number of shards on path (elastic#27039)
The goal of this PR is to no longer require a `ThreadPool` when creating a `BulkProcessor`. This makes it easier to use the `BulkProcessor` from the high-level REST client.

The initial idea for removing the need to pass a `ThreadPool` to `BulkProcessor` was to require two functions: a `BiFunction<Runnable, TimeValue, ScheduledFuture<?>>` to schedule a task and a `Function<Runnable, Runnable>` to wrap the command into one that preserves the thread context. When trying that approach, a few issues came up. The flush task is a periodic task (scheduled through `scheduleWithFixedDelay`) and gets executed on the `GENERIC` thread pool rather than `SAME`. Also, the `ThreadPool#scheduleWithFixedDelay` method has different semantics compared to the standard `ScheduledThreadPoolExecutor#scheduleWithFixedDelay` when it comes to failures.

The approach I went for in this PR is based on extracting the minimal code needed to schedule tasks into a new base interface called `Scheduler`, which the `BulkProcessor` depends on. `ThreadPool` implements `Scheduler` and enhances it, so that all of the components that already used `BulkProcessor` and had a `ThreadPool` available can keep doing exactly what they did before. The high-level REST client internally creates its own instance of the `Scheduler` (users don't even have to provide it, while they previously needed to create the `ThreadPool`). That instance works exactly like the `ThreadPool`, except that it doesn't support multiple executors and doesn't log anything in case of failures, and it is shut down on close.

This approach avoids reinventing the wheel in the high-level REST client: it uses the same code that we already have to schedule one-shot and periodic tasks, all without requiring an entire `ThreadPool`, which the high-level REST client doesn't need.

Closes #26028
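One way to get fixed-delay scheduling in which a failed run does not stop later runs is a task that reschedules itself after each execution. The sketch below illustrates that idea on top of a plain `ScheduledExecutorService`. It is an assumption-laden illustration rather than the code in this PR: the `ReschedulingSketch` class, its nested `Cancellable` interface and `ReschedulingRunnable`, and the millisecond delay parameter are all hypothetical names chosen for the example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReschedulingSketch {

    /** Hypothetical handle, named after the Cancellable discussed in the review. */
    interface Cancellable {
        void cancel();
        boolean isCancelled();
    }

    /** Runs a command, then schedules its own next run unless it was cancelled. */
    static final class ReschedulingRunnable implements Runnable, Cancellable {
        private final Runnable command;
        private final long delayMillis;
        private final ScheduledExecutorService executor;
        private volatile boolean cancelled;

        ReschedulingRunnable(Runnable command, long delayMillis, ScheduledExecutorService executor) {
            this.command = command;
            this.delayMillis = delayMillis;
            this.executor = executor;
        }

        /** Schedules the first run; later runs are scheduled from run(). */
        ReschedulingRunnable start() {
            executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
            return this;
        }

        @Override
        public void run() {
            if (cancelled) {
                return;
            }
            try {
                command.run();
            } catch (RuntimeException e) {
                // A failed run is ignored here so that it cannot stop the schedule.
            } finally {
                if (!cancelled) {
                    try {
                        executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                    } catch (RejectedExecutionException e) {
                        // The executor was shut down in the meantime; stop rescheduling.
                    }
                }
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        @Override
        public boolean isCancelled() {
            return cancelled;
        }
    }

    /** Returns {runs seen at cancel time, runs seen a little later}. */
    static int[] demo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable alwaysFails = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated failure");
        };
        try {
            ReschedulingRunnable task = new ReschedulingRunnable(alwaysFails, 10, executor).start();
            Thread.sleep(300);
            task.cancel();
            int atCancel = runs.get();
            Thread.sleep(100);
            return new int[] {atCancel, runs.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("runs at cancel: " + counts[0] + ", runs afterwards: " + counts[1]);
    }
}
```

Because each run schedules the next one only after it finishes, the delay is measured from the end of one execution to the start of the next, and cancelling simply stops the chain.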