diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DefaultInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DefaultInvoker.java index 63ec3717646b8..c587b0c056e32 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DefaultInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DefaultInvoker.java @@ -3,6 +3,7 @@ import java.util.concurrent.CompletionStage; import io.quarkus.arc.Arc; +import io.quarkus.arc.InjectableContext.ContextState; import io.quarkus.arc.ManagedContext; import io.quarkus.scheduler.ScheduledExecution; @@ -14,15 +15,26 @@ public CompletionStage invoke(ScheduledExecution execution) throws Excepti if (requestContext.isActive()) { return invokeBean(execution); } else { + // 1. Activate the context + // 2. Capture the state (which is basically a shared Map instance) + // 3. Destroy the context correctly when the returned stage completes + requestContext.activate(); + final ContextState state = requestContext.getState(); try { - requestContext.activate(); - return invokeBean(execution); - } finally { + return invokeBean(execution).whenComplete((v, t) -> { + requestContext.destroy(state); + }); + } catch (RuntimeException e) { + // Just terminate the context and rethrow the exception if something goes really wrong requestContext.terminate(); + throw e; + } finally { + // Always deactivate the context + requestContext.deactivate(); } } } - protected abstract CompletionStage invokeBean(ScheduledExecution execution) throws Exception; + protected abstract CompletionStage invokeBean(ScheduledExecution execution); } diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java index ddaff0e644fe8..3fcf66bbf4c12 100644 --- a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java @@ -409,8 +409,7 @@ private String generateInvoker(ScheduledBusinessMethodItem scheduledMethod, Clas ScheduledExecution.class.getName(), SchedulerDotNames.CONTINUATION.toString()); } else { // The descriptor is: CompletionStage invoke(ScheduledExecution execution) - invoke = invokerCreator.getMethodCreator("invokeBean", CompletionStage.class, ScheduledExecution.class) - .addException(Exception.class); + invoke = invokerCreator.getMethodCreator("invokeBean", CompletionStage.class, ScheduledExecution.class); } // Use a try-catch block and return failed future if an exception is thrown diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/nonblocking/NonBlockingScheduledMethodTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/nonblocking/NonBlockingScheduledMethodTest.java index 1032748b0c7f4..2ddbbc58e735d 100644 --- a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/nonblocking/NonBlockingScheduledMethodTest.java +++ b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/nonblocking/NonBlockingScheduledMethodTest.java @@ -1,5 +1,6 @@ package io.quarkus.scheduler.test.nonblocking; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -10,13 +11,19 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.PreDestroy; +import javax.enterprise.context.RequestScoped; import javax.enterprise.event.Observes; import javax.inject.Singleton; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import io.quarkus.arc.Arc; +import io.quarkus.arc.Unremovable; +import io.quarkus.scheduler.FailedExecution; import io.quarkus.scheduler.Scheduled; import io.quarkus.scheduler.ScheduledExecution; import io.quarkus.scheduler.SuccessfulExecution; @@ -30,7 +37,7 @@ public class NonBlockingScheduledMethodTest { @RegisterExtension static final QuarkusUnitTest test = new QuarkusUnitTest() - .withApplicationRoot(root -> root.addClasses(Jobs.class, JobWasExecuted.class)); + .withApplicationRoot(root -> root.addClasses(Jobs.class, JobWasExecuted.class, Naeb.class)); @Test public void testVoid() throws InterruptedException { @@ -46,6 +53,10 @@ public void testUni() throws InterruptedException { assertTrue(Jobs.UNI_ON_EVENT_LOOP.get()); assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS)); assertEvents("every_uni"); + assertTrue(Jobs.UNI_FAIL_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(Jobs.FAILED_LATCH.await(5, TimeUnit.SECONDS)); + // the bean should be destroyed twice + assertEquals(2, Naeb.DESTROYED_COUNTER.get()); } @Test @@ -57,12 +68,12 @@ public void testCompletionStage() throws InterruptedException { } private void assertEvents(String id) { - for (SuccessfulExecution exec : Jobs.events) { + for (SuccessfulExecution exec : Jobs.successEvents) { if (exec.getExecution().getTrigger().getId().equals(id)) { return; } } - fail("No SuccessfulExecution event fired for " + id + ": " + Jobs.events); + fail("No SuccessfulExecution event fired for " + id + ": " + Jobs.successEvents); } static class Jobs { @@ -70,6 +81,7 @@ static class Jobs { // jobs executed static final CountDownLatch VOID_LATCH = new CountDownLatch(1); static final CountDownLatch UNI_LATCH = new CountDownLatch(1); + static final CountDownLatch UNI_FAIL_LATCH = new CountDownLatch(1); static final CountDownLatch CS_LATCH = new CountDownLatch(1); // jobs executed on the event loop @@ -79,13 +91,22 @@ static class Jobs { // successful events static final CountDownLatch SUCCESS_LATCH = new CountDownLatch(3); - static final List events = new CopyOnWriteArrayList<>(); + static final List successEvents = new CopyOnWriteArrayList<>(); + + // failed events + static final CountDownLatch FAILED_LATCH = new CountDownLatch(1); + static final List failedEvents = new CopyOnWriteArrayList<>(); static void onSuccess(@Observes SuccessfulExecution event) { - events.add(event); + successEvents.add(event); SUCCESS_LATCH.countDown(); } + static void onFailure(@Observes FailedExecution event) { + failedEvents.add(event); + FAILED_LATCH.countDown(); + } + @NonBlocking @Scheduled(every = "0.5s", identity = "every_void", skipExecutionIf = JobWasExecuted.class) void everySecond() { @@ -97,7 +118,18 @@ void everySecond() { Uni everySecondUni() { UNI_ON_EVENT_LOOP.set(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext()); UNI_LATCH.countDown(); - return Uni.createFrom().voidItem(); + return Uni.createFrom().voidItem().invoke(Void -> { + // this callback is executed (and the bean instance is created) after the scheduled method completes + Arc.container().instance(Naeb.class).get().doSomething(); + }); + } + + @Scheduled(every = "0.5s", identity = "every_uni_fail", skipExecutionIf = JobWasExecuted.class) + Uni everySecondUniFailure() { + UNI_FAIL_LATCH.countDown(); + // the bean instance is created before the scheduled method completes + Arc.container().instance(Naeb.class).get().doSomething(); + throw new IllegalStateException("FAIL!"); } @Scheduled(every = "0.5s", identity = "every_cs", skipExecutionIf = JobWasExecuted.class) @@ -108,6 +140,7 @@ CompletionStage everySecondCompletionStage() { ret.complete(null); return ret; } + } @Singleton @@ -120,6 +153,8 @@ public boolean test(ScheduledExecution execution) { return Jobs.VOID_LATCH.getCount() == 0; case "every_uni": return Jobs.UNI_LATCH.getCount() == 0; + case "every_uni_fail": + return Jobs.UNI_FAIL_LATCH.getCount() == 0; case "every_cs": return Jobs.CS_LATCH.getCount() == 0; default: @@ -129,4 +164,19 @@ public boolean test(ScheduledExecution execution) { } + @Unremovable // this bean is "unused" + @RequestScoped + static class Naeb { + + static final AtomicInteger DESTROYED_COUNTER = new AtomicInteger(); + + void doSomething() { + } + + @PreDestroy + void destroy() { + DESTROYED_COUNTER.incrementAndGet(); + } + } + }