Skip to content

Commit

Permalink
Scheduler - support non-blocking scheduled methods
Browse files Browse the repository at this point in the history
- resolves quarkusio#24621
  • Loading branch information
mkouba committed Mar 31, 2022
1 parent c6c2788 commit 026db5c
Show file tree
Hide file tree
Showing 12 changed files with 405 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,21 @@
public final class ScheduledBusinessMethodItem extends MultiBuildItem {

private final BeanInfo bean;

private final List<AnnotationInstance> schedules;

private final MethodInfo method;
private final boolean nonBlocking;

public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List<AnnotationInstance> schedules) {
this(bean, method, schedules, false);
}

public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List<AnnotationInstance> schedules,
boolean hasNonBlockingAnnotation) {
this.bean = bean;
this.method = method;
this.schedules = schedules;
this.nonBlocking = hasNonBlockingAnnotation || SchedulerDotNames.COMPLETION_STAGE.equals(method.returnType().name())
|| SchedulerDotNames.UNI.equals(method.returnType().name());
}

/**
Expand All @@ -38,4 +44,8 @@ public List<AnnotationInstance> getSchedules() {
return schedules;
}

public boolean isNonBlocking() {
return nonBlocking;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.scheduler.deployment;

import java.util.concurrent.CompletionStage;

import org.jboss.jandex.DotName;

import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.NonBlocking;

class SchedulerDotNames {

static final DotName SCHEDULED_NAME = DotName.createSimple(Scheduled.class.getName());
static final DotName SCHEDULES_NAME = DotName.createSimple(Scheduled.Schedules.class.getName());
static final DotName SKIP_NEVER_NAME = DotName.createSimple(Scheduled.Never.class.getName());
static final DotName SKIP_PREDICATE = DotName.createSimple(Scheduled.SkipPredicate.class.getName());
static final DotName NON_BLOCKING = DotName.createSimple(NonBlocking.class.getName());
static final DotName UNI = DotName.createSimple("io.smallrye.mutiny.Uni");
static final DotName COMPLETION_STAGE = DotName.createSimple(CompletionStage.class.getName());
static final DotName VOID = DotName.createSimple(Void.class.getName());

}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package io.quarkus.scheduler.test.nonblocking;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.enterprise.event.Observes;
import javax.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.SuccessfulExecution;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;

public class NonBlockingScheduledMethodTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root.addClasses(Jobs.class, JobWasExecuted.class));

@Test
public void testVoid() throws InterruptedException {
assertTrue(Jobs.VOID_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.VOID_ON_EVENT_LOOP.get());
assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS));
assertEvents("every_void");
}

@Test
public void testUni() throws InterruptedException {
assertTrue(Jobs.UNI_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.UNI_ON_EVENT_LOOP.get());
assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS));
assertEvents("every_uni");
}

@Test
public void testCompletionStage() throws InterruptedException {
assertTrue(Jobs.CS_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.CS_ON_EVENT_LOOP.get());
assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS));
assertEvents("every_cs");
}

private void assertEvents(String id) {
for (SuccessfulExecution exec : Jobs.events) {
if (exec.getExecution().getTrigger().getId().equals(id)) {
return;
}
}
fail("No SuccessfulExecution event fired for " + id + ": " + Jobs.events);
}

static class Jobs {

// jobs executed
static final CountDownLatch VOID_LATCH = new CountDownLatch(1);
static final CountDownLatch UNI_LATCH = new CountDownLatch(1);
static final CountDownLatch CS_LATCH = new CountDownLatch(1);

// jobs executed on the event loop
static final AtomicBoolean VOID_ON_EVENT_LOOP = new AtomicBoolean();
static final AtomicBoolean UNI_ON_EVENT_LOOP = new AtomicBoolean();
static final AtomicBoolean CS_ON_EVENT_LOOP = new AtomicBoolean();

// sucessfull events
static final CountDownLatch SUCCESS_LATCH = new CountDownLatch(3);
static final List<SuccessfulExecution> events = new CopyOnWriteArrayList<>();

static void onSuccess(@Observes SuccessfulExecution event) {
events.add(event);
SUCCESS_LATCH.countDown();
}

@NonBlocking
@Scheduled(every = "0.5s", identity = "every_void", skipExecutionIf = JobWasExecuted.class)
void everySecond() {
VOID_ON_EVENT_LOOP.set(Context.isOnEventLoopThread());
VOID_LATCH.countDown();
}

@Scheduled(every = "0.5s", identity = "every_uni", skipExecutionIf = JobWasExecuted.class)
Uni<Void> everySecondUni() {
UNI_ON_EVENT_LOOP.set(Context.isOnEventLoopThread());
UNI_LATCH.countDown();
return Uni.createFrom().voidItem();
}

@Scheduled(every = "0.5s", identity = "every_cs", skipExecutionIf = JobWasExecuted.class)
CompletionStage<Void> everySecondCompletionStage() {
CompletableFuture<Void> ret = new CompletableFuture<Void>();
CS_ON_EVENT_LOOP.set(Context.isOnEventLoopThread());
CS_LATCH.countDown();
ret.complete(null);
return ret;
}
}

@Singleton
static class JobWasExecuted implements Scheduled.SkipPredicate {

@Override
public boolean test(ScheduledExecution execution) {
switch (execution.getTrigger().getId()) {
case "every_void":
return Jobs.VOID_LATCH.getCount() == 0;
case "every_uni":
return Jobs.UNI_LATCH.getCount() == 0;
case "every_cs":
return Jobs.CS_LATCH.getCount() == 0;
default:
return false;
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@
* }
* </pre>
*
* The annotated method must return {@code void} and either declare no parameters or one parameter of type
* The annotated method must return {@code void}, {@code java.util.concurrent.CompletionStage<Void>} or
* {@code io.smallrye.mutiny.Uni<Void>} and either declare no parameters or one parameter of type
* {@link ScheduledExecution}.
* <p>
* By default, a scheduled method is executed on the main executor for blocking tasks. However, a scheduled method that returns
* {@code java.util.concurrent.CompletionStage<Void>} or {@code io.smallrye.mutiny.Uni<Void>}, or is annotated with
* {@link io.smallrye.common.annotation.NonBlocking} is executed on the event loop.
*
* @see ScheduledExecution
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkus.scheduler.runtime;

import java.util.concurrent.CompletionStage;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ManagedContext;
import io.quarkus.scheduler.ScheduledExecution;

public abstract class DefaultInvoker implements ScheduledInvoker {

@Override
public CompletionStage<Void> invoke(ScheduledExecution execution) throws Exception {
ManagedContext requestContext = Arc.container().requestContext();
if (requestContext.isActive()) {
return invokeBean(execution);
} else {
try {
requestContext.activate();
return invokeBean(execution);
} finally {
requestContext.terminate();
}
}
}

protected abstract CompletionStage<Void> invokeBean(ScheduledExecution execution) throws Exception;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.scheduler.runtime;

abstract class DelegateInvoker implements ScheduledInvoker {

protected final ScheduledInvoker delegate;

public DelegateInvoker(ScheduledInvoker delegate) {
this.delegate = delegate;
}

@Override
public boolean isBlocking() {
return delegate.isBlocking();
}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,30 @@
package io.quarkus.scheduler.runtime;

import io.quarkus.arc.runtime.BeanInvoker;
import java.util.concurrent.CompletionStage;

import io.quarkus.scheduler.ScheduledExecution;

/**
* Invokes a scheduled business method of a bean.
*/
public interface ScheduledInvoker extends BeanInvoker<ScheduledExecution> {
public interface ScheduledInvoker {

/**
*
* @param execution
* @return the result
* @throws Exception
*/
CompletionStage<Void> invoke(ScheduledExecution execution) throws Exception;

/**
* A blocking invoker is executed on the main executor for blocking tasks.
* A non-blocking invoker is executed on the event loop.
*
* @return {@code true} if the scheduled method is blocking, {@code false} otherwise
*/
default boolean isBlocking() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import io.quarkus.scheduler.SuccessfulExecution;
import io.quarkus.scheduler.Trigger;
import io.quarkus.scheduler.runtime.util.SchedulerUtils;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;

@Typed(Scheduler.class)
@Singleton
Expand All @@ -57,17 +59,19 @@ public class SimpleScheduler implements Scheduler {

private final ScheduledExecutorService scheduledExecutor;
private final ExecutorService executor;
private final Vertx vertx;
private volatile boolean running;
private final List<ScheduledTask> scheduledTasks;
private final boolean enabled;

public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedulerRuntimeConfig,
Event<SkippedExecution> skippedExecutionEvent, Event<SuccessfulExecution> successExecutionEvent,
Event<FailedExecution> failedExecutionEvent) {
Event<FailedExecution> failedExecutionEvent, Vertx vertx) {
this.running = true;
this.enabled = schedulerRuntimeConfig.enabled;
this.scheduledTasks = new ArrayList<>();
this.executor = context.getExecutor();
this.vertx = vertx;

if (!schedulerRuntimeConfig.enabled) {
this.scheduledExecutor = null;
Expand Down Expand Up @@ -142,7 +146,7 @@ void checkTriggers() {
ZonedDateTime now = ZonedDateTime.now();
LOG.tracef("Check triggers at %s", now);
for (ScheduledTask task : scheduledTasks) {
task.execute(now, executor);
task.execute(now, executor, vertx);
}
}

Expand Down Expand Up @@ -288,29 +292,42 @@ static class ScheduledTask {
this.invoker = invoker;
}

void execute(ZonedDateTime now, ExecutorService executor) {
void execute(ZonedDateTime now, ExecutorService executor, Vertx vertx) {
if (!trigger.isRunning()) {
return;
}
ZonedDateTime scheduledFireTime = trigger.evaluate(now);
if (scheduledFireTime != null) {
try {
executor.execute(new Runnable() {
@Override
public void run() {
try {
invoker.invoke(new SimpleScheduledExecution(now, scheduledFireTime, trigger));
} catch (Throwable t) {
// already logged by the StatusEmitterInvoker
if (invoker.isBlocking()) {
try {
executor.execute(new Runnable() {
@Override
public void run() {
doInvoke(now, scheduledFireTime);
}
});
} catch (RejectedExecutionException e) {
LOG.warnf("Rejected execution of a scheduled task for trigger %s", trigger);
}
} else {
vertx.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
doInvoke(now, scheduledFireTime);
}
});
} catch (RejectedExecutionException e) {
LOG.warnf("Rejected execution of a scheduled task for trigger %s", trigger);
}
}
}

void doInvoke(ZonedDateTime now, ZonedDateTime scheduledFireTime) {
try {
invoker.invoke(new SimpleScheduledExecution(now, scheduledFireTime, trigger));
} catch (Throwable t) {
// already logged by the StatusEmitterInvoker
}
}

}

static abstract class SimpleTrigger implements Trigger {
Expand Down
Loading

0 comments on commit 026db5c

Please sign in to comment.