Skip to content

Commit

Permalink
Merge pull request quarkusio#18060 from mkouba/scheduler-conditional-…
Browse files Browse the repository at this point in the history
…exec

Scheduler - introduce Scheduled#skipExecutionIf()
  • Loading branch information
mkouba authored Jun 22, 2021
2 parents 71f9dc5 + 4954679 commit 21679e7
Show file tree
Hide file tree
Showing 13 changed files with 428 additions and 18 deletions.
55 changes: 55 additions & 0 deletions docs/src/main/asciidoc/scheduler-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,61 @@ void nonConcurrent() {
----
<1> Concurrent executions are skipped.

TIP: A CDI event of type `io.quarkus.scheduler.SkippedExecution` is fired when an execution of a scheduled method is skipped.

[[conditional_execution]]
=== Conditional Execution

You can define the logic to skip any execution of a scheduled method via `@Scheduled#skipExecutionIf()`.
The specified bean class must implement `io.quarkus.scheduler.Scheduled.SkipPredicate` and the execution is skipped if the result of the `test()` method is `true`.

[source,java]
----
class Jobs {
@Scheduled(every = "1s", skipExecutionIf = MyPredicate.class) <1>
void everySecond() {
// do something every second...
}
}
@Singleton <2>
class MyPredicate implements SkipPredicate {
@Inject
MyService service;
boolean test(ScheduledExecution execution) {
return !service.isStarted(); <3>
}
}
----
<1> A bean instance of `MyPredicate.class` is used to evaluate whether an execution should be skipped. There must be exactly one bean that has the specified class in its set of bean types, otherwise the build fails.
<2> The scope of the bean must be active during execution.
<3> `Jobs.everySecond()` is skipped until `MyService.isStarted()` returns `true`.

Note that this is an equivalent of the following code:

[source,java]
----
class Jobs {
@Inject
MyService service;
@Scheduled(every = "1s")
void everySecond() {
if (service.isStarted()) {
// do something every second...
}
}
}
----

The main idea is to keep the the logic to skip the execution outside the scheduled business methods so that it can be reused and refactored easily.

TIP: A CDI event of type `io.quarkus.scheduler.SkippedExecution` is fired when an execution of a scheduled method is skipped.

== Scheduler

Quarkus provides a built-in bean of type `io.quarkus.scheduler.Scheduler` that can be injected and used to pause/resume the scheduler and individual scheduled methods identified by a specific `Scheduled#identity()`.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.quarkus.quartz.test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.enterprise.event.Observes;
import javax.inject.Singleton;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.SkippedExecution;
import io.quarkus.test.QuarkusUnitTest;

public class ConditionalExecutionTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Jobs.class));

