Skip to content

Commit

Permalink
Vertx: use DevModeExecutorService in the dev mode
Browse files Browse the repository at this point in the history
- in dev mode we use a special executor for the worker pool where
 the underlying executor can be shut down and then replaced with a new re-initialized executor
- this is a workaround to solve the problem described in #16833 (comment)
- the Vertx instance is reused between restarts but we must attempt to shut down this executor,
 for example to cancel/interrupt the scheduled methods
  • Loading branch information
mkouba committed Jan 31, 2024
1 parent b83079a commit 3315463
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class RuntimeUpdatesProcessor implements HotReplacementContext, Closeable
final Map<Path, Long> sourceFileTimestamps = new ConcurrentHashMap<>();

private final List<Runnable> preScanSteps = new CopyOnWriteArrayList<>();
private final List<Runnable> postRestartSteps = new CopyOnWriteArrayList<>();
private final List<Consumer<Set<String>>> noRestartChangesConsumers = new CopyOnWriteArrayList<>();
private final List<HotReplacementSetup> hotReplacementSetup = new ArrayList<>();
private final List<Runnable> deploymentFailedStartHandlers = new ArrayList<>();
Expand Down Expand Up @@ -541,6 +542,13 @@ public boolean doScan(boolean userInitiated, boolean forceRestart) {
restartCallback.accept(filesChanged, changedClassResults);
long timeNanoSeconds = System.nanoTime() - startNanoseconds;
log.infof("Live reload total time: %ss ", Timing.convertToBigDecimalSeconds(timeNanoSeconds));
for (Runnable step : postRestartSteps) {
try {
step.run();
} catch (Throwable t) {
log.error("Post Restart step failed", t);
}
}
if (TimeUnit.SECONDS.convert(timeNanoSeconds, TimeUnit.NANOSECONDS) >= 4 && !instrumentationEnabled()) {
if (!instrumentationLogPrinted) {
instrumentationLogPrinted = true;
Expand Down Expand Up @@ -593,6 +601,11 @@ public void addPreScanStep(Runnable runnable) {
preScanSteps.add(runnable);
}

@Override
public void addPostRestartStep(Runnable runnable) {
postRestartSteps.add(runnable);
}

@Override
public void consumeNoRestartChanges(Consumer<Set<String>> consumer) {
noRestartChangesConsumers.add(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public interface HotReplacementContext {
* @return A set of changed files
*/
Set<String> syncState(Map<String, String> fileHashes);

/**
* Adds a task that is run after the restart is performed.
*/
void addPostRestartStep(Runnable runnable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io.quarkus.dev.console.DevConsoleManager;
import io.quarkus.dev.spi.HotReplacementContext;
import io.quarkus.dev.spi.HotReplacementSetup;
import io.quarkus.vertx.core.runtime.QuarkusExecutorFactory;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.http.runtime.VertxHttpRecorder;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpServerResponse;
Expand Down Expand Up @@ -50,6 +52,15 @@ public void run() {
RemoteSyncHandler.doPreScan();
}
});
hotReplacementContext.addPostRestartStep(new Runnable() {
@Override
public void run() {
// If not on a worker thread then attempt to re-initialize the dev mode executor
if (!Context.isOnWorkerThread()) {
QuarkusExecutorFactory.reinitializeDevModeExecutor();
}
}
});
}

@Override
Expand Down Expand Up @@ -181,6 +192,9 @@ public Boolean call() {
}, false).onComplete(new Handler<AsyncResult<Boolean>>() {
@Override
public void handle(AsyncResult<Boolean> event) {
// When the HTTP request is processed then attempt to re-initialize the dev mode executor
QuarkusExecutorFactory.reinitializeDevModeExecutor();

if (event.failed()) {
handleDeploymentProblem(routingContext, event.cause());
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.quarkus.vertx.core.runtime;

import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

/**
* This executor is only used in the dev mode as the Vertx worker thread pool.
* <p>
* The underlying executor can be shut down and then replaced with a new re-initialized executor.
*/
class DevModeExecutorService extends ForwardingExecutorService {

private static final Logger LOG = Logger.getLogger(DevModeExecutorService.class);

private final Supplier<ExecutorService> initializer;
private volatile ExecutorService executor;

DevModeExecutorService(Supplier<ExecutorService> initializer) {
this.initializer = initializer;
this.executor = initializer.get();
}

@Override
protected ExecutorService delegate() {
return executor;
}

/**
* Shutdown the underlying executor and then initialize a new one.
*/
void reinit() {
ExecutorService oldExecutor = this.executor;
if (oldExecutor != null) {
oldExecutor.shutdownNow();
}
this.executor = initializer.get();
LOG.debug("Dev mode executor re-initialized");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.jboss.logging.Logger;
import org.jboss.threads.EnhancedQueueExecutor;
Expand All @@ -15,6 +16,7 @@

public class QuarkusExecutorFactory implements ExecutorServiceFactory {
static volatile ExecutorService sharedExecutor;
static volatile DevModeExecutorService devModeExecutor;
private static final AtomicInteger executorCount = new AtomicInteger(0);
private static final Logger log = Logger.getLogger(QuarkusExecutorFactory.class);

Expand All @@ -28,20 +30,51 @@ public QuarkusExecutorFactory(VertxConfiguration conf, LaunchMode launchMode) {

@Override
public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) {
// The current Vertx impl creates two external executors during initialization
// The first one is used for the worker thread pool, the second one is used internally,
// and additional executors may be created on demand
// Unfortunately, there is no way to distinguish the particular executor types
// Therefore, we only consider the first one as the worker thread pool
// Note that in future versions of Vertx this may change!
if (executorCount.incrementAndGet() == 1) {
// The first executor should be the worker thread pool
if (launchMode != LaunchMode.DEVELOPMENT) {
if (sharedExecutor == null) {
log.warn("Shared executor not set. Unshared executor will be created for blocking work");
// This should only happen in tests using Vertx directly in a unit test
sharedExecutor = internalCreateExecutor(threadFactory, concurrency, maxConcurrency);
}
return sharedExecutor;
} else {
// In dev mode we use a special executor for the worker pool
// where the underlying executor can be shut down and then replaced with a new re-initialized executor
// This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// The Vertx instance is reused between restarts but we must attempt to shut down this executor,
// for example to cancel/interrupt the scheduled methods
devModeExecutor = new DevModeExecutorService(new Supplier<ExecutorService>() {
@Override
public ExecutorService get() {
return internalCreateExecutor(threadFactory, concurrency, maxConcurrency);
}
});
return devModeExecutor;
}
}

return internalCreateExecutor(threadFactory, concurrency, maxConcurrency);
}

/**
* In dev mode, shut down the underlying executor and then initialize a new one.
*
* @see DevModeExecutorService
*/
public static void reinitializeDevModeExecutor() {
DevModeExecutorService executor = QuarkusExecutorFactory.devModeExecutor;
if (executor != null) {
executor.reinit();
}
}

private ExecutorService internalCreateExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) {
final EnhancedQueueExecutor.Builder builder = new EnhancedQueueExecutor.Builder()
.setRegisterMBean(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public Supplier<Vertx> configureVertx(VertxConfiguration config, ThreadPoolConfi
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
if (launchMode == LaunchMode.NORMAL) {
// In the prod mode we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// In prod mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// The Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
// And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well
Expand Down

0 comments on commit 3315463

Please sign in to comment.