From 059b7b7ecb0579a2363f6e02f1105fd12d5df3d0 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 20 Jun 2022 17:37:23 +0200 Subject: [PATCH] Mutiny 1.6.0 upgrade and capture Vert.x contexts across all Mutiny schedulers Fixes #25818 This is part of a coordinated fix across Quarkus and Mutiny where scheduler wrapping would cause Vert.x context propagation not to be done. Some changes have been adapted from the draft code from @luneo7 in the discussions of: - https://github.com/quarkusio/quarkus/pull/26242 - https://github.com/quarkusio/quarkus/issues/25818 See the matching changes in Mutiny: https://github.com/smallrye/smallrye-mutiny/pull/955 --- .../mutiny/deployment/MutinyProcessor.java | 13 +- .../ContextualRunnableScheduledFuture.java | 71 ++++++++++ .../mutiny/runtime/MutinyInfrastructure.java | 131 ++++++++++++++++-- .../vertx/core/runtime/VertxCoreRecorder.java | 5 +- .../regression/bug25818/BlockingService.java | 24 ++++ .../bug25818/ReproducerResource.java | 83 +++++++++++ .../it/resteasy/mutiny/RegressionTest.java | 51 +++++++ 7 files changed, 363 insertions(+), 15 deletions(-) create mode 100644 extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/ContextualRunnableScheduledFuture.java create mode 100644 integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java create mode 100644 integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java create mode 100644 integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java diff --git a/extensions/mutiny/deployment/src/main/java/io/quarkus/mutiny/deployment/MutinyProcessor.java b/extensions/mutiny/deployment/src/main/java/io/quarkus/mutiny/deployment/MutinyProcessor.java index 84206cabbff94..0f6434807dd91 100644 --- a/extensions/mutiny/deployment/src/main/java/io/quarkus/mutiny/deployment/MutinyProcessor.java +++ b/extensions/mutiny/deployment/src/main/java/io/quarkus/mutiny/deployment/MutinyProcessor.java @@ -1,10 +1,14 @@ package io.quarkus.mutiny.deployment; +import java.util.Optional; import java.util.concurrent.ExecutorService; +import org.jboss.threads.ContextHandler; + import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.ExecutionTime; import io.quarkus.deployment.annotations.Record; +import io.quarkus.deployment.builditem.ContextHandlerBuildItem; import io.quarkus.deployment.builditem.ExecutorBuildItem; import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.mutiny.runtime.MutinyInfrastructure; @@ -13,10 +17,13 @@ public class MutinyProcessor { @BuildStep @Record(ExecutionTime.RUNTIME_INIT) - public void runtimeInit(ExecutorBuildItem executorBuildItem, MutinyInfrastructure recorder, - ShutdownContextBuildItem shutdownContext) { + public void runtimeInit(ExecutorBuildItem executorBuildItem, + MutinyInfrastructure recorder, + ShutdownContextBuildItem shutdownContext, + Optional contextHandler) { ExecutorService executor = executorBuildItem.getExecutorProxy(); - recorder.configureMutinyInfrastructure(executor, shutdownContext); + ContextHandler handler = contextHandler.map(ContextHandlerBuildItem::contextHandler).orElse(null); + recorder.configureMutinyInfrastructure(executor, shutdownContext, handler); } @BuildStep diff --git a/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/ContextualRunnableScheduledFuture.java b/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/ContextualRunnableScheduledFuture.java new file mode 100644 index 0000000000000..a8da87aa3dd90 --- /dev/null +++ b/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/ContextualRunnableScheduledFuture.java @@ -0,0 +1,71 @@ +package io.quarkus.mutiny.runtime; + +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.jboss.threads.ContextHandler; + +class ContextualRunnableScheduledFuture implements RunnableScheduledFuture { + private final RunnableScheduledFuture runnable; + private final Object context; + private final ContextHandler contextHandler; + + public ContextualRunnableScheduledFuture(ContextHandler contextHandler, Object context, + RunnableScheduledFuture runnable) { + this.contextHandler = contextHandler; + this.context = context; + this.runnable = runnable; + } + + @Override + public boolean isPeriodic() { + return runnable.isPeriodic(); + } + + @Override + public long getDelay(TimeUnit unit) { + return runnable.getDelay(unit); + } + + @Override + public int compareTo(Delayed o) { + return runnable.compareTo(o); + } + + @Override + public void run() { + if (contextHandler != null) { + contextHandler.runWith(runnable, context); + } else { + runnable.run(); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return runnable.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return runnable.isCancelled(); + } + + @Override + public boolean isDone() { + return runnable.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return runnable.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return runnable.get(timeout, unit); + } +} diff --git a/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java b/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java index 3d606f5fd71bb..bccee939eb21e 100644 --- a/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java +++ b/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java @@ -1,42 +1,153 @@ package io.quarkus.mutiny.runtime; -import java.util.concurrent.Executor; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import org.jboss.logging.Logger; +import org.jboss.threads.ContextHandler; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.infrastructure.MutinyScheduler; @Recorder public class MutinyInfrastructure { public static final String VERTX_EVENT_LOOP_THREAD_PREFIX = "vert.x-eventloop-thread-"; - public void configureMutinyInfrastructure(ExecutorService exec, ShutdownContext shutdownContext) { - //mutiny leaks a ScheduledExecutorService if you don't do this + public void configureMutinyInfrastructure(ExecutorService executor, ShutdownContext shutdownContext, + ContextHandler contextHandler) { + // Mutiny leaks a ScheduledExecutorService if we don't do this Infrastructure.getDefaultWorkerPool().shutdown(); - Infrastructure.setDefaultExecutor(new Executor() { + + // Since executor is not a ScheduledExecutorService and Mutiny needs one for scheduling we have to adapt one around the provided executor + MutinyScheduler mutinyScheduler = new MutinyScheduler(executor) { + @Override + protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { + Object context = (contextHandler != null) ? contextHandler.captureContext() : null; + return super.decorateTask(runnable, new ContextualRunnableScheduledFuture<>(contextHandler, context, task)); + } + + @Override + protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) { + Object context = (contextHandler != null) ? contextHandler.captureContext() : null; + return super.decorateTask(callable, new ContextualRunnableScheduledFuture<>(contextHandler, context, task)); + } + }; + Infrastructure.setDefaultExecutor(new ScheduledExecutorService() { + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return mutinyScheduler.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return mutinyScheduler.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return mutinyScheduler.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return mutinyScheduler.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public void shutdown() { + mutinyScheduler.shutdown(); // ...but do not shut `executor` down + } + + @Override + public List shutdownNow() { + return mutinyScheduler.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return mutinyScheduler.isShutdown(); + } + + @Override + public boolean isTerminated() { + return mutinyScheduler.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return mutinyScheduler.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return executor.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return executor.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return executor.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return executor.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return executor.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return executor.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return executor.invokeAny(tasks, timeout, unit); + } + @Override public void execute(Runnable command) { try { - exec.execute(command); - } catch (RejectedExecutionException e) { - if (!exec.isShutdown() && !exec.isTerminated()) { - throw e; + executor.execute(command); + } catch (RejectedExecutionException rejected) { + // Ignore submission failures on application shutdown + if (!executor.isShutdown() && !executor.isTerminated()) { + throw rejected; } - // Ignore the failure - the application has been shutdown. } } }); + shutdownContext.addLastShutdownTask(new Runnable() { @Override public void run() { - Infrastructure.getDefaultWorkerPool().shutdown(); + mutinyScheduler.shutdown(); } }); } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java index 5a6af7cfcfae5..0e5cc2e3a2fc4 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java @@ -546,14 +546,15 @@ public Object captureContext() { @Override public void runWith(Runnable task, Object context) { - if (context != null) { + ContextInternal currentContext = (ContextInternal) Vertx.currentContext(); + if (context != null && context != currentContext) { // Only do context handling if it's non-null final ContextInternal vertxContext = (ContextInternal) context; vertxContext.beginDispatch(); try { task.run(); } finally { - vertxContext.endDispatch(null); + vertxContext.endDispatch(currentContext); } } else { task.run(); diff --git a/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java new file mode 100644 index 0000000000000..8d90464e7455b --- /dev/null +++ b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java @@ -0,0 +1,24 @@ +package io.quarkus.it.resteasy.mutiny.regression.bug25818; + +import javax.enterprise.context.ApplicationScoped; + +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +@ApplicationScoped +public class BlockingService { + + public String getBlocking() { + try { + Thread.sleep(250); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Context context = Vertx.currentContext(); + if (context == null) { + return "~~ context is null ~~"; + } else { + return "hello-" + context.getLocal("hello-target"); + } + } +} diff --git a/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java new file mode 100644 index 0000000000000..f1bda2b1aec4c --- /dev/null +++ b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java @@ -0,0 +1,83 @@ +package io.quarkus.it.resteasy.mutiny.regression.bug25818; + +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.jboss.logging.Logger; + +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +@Path("/reproducer/25818") +public class ReproducerResource { + + private final Logger logger = Logger.getLogger(ReproducerResource.class); + + @Inject + BlockingService service; + + private void addToContext() { + Vertx.currentContext().putLocal("hello-target", "you"); + } + + @GET + @Path("/worker-pool") + @Produces(MediaType.TEXT_PLAIN) + public Uni workerPool() { + logger.info("worker pool endpoint"); + addToContext(); + return Uni.createFrom() + .item(service::getBlocking) + .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()); + } + + @GET + @Path("/default-executor") + @Produces(MediaType.TEXT_PLAIN) + public Uni defaultExecutor() { + logger.info("default executor endpoint"); + addToContext(); + return Uni.createFrom() + .item(service::getBlocking) + .runSubscriptionOn(Infrastructure.getDefaultExecutor()); + } + + @GET + @Path("/worker-pool-submit") + public Uni workerPoolSubmit() { + Vertx.currentContext().putLocal("yolo", "yolo"); + return Uni.createFrom().emitter(emitter -> { + Infrastructure.getDefaultWorkerPool().submit(() -> { + Context ctx = Vertx.currentContext(); + if (ctx != null) { + emitter.complete("yolo -> " + ctx.getLocal("yolo")); + } else { + emitter.complete("Context was null"); + } + }); + }); + } + + @GET + @Path("/worker-pool-schedule") + public Uni workerPoolSchedule() { + Vertx.currentContext().putLocal("yolo", "yolo"); + return Uni.createFrom().emitter(emitter -> { + Infrastructure.getDefaultWorkerPool().schedule(() -> { + Context ctx = Vertx.currentContext(); + if (ctx != null) { + emitter.complete("yolo -> " + ctx.getLocal("yolo")); + } else { + emitter.complete("Context was null"); + } + }, 25, TimeUnit.MILLISECONDS); + }); + } +} diff --git a/integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java b/integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java new file mode 100644 index 0000000000000..e5c059289ef41 --- /dev/null +++ b/integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java @@ -0,0 +1,51 @@ +package io.quarkus.it.resteasy.mutiny; + +import static io.restassured.RestAssured.get; +import static org.hamcrest.CoreMatchers.is; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class RegressionTest { + + @Nested + @DisplayName("Regression tests for #25818 (see https://github.com/quarkusio/quarkus/issues/25818)") + public class Bug25818 { + + @Test + public void testDefaultExecutor() { + get("/reproducer/25818/default-executor") + .then() + .body(is("hello-you")) + .statusCode(200); + } + + @Test + public void testWorkerPool() { + get("/reproducer/25818/worker-pool") + .then() + .body(is("hello-you")) + .statusCode(200); + } + + @Test + public void yolo1() { + get("/reproducer/25818/worker-pool-submit") + .then() + .body(is("yolo -> yolo")) + .statusCode(200); + } + + @Test + public void yolo2() { + get("/reproducer/25818/worker-pool-schedule") + .then() + .body(is("yolo -> yolo")) + .statusCode(200); + } + } +}