Skip to content

Commit

Permalink
Vertx: use NoopShutdownExecutorService in the prod mode
Browse files Browse the repository at this point in the history
- in the prod mode we wrap the ExecutorService and the shutdown() and
shutdownNow() are deliberately not delegated
- this is a workaround to solve the problem described in
#16833 (comment)
- the Vertx instance is closed before
io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
- and when it's closed the underlying worker thread pool (which is in
the prod mode backed by the ExecutorBuildItem) is closed as well
- as a result the quarkus.thread-pool.shutdown-interrupt config property
and logic defined in ExecutorRecorder.createShutdownTask() is completely
ignored
  • Loading branch information
mkouba committed Feb 1, 2024
1 parent 8f66719 commit 83d00b6
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.quarkus.runtime.util;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Forwards all method calls to the executor service returned from the {@link #delegate()} method. Only non-default methods
* declared on the {@link ExecutorService} interface are forwarded.
*/
public abstract class ForwardingExecutorService implements ExecutorService {

protected abstract ExecutorService delegate();

@Override
public void execute(Runnable command) {
delegate().execute(command);
}

@Override
public void shutdown() {
delegate().shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate().shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate().isShutdown();
}

@Override
public boolean isTerminated() {
return delegate().isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate().awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate().submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate().submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate().submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate().invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate().invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate().invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate().invokeAny(tasks, timeout, unit);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.quarkus.vertx.core.runtime;

import java.util.List;
import java.util.concurrent.ExecutorService;

import org.jboss.logging.Logger;

import io.quarkus.runtime.util.ForwardingExecutorService;

/**
* This executor is only used in the prod mode as the Vertx worker thread pool.
*/
class NoopShutdownExecutorService extends ForwardingExecutorService {

private static final Logger LOG = Logger.getLogger(NoopShutdownExecutorService.class);

private final ExecutorService delegate;

NoopShutdownExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}

@Override
protected ExecutorService delegate() {
return delegate;
}

@Override
public void shutdown() {
LOG.debug("shutdown() deliberately not delegated");
}

@Override
public List<Runnable> shutdownNow() {
LOG.debug("shutdownNow() deliberately not delegated");
return List.of();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@ public class VertxCoreRecorder {
public Supplier<Vertx> configureVertx(VertxConfiguration config, ThreadPoolConfig threadPoolConfig,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
if (launchMode == LaunchMode.NORMAL) {
// In the prod mode we wrap the ExecutorService and the shutdown() and shutdownNow() are deliberately not delegated
// This is a workaround to solve the problem described in https://github.com/quarkusio/quarkus/issues/16833#issuecomment-1917042589
// The Vertx instance is closed before io.quarkus.runtime.ExecutorRecorder.createShutdownTask() is used
// And when it's closed the underlying worker thread pool (which is in the prod mode backed by the ExecutorBuildItem) is closed as well
// As a result the quarkus.thread-pool.shutdown-interrupt config property and logic defined in ExecutorRecorder.createShutdownTask() is completely ignored
QuarkusExecutorFactory.sharedExecutor = new NoopShutdownExecutorService(executorProxy);
} else {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
}
if (launchMode != LaunchMode.DEVELOPMENT) {
vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown);
// we need this to be part of the last shutdown tasks because closing it early (basically before Arc)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
package io.quarkus.virtual.threads;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.quarkus.runtime.util.ForwardingExecutorService;

/**
* An implementation of {@code ExecutorService} that delegates to the real executor, while disallowing termination.
*/
class DelegatingExecutorService implements ExecutorService {
class DelegatingExecutorService extends ForwardingExecutorService {
private final ExecutorService delegate;

DelegatingExecutorService(final ExecutorService delegate) {
this.delegate = delegate;
}

public void execute(final Runnable command) {
delegate.execute(command);
@Override
protected ExecutorService delegate() {
return delegate;
}

public boolean isShutdown() {
Expand All @@ -45,43 +43,6 @@ public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException("shutdownNow not allowed on managed executor service");
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}

public String toString() {
return delegate.toString();
}
Expand Down

0 comments on commit 83d00b6

Please sign in to comment.