Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vertx: use NoopShutdownExecutorService and DevModeExecutorService #38478

Merged
merged 2 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class RuntimeUpdatesProcessor implements HotReplacementContext, Closeable
final Map<Path, Long> sourceFileTimestamps = new ConcurrentHashMap<>();

private final List<Runnable> preScanSteps = new CopyOnWriteArrayList<>();
private final List<Runnable> postRestartSteps = new CopyOnWriteArrayList<>();
private final List<Consumer<Set<String>>> noRestartChangesConsumers = new CopyOnWriteArrayList<>();
private final List<HotReplacementSetup> hotReplacementSetup = new ArrayList<>();
private final List<Runnable> deploymentFailedStartHandlers = new ArrayList<>();
Expand Down Expand Up @@ -541,6 +542,13 @@ public boolean doScan(boolean userInitiated, boolean forceRestart) {
restartCallback.accept(filesChanged, changedClassResults);
long timeNanoSeconds = System.nanoTime() - startNanoseconds;
log.infof("Live reload total time: %ss ", Timing.convertToBigDecimalSeconds(timeNanoSeconds));
for (Runnable step : postRestartSteps) {
try {
step.run();
} catch (Throwable t) {
log.error("Post Restart step failed", t);
}
}
if (TimeUnit.SECONDS.convert(timeNanoSeconds, TimeUnit.NANOSECONDS) >= 4 && !instrumentationEnabled()) {
if (!instrumentationLogPrinted) {
instrumentationLogPrinted = true;
Expand Down Expand Up @@ -593,6 +601,11 @@ public void addPreScanStep(Runnable runnable) {
preScanSteps.add(runnable);
}

@Override
public void addPostRestartStep(Runnable runnable) {
postRestartSteps.add(runnable);
}

@Override
public void consumeNoRestartChanges(Consumer<Set<String>> consumer) {
noRestartChangesConsumers.add(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public interface HotReplacementContext {
* @return A set of changed files
*/
Set<String> syncState(Map<String, String> fileHashes);

/**
* Adds a task that is run after the restart is performed.
*/
void addPostRestartStep(Runnable runnable);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.quarkus.runtime.util;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Forwards all method calls to the executor service returned from the {@link #delegate()} method. Only non-default methods
* declared on the {@link ExecutorService} interface are forwarded.
*/
public abstract class ForwardingExecutorService implements ExecutorService {

protected abstract ExecutorService delegate();

@Override
public void execute(Runnable command) {
delegate().execute(command);
}

@Override
public void shutdown() {
delegate().shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate().shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate().isShutdown();
}

@Override
public boolean isTerminated() {
return delegate().isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate().awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate().submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate().submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate().submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate().invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate().invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate().invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate().invokeAny(tasks, timeout, unit);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io.quarkus.dev.console.DevConsoleManager;
import io.quarkus.dev.spi.HotReplacementContext;
import io.quarkus.dev.spi.HotReplacementSetup;
import io.quarkus.vertx.core.runtime.QuarkusExecutorFactory;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.http.runtime.VertxHttpRecorder;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpServerResponse;
Expand Down Expand Up @@ -50,6 +52,15 @@ public void run() {
RemoteSyncHandler.doPreScan();
}
});
hotReplacementContext.addPostRestartStep(new Runnable() {
@Override
public void run() {
// If not on a worker thread then attempt to re-initialize the dev mode executor
if (!Context.isOnWorkerThread()) {
QuarkusExecutorFactory.reinitializeDevModeExecutor();
}
}
});
}

@Override
Expand Down Expand Up @@ -186,6 +197,7 @@ public void handle(AsyncResult<Boolean> event) {
} else {
boolean restart = event.result();
if (restart) {
QuarkusExecutorFactory.reinitializeDevModeExecutor();
routingContext.request().headers().set(HEADER_NAME, "true");
VertxHttpRecorder.getRootHandler().handle(routingContext.request());
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.quarkus.vertx.core.runtime;

import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

import io.quarkus.runtime.util.ForwardingExecutorService;

/**
* This executor is only used in the dev mode as the Vertx worker thread pool.
* <p>
* The underlying executor can be shut down and then replaced with a new re-initialized executor.
*/
class DevModeExecutorService extends ForwardingExecutorService {

private static final Logger LOG = Logger.getLogger(DevModeExecutorService.class);

private final Supplier<ExecutorService> initializer;
private volatile ExecutorService executor;

DevModeExecutorService(Supplier<ExecutorService> initializer) {
this.initializer = initializer;
this.executor = initializer.get();
gastaldi marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
protected ExecutorService delegate() {
return executor;
}

/**
* Shutdown the underlying executor and then initialize a new one.
*/
void reinit() {
ExecutorService oldExecutor = this.executor;
if (oldExecutor != null) {
oldExecutor.shutdownNow();
}
this.executor = initializer.get();
LOG.debug("Dev mode executor re-initialized");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.quarkus.vertx.core.runtime;

import java.util.List;
import java.util.concurrent.ExecutorService;

import org.jboss.logging.Logger;

import io.quarkus.runtime.util.ForwardingExecutorService;

/**
* This executor is only used in the prod mode as the Vertx worker thread pool.
*/
class NoopShutdownExecutorService extends ForwardingExecutorService {

private static final Logger LOG = Logger.getLogger(NoopShutdownExecutorService.class);

private final ExecutorService delegate;

NoopShutdownExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}

@Override
protected ExecutorService delegate() {
return delegate;
}

@Override
public void shutdown() {
LOG.debug("shutdown() deliberately not delegated");
}

@Override
public List<Runnable> shutdownNow() {
LOG.debug("shutdownNow() deliberately not delegated");
return List.of();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.jboss.logging.Logger;
import org.jboss.threads.EnhancedQueueExecutor;
Expand All @@ -15,6 +16,7 @@

public class QuarkusExecutorFactory implements ExecutorServiceFactory {
static volatile ExecutorService sharedExecutor;
static volatile DevModeExecutorService devModeExecutor;
private static final AtomicInteger executorCount = new AtomicInteger(0);
private static final Logger log = Logger.getLogger(QuarkusExecutorFactory.class);

Expand All @@ -28,20 +30,51 @@ public QuarkusExecutorFactory(VertxConfiguration conf, LaunchMode launchMode) {

@Override
public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) {
// The current Vertx impl creates two external executors during initialization
// The first one is used for the worker thread pool, the second one is used internally,
// and additional executors may be created on demand
// Unfortunately, there is no way to distinguish the particular executor types
// Therefore, we only consider the first one as the worker thread pool
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is IMO quite risky but the Vertx API does not provide a way to distinguish the particular executor types... 🤷

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The internal one is for file system operations only.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fear that this may actually break @Blocking specifying a worker pool name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I think it was already broken...

We could change the Vert.x SPI to get the "type" (worker, internal, custom)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point is that the ordering can change any time. The working thread pool is here: https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/impl/VertxImpl.java#L190. And the internal pool is here: https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/impl/VertxImpl.java#L193. Just a few lines below. You see?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could change the Vert.x SPI to get the "type" (worker, internal, custom)

Yes, that would be helpful.

CC @jponge

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, that's for @vietj :-D

// Note that in future versions of Vertx this may change!
if (executorCount.incrementAndGet() == 1) {
// The first executor should be the worker thread pool
if (launchMode != LaunchMode.DEVELOPMENT) {
if (sharedExecutor == null) {
log.warn("Shared executor not set. Unshared executor will be created for blocking work");
// This should only happen in tests using Vertx directly in a unit test
sharedExecutor = internalCreateExecutor(threadFactory, concurrency, maxConcurrency);
}
return sharedExecutor;
} else {
// In dev mode we use a special executor for the worker pool
// where the underlying executor can be shut down and then replaced with a new re-initialized executor
// This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// The Vertx instance is reused between restarts but we must attempt to shut down this executor,
// for example to cancel/interrupt the scheduled methods
devModeExecutor = new DevModeExecutorService(new Supplier<ExecutorService>() {
@Override
public ExecutorService get() {
return internalCreateExecutor(threadFactory, concurrency, maxConcurrency);
}
});
return devModeExecutor;
}
}

return internalCreateExecutor(threadFactory, concurrency, maxConcurrency);
}

/**
* In dev mode, shut down the underlying executor and then initialize a new one.
*
* @see DevModeExecutorService
*/
public static void reinitializeDevModeExecutor() {
DevModeExecutorService executor = QuarkusExecutorFactory.devModeExecutor;
if (executor != null) {
executor.reinit();
}
}

private ExecutorService internalCreateExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) {
final EnhancedQueueExecutor.Builder builder = new EnhancedQueueExecutor.Builder()
.setRegisterMBean(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@ public class VertxCoreRecorder {
public Supplier<Vertx> configureVertx(VertxConfiguration config, ThreadPoolConfig threadPoolConfig,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
if (launchMode == LaunchMode.NORMAL) {
// In prod mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// The Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
// And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well
// As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored
QuarkusExecutorFactory.sharedExecutor = new NoopShutdownExecutorService(executorProxy);
} else {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
}
if (launchMode != LaunchMode.DEVELOPMENT) {
vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown);
// we need this to be part of the last shutdown tasks because closing it early (basically before Arc)
Expand Down
Loading
Loading