diff --git a/core/deployment/src/main/java/io/quarkus/deployment/dev/RuntimeUpdatesProcessor.java b/core/deployment/src/main/java/io/quarkus/deployment/dev/RuntimeUpdatesProcessor.java index 0e44200ee8043..1b57c9aa9ab57 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/dev/RuntimeUpdatesProcessor.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/dev/RuntimeUpdatesProcessor.java @@ -103,6 +103,7 @@ public class RuntimeUpdatesProcessor implements HotReplacementContext, Closeable final Map sourceFileTimestamps = new ConcurrentHashMap<>(); private final List preScanSteps = new CopyOnWriteArrayList<>(); + private final List postRestartSteps = new CopyOnWriteArrayList<>(); private final List>> noRestartChangesConsumers = new CopyOnWriteArrayList<>(); private final List hotReplacementSetup = new ArrayList<>(); private final List deploymentFailedStartHandlers = new ArrayList<>(); @@ -541,6 +542,13 @@ public boolean doScan(boolean userInitiated, boolean forceRestart) { restartCallback.accept(filesChanged, changedClassResults); long timeNanoSeconds = System.nanoTime() - startNanoseconds; log.infof("Live reload total time: %ss ", Timing.convertToBigDecimalSeconds(timeNanoSeconds)); + for (Runnable step : postRestartSteps) { + try { + step.run(); + } catch (Throwable t) { + log.error("Post Restart step failed", t); + } + } if (TimeUnit.SECONDS.convert(timeNanoSeconds, TimeUnit.NANOSECONDS) >= 4 && !instrumentationEnabled()) { if (!instrumentationLogPrinted) { instrumentationLogPrinted = true; @@ -593,6 +601,11 @@ public void addPreScanStep(Runnable runnable) { preScanSteps.add(runnable); } + @Override + public void addPostRestartStep(Runnable runnable) { + postRestartSteps.add(runnable); + } + @Override public void consumeNoRestartChanges(Consumer> consumer) { noRestartChangesConsumers.add(consumer); diff --git a/core/devmode-spi/src/main/java/io/quarkus/dev/spi/HotReplacementContext.java b/core/devmode-spi/src/main/java/io/quarkus/dev/spi/HotReplacementContext.java index 715196ea038f8..15f3b0163a960 100644 --- a/core/devmode-spi/src/main/java/io/quarkus/dev/spi/HotReplacementContext.java +++ b/core/devmode-spi/src/main/java/io/quarkus/dev/spi/HotReplacementContext.java @@ -68,4 +68,9 @@ public interface HotReplacementContext { * @return A set of changed files */ Set syncState(Map fileHashes); + + /** + * Adds a task that is run after the restart is performed. + */ + void addPostRestartStep(Runnable runnable); } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VertxHttpHotReplacementSetup.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VertxHttpHotReplacementSetup.java index 3125c1c6bb4ff..41c6297526d89 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VertxHttpHotReplacementSetup.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VertxHttpHotReplacementSetup.java @@ -17,9 +17,11 @@ import io.quarkus.dev.console.DevConsoleManager; import io.quarkus.dev.spi.HotReplacementContext; import io.quarkus.dev.spi.HotReplacementSetup; +import io.quarkus.vertx.core.runtime.QuarkusExecutorFactory; import io.quarkus.vertx.core.runtime.VertxCoreRecorder; import io.quarkus.vertx.http.runtime.VertxHttpRecorder; import io.vertx.core.AsyncResult; +import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.http.HttpServerResponse; @@ -50,6 +52,15 @@ public void run() { RemoteSyncHandler.doPreScan(); } }); + hotReplacementContext.addPostRestartStep(new Runnable() { + @Override + public void run() { + // If not on a worker thread then attempt to re-initialize the dev mode executor + if (!Context.isOnWorkerThread()) { + QuarkusExecutorFactory.reinitializeDevModeExecutor(); + } + } + }); } @Override @@ -181,6 +192,9 @@ public Boolean call() { }, false).onComplete(new Handler>() { @Override public void handle(AsyncResult event) { + // When the HTTP request is processed then attempt to re-initialize the dev mode executor + QuarkusExecutorFactory.reinitializeDevModeExecutor(); + if (event.failed()) { handleDeploymentProblem(routingContext, event.cause()); } else { diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/DevModeExecutorService.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/DevModeExecutorService.java new file mode 100644 index 0000000000000..78dbca3507338 --- /dev/null +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/DevModeExecutorService.java @@ -0,0 +1,42 @@ +package io.quarkus.vertx.core.runtime; + +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; + +import org.jboss.logging.Logger; + +/** + * This executor is only used in the dev mode as the Vertx worker thread pool. + *

+ * The underlying executor can be shut down and then replaced with a new re-initialized executor. + */ +class DevModeExecutorService extends ForwardingExecutorService { + + private static final Logger LOG = Logger.getLogger(DevModeExecutorService.class); + + private final Supplier initializer; + private volatile ExecutorService executor; + + DevModeExecutorService(Supplier initializer) { + this.initializer = initializer; + this.executor = initializer.get(); + } + + @Override + protected ExecutorService delegate() { + return executor; + } + + /** + * Shutdown the underlying executor and then initialize a new one. + */ + void reinit() { + ExecutorService oldExecutor = this.executor; + if (oldExecutor != null) { + oldExecutor.shutdownNow(); + } + this.executor = initializer.get(); + LOG.debug("Dev mode executor re-initialized"); + } + +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/QuarkusExecutorFactory.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/QuarkusExecutorFactory.java index 7c43e9e68efff..82ef39b8f4b00 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/QuarkusExecutorFactory.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/QuarkusExecutorFactory.java @@ -3,6 +3,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.jboss.logging.Logger; import org.jboss.threads.EnhancedQueueExecutor; @@ -15,6 +16,7 @@ public class QuarkusExecutorFactory implements ExecutorServiceFactory { static volatile ExecutorService sharedExecutor; + static volatile DevModeExecutorService devModeExecutor; private static final AtomicInteger executorCount = new AtomicInteger(0); private static final Logger log = Logger.getLogger(QuarkusExecutorFactory.class); @@ -28,7 +30,14 @@ public QuarkusExecutorFactory(VertxConfiguration conf, LaunchMode launchMode) { @Override public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) { + // The current Vertx impl creates two external executors during initialization + // The first one is used for the worker thread pool, the second one is used internally, + // and additional executors may be created on demand + // Unfortunately, there is no way to distinguish the particular executor types + // Therefore, we only consider the first one as the worker thread pool + // Note that in future versions of Vertx this may change! if (executorCount.incrementAndGet() == 1) { + // The first executor should be the worker thread pool if (launchMode != LaunchMode.DEVELOPMENT) { if (sharedExecutor == null) { log.warn("Shared executor not set. Unshared executor will be created for blocking work"); @@ -36,12 +45,36 @@ public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concu sharedExecutor = internalCreateExecutor(threadFactory, concurrency, maxConcurrency); } return sharedExecutor; + } else { + // In dev mode we use a special executor for the worker pool + // where the underlying executor can be shut down and then replaced with a new re-initialized executor + // This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589 + // The Vertx instance is reused between restarts but we must attempt to shut down this executor, + // for example to cancel/interrupt the scheduled methods + devModeExecutor = new DevModeExecutorService(new Supplier() { + @Override + public ExecutorService get() { + return internalCreateExecutor(threadFactory, concurrency, maxConcurrency); + } + }); + return devModeExecutor; } } - return internalCreateExecutor(threadFactory, concurrency, maxConcurrency); } + /** + * In dev mode, shut down the underlying executor and then initialize a new one. + * + * @see DevModeExecutorService + */ + public static void reinitializeDevModeExecutor() { + DevModeExecutorService executor = QuarkusExecutorFactory.devModeExecutor; + if (executor != null) { + executor.reinit(); + } + } + private ExecutorService internalCreateExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) { final EnhancedQueueExecutor.Builder builder = new EnhancedQueueExecutor.Builder() .setRegisterMBean(false) diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java index 213d0c416a895..9ecb3690f41ee 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java @@ -101,7 +101,7 @@ public Supplier configureVertx(VertxConfiguration config, ThreadPoolConfi LaunchMode launchMode, ShutdownContext shutdown, List> customizers, ExecutorService executorProxy) { if (launchMode == LaunchMode.NORMAL) { - // In the prod mode we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated + // In prod mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated // This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589 // The Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used // And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well