Skip to content

Commit

Permalink
Wrap the managed worker thread pool to disallow shutdown by applicati…
Browse files Browse the repository at this point in the history
…on/other extensions

Related to #16833 #43228
  • Loading branch information
ozangunalp committed Sep 17, 2024
1 parent b8c5a62 commit 68822e5
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.wildfly.common.cpu.ProcessorInfo;

import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.runtime.util.NoopShutdownScheduledExecutorService;

/**
*
Expand Down Expand Up @@ -57,8 +58,19 @@ public void run() {
if (threadPoolConfig.prefill) {
underlying.prestartAllCoreThreads();
}
current = underlying;
return underlying;
ScheduledExecutorService managed = underlying;
// In prod and test mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// This is to prevent the application and other extensions from shutting down the executor service
// The problem was described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// and https://github.com/quarkusio/quarkus/issues/43228
// For example, the Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
// And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well
// As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored
if (launchMode != LaunchMode.DEVELOPMENT) {
managed = new NoopShutdownScheduledExecutorService(underlying);
}
current = managed;
return managed;
}

private static Runnable createShutdownTask(ThreadPoolConfig threadPoolConfig, EnhancedQueueExecutor executor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.runtime.util;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Forwards all method calls to the scheduled executor service returned from the {@link #delegate()} method. Only non-default
* methods
* declared on the {@link ScheduledExecutorService} interface are forwarded.
*/
public abstract class ForwardingScheduledExecutorService extends ForwardingExecutorService implements ScheduledExecutorService {

protected abstract ScheduledExecutorService delegate();

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate().schedule(command, delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate().schedule(callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate().scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.runtime.util;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

import org.jboss.logging.Logger;

/**
* Forwards all method calls to the scheduled executor service returned from the {@link #delegate()} method.
* Does not allow shutdown
*/
public class NoopShutdownScheduledExecutorService extends ForwardingScheduledExecutorService {

private static final Logger LOG = Logger.getLogger(NoopShutdownScheduledExecutorService.class);

private final ScheduledExecutorService delegate;

public NoopShutdownScheduledExecutorService(final ScheduledExecutorService delegate) {
this.delegate = delegate;
}

@Override
protected ScheduledExecutorService delegate() {
return delegate;
}

@Override
public boolean isShutdown() {
// managed executors are never shut down from the application's perspective
return false;
}

@Override
public boolean isTerminated() {
// managed executors are never shut down from the application's perspective
return false;
}

@Override
public void shutdown() {
LOG.debug("shutdown() not allowed on managed executor service");
}

@Override
public List<Runnable> shutdownNow() {
LOG.debug("shutdownNow() not allowed on managed executor service");
return List.of();
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,8 @@ public class VertxCoreRecorder {
public Supplier<Vertx> configureVertx(VertxConfiguration config, ThreadPoolConfig threadPoolConfig,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
if (launchMode == LaunchMode.NORMAL) {
// In prod mode, we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// The Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
// And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well
// As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored
QuarkusExecutorFactory.sharedExecutor = new NoopShutdownExecutorService(executorProxy);
} else {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
}
// The wrapper previously here to prevent the executor to be shutdown prematurely is moved to higher level to the io.quarkus.runtime.ExecutorRecorder
QuarkusExecutorFactory.sharedExecutor = executorProxy;
if (launchMode != LaunchMode.DEVELOPMENT) {
vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown);
// we need this to be part of the last shutdown tasks because closing it early (basically before Arc)
Expand Down

0 comments on commit 68822e5

Please sign in to comment.