diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelayedExecutionInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelayedExecutionInvoker.java index 6e343ac35ab5e..4faa121b25794 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelayedExecutionInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelayedExecutionInvoker.java @@ -53,12 +53,7 @@ public CompletionStage invoke(ScheduledExecution execution) throws Excepti executor.schedule(new Runnable() { @Override public void run() { - try { - delegate.invoke(execution); - ret.complete(null); - } catch (Exception e) { - ret.completeExceptionally(e); - } + invokeComplete(ret, execution); } }, delay, TimeUnit.MILLISECONDS); return ret; diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java index a2245862d7fb0..5b25e3bebd99b 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java @@ -2,6 +2,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.BiConsumer; import io.quarkus.scheduler.ScheduledExecution; @@ -30,4 +31,17 @@ protected CompletionStage invokeDelegate(ScheduledExecution execution) { return CompletableFuture.failedStage(e); } } + + protected void invokeComplete(CompletableFuture ret, ScheduledExecution execution) { + invokeDelegate(execution).whenComplete(new BiConsumer<>() { + @Override + public void accept(Void r, Throwable t) { + if (t != null) { + ret.completeExceptionally(t); + } else { + ret.complete(null); + } + } + }); + } } diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/OffloadingInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/OffloadingInvoker.java index 3026c382fd282..23b8605aa6e5a 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/OffloadingInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/OffloadingInvoker.java @@ -1,6 +1,7 @@ package io.quarkus.scheduler.common.runtime; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import io.quarkus.scheduler.ScheduledExecution; @@ -28,6 +29,7 @@ public OffloadingInvoker(ScheduledInvoker delegate, Vertx vertx) { @Override public CompletionStage invoke(ScheduledExecution execution) throws Exception { + CompletableFuture ret = new CompletableFuture<>(); Context context = VertxContext.getOrCreateDuplicatedContext(vertx); VertxContextSafetyToggle.setContextSafe(context, true); if (delegate.isBlocking()) { @@ -40,7 +42,7 @@ public void handle(Void event) { VirtualThreadsRecorder.getCurrent().execute(new Runnable() { @Override public void run() { - doInvoke(execution); + invokeComplete(ret, execution); } }); } @@ -49,7 +51,7 @@ public void run() { context.executeBlocking(new Callable() { @Override public Void call() { - doInvoke(execution); + invokeComplete(ret, execution); return null; } }, false); @@ -58,19 +60,11 @@ public Void call() { context.runOnContext(new Handler() { @Override public void handle(Void event) { - doInvoke(execution); + invokeComplete(ret, execution); } }); } - return null; - } - - void doInvoke(ScheduledExecution execution) { - try { - delegate.invoke(execution); - } catch (Throwable t) { - // already logged by the StatusEmitterInvoker - } + return ret; } } diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/ScheduledInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/ScheduledInvoker.java index a7f1f6a80e702..b57f91648ddfe 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/ScheduledInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/ScheduledInvoker.java @@ -11,7 +11,7 @@ public interface ScheduledInvoker { /** * @param execution - * @return the result + * @return the result, never {@code null} * @throws Exception */ CompletionStage invoke(ScheduledExecution execution) throws Exception;