From ad6bfc930c05db533953dec22fdb1a1ff5ff30c7 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 20 Jun 2022 17:37:23 +0200 Subject: [PATCH] Fix for #25818 and upgrade to Mutiny 1.6.0 This is part of a coordinated fix across Quarkus and Mutiny where scheduler wrapping would cause Vert.x context propagation not to be done. See https://github.com/smallrye/smallrye-mutiny/pull/955 Fixes #25818 --- bom/application/pom.xml | 2 +- .../mutiny/runtime/MutinyInfrastructure.java | 112 ++++++++++++++++-- .../regression/bug25818/BlockingService.java | 24 ++++ .../bug25818/ReproducerResource.java | 48 ++++++++ .../it/resteasy/mutiny/RegressionTest.java | 35 ++++++ 5 files changed, 208 insertions(+), 13 deletions(-) create mode 100644 integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java create mode 100644 integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java create mode 100644 integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 38365edd091ffd..f0167a72224008 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -133,7 +133,7 @@ 4.1.74.Final 1.0.3 3.5.0.Final - 1.5.0 + 1.6.0 3.1.0 1.8.0 1.1.8.4 diff --git a/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java b/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java index 3d606f5fd71bb2..fa197fbdc7e64d 100644 --- a/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java +++ b/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java @@ -1,8 +1,8 @@ package io.quarkus.mutiny.runtime; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -11,32 +11,120 @@ import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.infrastructure.MutinyScheduler; @Recorder public class MutinyInfrastructure { public static final String VERTX_EVENT_LOOP_THREAD_PREFIX = "vert.x-eventloop-thread-"; - public void configureMutinyInfrastructure(ExecutorService exec, ShutdownContext shutdownContext) { - //mutiny leaks a ScheduledExecutorService if you don't do this + public void configureMutinyInfrastructure(ExecutorService executor, ShutdownContext shutdownContext) { + // Mutiny leaks a ScheduledExecutorService if we don't do this Infrastructure.getDefaultWorkerPool().shutdown(); - Infrastructure.setDefaultExecutor(new Executor() { + + // Since executor is not a ScheduledExecutorService and Mutiny needs one for scheduling we have to adapt one around the provided executor + MutinyScheduler mutinyScheduler = new MutinyScheduler(executor); + Infrastructure.setDefaultExecutor(new ScheduledExecutorService() { + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return mutinyScheduler.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return mutinyScheduler.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return mutinyScheduler.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return mutinyScheduler.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public void shutdown() { + mutinyScheduler.shutdown(); // ...but do not shut `executor` down + } + + @Override + public List shutdownNow() { + return mutinyScheduler.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return mutinyScheduler.isShutdown(); + } + + @Override + public boolean isTerminated() { + return mutinyScheduler.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return mutinyScheduler.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return executor.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return executor.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return executor.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return executor.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return executor.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return executor.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return executor.invokeAny(tasks, timeout, unit); + } + @Override public void execute(Runnable command) { try { - exec.execute(command); - } catch (RejectedExecutionException e) { - if (!exec.isShutdown() && !exec.isTerminated()) { - throw e; + executor.execute(command); + } catch (RejectedExecutionException rejected) { + // Ignore submission failures on application shutdown + if (!executor.isShutdown() && !executor.isTerminated()) { + throw rejected; } - // Ignore the failure - the application has been shutdown. } } }); + shutdownContext.addLastShutdownTask(new Runnable() { @Override public void run() { - Infrastructure.getDefaultWorkerPool().shutdown(); + mutinyScheduler.shutdown(); } }); } diff --git a/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java new file mode 100644 index 00000000000000..8d90464e7455b2 --- /dev/null +++ b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java @@ -0,0 +1,24 @@ +package io.quarkus.it.resteasy.mutiny.regression.bug25818; + +import javax.enterprise.context.ApplicationScoped; + +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +@ApplicationScoped +public class BlockingService { + + public String getBlocking() { + try { + Thread.sleep(250); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Context context = Vertx.currentContext(); + if (context == null) { + return "~~ context is null ~~"; + } else { + return "hello-" + context.getLocal("hello-target"); + } + } +} diff --git a/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java new file mode 100644 index 00000000000000..025bd3e97de276 --- /dev/null +++ b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java @@ -0,0 +1,48 @@ +package io.quarkus.it.resteasy.mutiny.regression.bug25818; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.jboss.logging.Logger; + +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.vertx.core.Vertx; + +@Path("/reproducer/25818") +public class ReproducerResource { + + private final Logger logger = Logger.getLogger(ReproducerResource.class); + + @Inject + BlockingService service; + + private void addToContext() { + Vertx.currentContext().putLocal("hello-target", "you"); + } + + @GET + @Path("/worker-pool") + @Produces(MediaType.TEXT_PLAIN) + public Uni workerPool() { + logger.info("worker pool endpoint"); + addToContext(); + return Uni.createFrom() + .item(service::getBlocking) + .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()); + } + + @GET + @Path("/default-executor") + @Produces(MediaType.TEXT_PLAIN) + public Uni defaultExecutor() { + logger.info("default executor endpoint"); + addToContext(); + return Uni.createFrom() + .item(service::getBlocking) + .runSubscriptionOn(Infrastructure.getDefaultExecutor()); + } +} diff --git a/integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java b/integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java new file mode 100644 index 00000000000000..042aa8f0a7cd8c --- /dev/null +++ b/integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java @@ -0,0 +1,35 @@ +package io.quarkus.it.resteasy.mutiny; + +import static io.restassured.RestAssured.get; +import static org.hamcrest.CoreMatchers.is; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class RegressionTest { + + @Nested + @DisplayName("Regression tests for #25818 (see https://github.com/quarkusio/quarkus/issues/25818)") + public class Bug25818 { + + @Test + public void testDefaultExecutor() { + get("/reproducer/25818/default-executor") + .then() + .body(is("hello-you")) + .statusCode(200); + } + + @Test + public void testWorkerPool() { + get("/reproducer/25818/worker-pool") + .then() + .body(is("hello-you")) + .statusCode(200); + } + } +}