From 83d00b6b072d687518b45cdfaad20acdad3c58ec Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Tue, 30 Jan 2024 16:06:07 +0100 Subject: [PATCH] Vertx: use NoopShutdownExecutorService in the prod mode - in the prod mode we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated - this is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589 - the Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used - and when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well - as a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored --- .../util/ForwardingExecutorService.java | 87 +++++++++++++++++++ .../runtime/NoopShutdownExecutorService.java | 39 +++++++++ .../vertx/core/runtime/VertxCoreRecorder.java | 11 ++- .../threads/DelegatingExecutorService.java | 51 ++--------- 4 files changed, 142 insertions(+), 46 deletions(-) create mode 100644 core/runtime/src/main/java/io/quarkus/runtime/util/ForwardingExecutorService.java create mode 100644 extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/NoopShutdownExecutorService.java diff --git a/core/runtime/src/main/java/io/quarkus/runtime/util/ForwardingExecutorService.java b/core/runtime/src/main/java/io/quarkus/runtime/util/ForwardingExecutorService.java new file mode 100644 index 0000000000000..6a7c779b51945 --- /dev/null +++ b/core/runtime/src/main/java/io/quarkus/runtime/util/ForwardingExecutorService.java @@ -0,0 +1,87 @@ +package io.quarkus.runtime.util; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Forwards all method calls to the executor service returned from the {@link #delegate()} method. Only non-default methods + * declared on the {@link ExecutorService} interface are forwarded. + */ +public abstract class ForwardingExecutorService implements ExecutorService { + + protected abstract ExecutorService delegate(); + + @Override + public void execute(Runnable command) { + delegate().execute(command); + } + + @Override + public void shutdown() { + delegate().shutdown(); + } + + @Override + public List shutdownNow() { + return delegate().shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate().isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate().isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate().awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return delegate().submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate().submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return delegate().submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate().invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return delegate().invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return delegate().invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate().invokeAny(tasks, timeout, unit); + } + +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/NoopShutdownExecutorService.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/NoopShutdownExecutorService.java new file mode 100644 index 0000000000000..01529b4437569 --- /dev/null +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/NoopShutdownExecutorService.java @@ -0,0 +1,39 @@ +package io.quarkus.vertx.core.runtime; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.jboss.logging.Logger; + +import io.quarkus.runtime.util.ForwardingExecutorService; + +/** + * This executor is only used in the prod mode as the Vertx worker thread pool. + */ +class NoopShutdownExecutorService extends ForwardingExecutorService { + + private static final Logger LOG = Logger.getLogger(NoopShutdownExecutorService.class); + + private final ExecutorService delegate; + + NoopShutdownExecutorService(ExecutorService delegate) { + this.delegate = delegate; + } + + @Override + protected ExecutorService delegate() { + return delegate; + } + + @Override + public void shutdown() { + LOG.debug("shutdown() deliberately not delegated"); + } + + @Override + public List shutdownNow() { + LOG.debug("shutdownNow() deliberately not delegated"); + return List.of(); + } + +} diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java index d22eca827daa4..213d0c416a895 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java @@ -100,7 +100,16 @@ public class VertxCoreRecorder { public Supplier configureVertx(VertxConfiguration config, ThreadPoolConfig threadPoolConfig, LaunchMode launchMode, ShutdownContext shutdown, List> customizers, ExecutorService executorProxy) { - QuarkusExecutorFactory.sharedExecutor = executorProxy; + if (launchMode == LaunchMode.NORMAL) { + // In the prod mode we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated + // This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589 + // The Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used + // And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well + // As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored + QuarkusExecutorFactory.sharedExecutor = new NoopShutdownExecutorService(executorProxy); + } else { + QuarkusExecutorFactory.sharedExecutor = executorProxy; + } if (launchMode != LaunchMode.DEVELOPMENT) { vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown); // we need this to be part of the last shutdown tasks because closing it early (basically before Arc) diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/DelegatingExecutorService.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/DelegatingExecutorService.java index 089515a5f162f..d4b31e6f0ae8a 100644 --- a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/DelegatingExecutorService.java +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/DelegatingExecutorService.java @@ -1,26 +1,24 @@ package io.quarkus.virtual.threads; -import java.util.Collection; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; + +import io.quarkus.runtime.util.ForwardingExecutorService; /** * An implementation of {@code ExecutorService} that delegates to the real executor, while disallowing termination. */ -class DelegatingExecutorService implements ExecutorService { +class DelegatingExecutorService extends ForwardingExecutorService { private final ExecutorService delegate; DelegatingExecutorService(final ExecutorService delegate) { this.delegate = delegate; } - public void execute(final Runnable command) { - delegate.execute(command); + @Override + protected ExecutorService delegate() { + return delegate; } public boolean isShutdown() { @@ -45,43 +43,6 @@ public List shutdownNow() { throw new UnsupportedOperationException("shutdownNow not allowed on managed executor service"); } - @Override - public Future submit(Callable task) { - return delegate.submit(task); - } - - @Override - public Future submit(Runnable task, T result) { - return delegate.submit(task, result); - } - - @Override - public Future submit(Runnable task) { - return delegate.submit(task); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - return delegate.invokeAll(tasks); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - return delegate.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return delegate.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return delegate.invokeAny(tasks, timeout, unit); - } - public String toString() { return delegate.toString(); }