@Test
public void testExecution() {
try {
// Wait until Jobs#doSomething() is executed at least 1x and skipped 1x
if (IsDisabled.SKIPPED_LATCH.await(10, TimeUnit.SECONDS)) {
assertEquals(1, Jobs.COUNTER.getCount());
IsDisabled.DISABLED.set(false);
} else {
fail("Job#foo not skipped in 10 seconds!");
}
if (!Jobs.COUNTER.await(10, TimeUnit.SECONDS)) {
fail("Job#foo not executed in 10 seconds!");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

static class Jobs {

static final CountDownLatch COUNTER = new CountDownLatch(1);

@Scheduled(identity = "foo", every = "1s", skipExecutionIf = IsDisabled.class)
void doSomething() throws InterruptedException {
COUNTER.countDown();
}

}

@Singleton
public static class IsDisabled implements Scheduled.SkipPredicate {

static final CountDownLatch SKIPPED_LATCH = new CountDownLatch(1);

static final AtomicBoolean DISABLED = new AtomicBoolean(true);

@Override
public boolean test(ScheduledExecution execution) {
return DISABLED.get();
}

void onSkip(@Observes SkippedExecution event) {
if (event.triggerId.equals("foo")) {
System.out.println(event);
SKIPPED_LATCH.countDown();
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.Produces;
import javax.inject.Singleton;
Expand Down Expand Up @@ -46,6 +47,7 @@
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;

import io.quarkus.arc.Arc;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.ConcurrentExecution;
Expand All @@ -58,6 +60,7 @@
import io.quarkus.scheduler.runtime.SchedulerContext;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig;
import io.quarkus.scheduler.runtime.SkipConcurrentExecutionInvoker;
import io.quarkus.scheduler.runtime.SkipPredicateInvoker;
import io.quarkus.scheduler.runtime.util.SchedulerUtils;

@Singleton
Expand Down Expand Up @@ -126,6 +129,11 @@ public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, Sc
if (scheduled.concurrentExecution() == ConcurrentExecution.SKIP) {
invoker = new SkipConcurrentExecutionInvoker(invoker, skippedExecutionEvent);
}
if (!scheduled.skipExecutionIf().equals(Scheduled.Never.class)) {
invoker = new SkipPredicateInvoker(invoker,
Arc.container().select(scheduled.skipExecutionIf(), Any.Literal.INSTANCE).get(),
skippedExecutionEvent);
}
invokers.put(identity, invoker);

JobBuilder jobBuilder = JobBuilder.newJob(InvokerJob.class)
Expand Down Expand Up @@ -169,7 +177,7 @@ public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, Sc
}

TriggerBuilder<?> triggerBuilder = TriggerBuilder.newTrigger()
.withIdentity(identity + "_trigger", Scheduler.class.getName())
.withIdentity(identity, Scheduler.class.getName())
.withSchedule(scheduleBuilder);

Long millisToAdd = null;
Expand Down Expand Up @@ -416,7 +424,7 @@ static class QuartzTrigger implements Trigger {

final JobExecutionContext context;

public QuartzTrigger(JobExecutionContext context) {
QuartzTrigger(JobExecutionContext context) {
this.context = context;
}

Expand All @@ -434,7 +442,7 @@ public Instant getPreviousFireTime() {

@Override
public String getId() {
return context.getTrigger().getKey().toString();
return context.getTrigger().getKey().getName();
}

}
Expand All @@ -443,7 +451,7 @@ static class QuartzScheduledExecution implements ScheduledExecution {

final QuartzTrigger trigger;

public QuartzScheduledExecution(QuartzTrigger trigger) {
QuartzScheduledExecution(QuartzTrigger trigger) {
this.trigger = trigger;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.quarkus.arc.deployment.UnremovableBeanBuildItem.BeanClassAnnotationExclusion;
import io.quarkus.arc.deployment.ValidationPhaseBuildItem;
import io.quarkus.arc.deployment.ValidationPhaseBuildItem.ValidationErrorBuildItem;
import io.quarkus.arc.processor.BeanDeploymentValidator;
import io.quarkus.arc.processor.BeanInfo;
import io.quarkus.arc.processor.BuiltinScope;
import io.quarkus.arc.processor.DotNames;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class SchedulerProcessor {

static final DotName SCHEDULED_NAME = DotName.createSimple(Scheduled.class.getName());
static final DotName SCHEDULES_NAME = DotName.createSimple(Scheduled.Schedules.class.getName());
static final DotName SKIP_NEVER_NAME = DotName.createSimple(Scheduled.Never.class.getName());

static final Type SCHEDULED_EXECUTION_TYPE = Type.create(DotName.createSimple(ScheduledExecution.class.getName()),
Kind.CLASS);
Expand Down Expand Up @@ -184,7 +186,7 @@ void validateScheduledBusinessMethods(SchedulerConfig config, List<ScheduledBusi
// Validate cron() and every() expressions
CronParser parser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(config.cronType));
for (AnnotationInstance scheduled : scheduledMethod.getSchedules()) {
Throwable error = validateScheduled(parser, scheduled, encounteredIdentities);
Throwable error = validateScheduled(parser, scheduled, encounteredIdentities, validationPhase.getContext());
if (error != null) {
errors.add(error);
}
Expand Down Expand Up @@ -325,7 +327,8 @@ private String generateInvoker(ScheduledBusinessMethodItem scheduledMethod, Clas
}

private Throwable validateScheduled(CronParser parser, AnnotationInstance schedule,
Map<String, AnnotationInstance> encounteredIdentities) {
Map<String, AnnotationInstance> encounteredIdentities,
BeanDeploymentValidator.ValidationContext validationContext) {
MethodInfo method = schedule.target().asMethod();
AnnotationValue cronValue = schedule.value("cron");
AnnotationValue everyValue = schedule.value("every");
Expand All @@ -343,6 +346,7 @@ private Throwable validateScheduled(CronParser parser, AnnotationInstance schedu
schedule, method.declaringClass().name(), method.name());
}
}

} else {
if (everyValue != null && !everyValue.asString().trim().isEmpty()) {
String every = everyValue.asString().trim();
Expand Down Expand Up @@ -375,6 +379,7 @@ private Throwable validateScheduled(CronParser parser, AnnotationInstance schedu
return new IllegalStateException("Invalid delayed() expression on: " + schedule, e);
}
}

}
} else {
if (delayedValue != null && !delayedValue.asString().trim().isEmpty()) {
Expand All @@ -395,8 +400,19 @@ private Throwable validateScheduled(CronParser parser, AnnotationInstance schedu
} else {
encounteredIdentities.put(identity, schedule);
}
}

AnnotationValue skipExecutionIfValue = schedule.value("skipExecutionIf");
if (skipExecutionIfValue != null) {
DotName skipPredicate = skipExecutionIfValue.asClass().name();
if (!SKIP_NEVER_NAME.equals(skipPredicate)
&& validationContext.beans().withBeanType(skipPredicate).collect().size() != 1) {
String message = String.format("There must be exactly one bean that matches the skip predicate: \"%s\" on: %s",
skipPredicate, schedule);
return new IllegalStateException(message);
}
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.quarkus.scheduler.test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.enterprise.event.Observes;
import javax.inject.Singleton;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.SkippedExecution;
import io.quarkus.test.QuarkusUnitTest;

public class ConditionalExecutionTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Jobs.class));

@Test
public void testExecution() {
try {
// Wait until Jobs#doSomething() is executed at least 1x and skipped 1x
if (IsDisabled.SKIPPED_LATCH.await(10, TimeUnit.SECONDS)) {
assertEquals(1, Jobs.COUNTER.getCount());
IsDisabled.DISABLED.set(false);
} else {
fail("Job#foo not skipped in 10 seconds!");
}
if (!Jobs.COUNTER.await(10, TimeUnit.SECONDS)) {
fail("Job#foo not executed in 10 seconds!");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

static class Jobs {

static final CountDownLatch COUNTER = new CountDownLatch(1);

@Scheduled(identity = "foo", every = "1s", skipExecutionIf = IsDisabled.class)
void doSomething() throws InterruptedException {
COUNTER.countDown();
}

}

@Singleton
public static class IsDisabled implements Scheduled.SkipPredicate {

static final CountDownLatch SKIPPED_LATCH = new CountDownLatch(1);

static final AtomicBoolean DISABLED = new AtomicBoolean(true);

@Override
public boolean test(ScheduledExecution execution) {
return DISABLED.get();
}

void onSkip(@Observes SkippedExecution event) {
if (event.triggerId.equals("foo")) {

SKIPPED_LATCH.countDown();
}
}

}
}
Loading

0 comments on commit 21679e7

Please sign in to comment.