Skip to content

Commit

Permalink
Scheduler: throw UnsupportedOperationException if appropriate
Browse files Browse the repository at this point in the history
- also refactor the implementation and introduce BaseScheduler
- resolves quarkusio#44101
  • Loading branch information
mkouba committed Oct 31, 2024
1 parent 4fe897e commit 08d72d4
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 169 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.quarkus.quartz.test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -31,14 +31,19 @@ public class DisabledSchedulerTest {
"application.properties"));

@Test
public void testNoSchedulerInvocations() throws InterruptedException {
public void testSchedulerInvocations() throws InterruptedException {
assertNotNull(scheduler);
assertFalse(scheduler.isStarted());
assertFalse(scheduler.isRunning());
assertTrue(quartzScheduler.isResolvable());
try {
quartzScheduler.get();
fail();
} catch (IllegalStateException expected) {
}
assertNotNull(scheduler.implementation());
assertThrows(UnsupportedOperationException.class, () -> scheduler.newJob("foo"));
assertThrows(UnsupportedOperationException.class, () -> scheduler.unscheduleJob("foo"));
assertThrows(UnsupportedOperationException.class, () -> scheduler.pause());
assertThrows(UnsupportedOperationException.class, () -> scheduler.pause("foo"));
assertThrows(UnsupportedOperationException.class, () -> scheduler.resume());
assertThrows(UnsupportedOperationException.class, () -> scheduler.resume("foo"));
assertThrows(UnsupportedOperationException.class, () -> scheduler.getScheduledJobs());
assertThrows(UnsupportedOperationException.class, () -> scheduler.getScheduledJob("bar"));
}

static class Jobs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import io.quarkus.scheduler.SuccessfulExecution;
import io.quarkus.scheduler.Trigger;
import io.quarkus.scheduler.common.runtime.AbstractJobDefinition;
import io.quarkus.scheduler.common.runtime.BaseScheduler;
import io.quarkus.scheduler.common.runtime.CronParser;
import io.quarkus.scheduler.common.runtime.DefaultInvoker;
import io.quarkus.scheduler.common.runtime.Events;
Expand All @@ -87,7 +88,6 @@
import io.quarkus.scheduler.runtime.SchedulerConfig;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig;
import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode;
import io.quarkus.scheduler.runtime.SimpleScheduler;
import io.quarkus.scheduler.spi.JobInstrumenter;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;
Expand All @@ -99,32 +99,18 @@
*/
@Typed({ QuartzScheduler.class, Scheduler.class })
@Singleton
public class QuartzSchedulerImpl implements QuartzScheduler {
public class QuartzSchedulerImpl extends BaseScheduler implements QuartzScheduler {

private static final Logger LOGGER = Logger.getLogger(QuartzSchedulerImpl.class.getName());
private static final String INVOKER_KEY = "invoker";

private final org.quartz.Scheduler scheduler;
private final Vertx vertx;
private final boolean startHalted;
private final Duration shutdownWaitTime;
private final boolean enabled;
private final CronParser cronParser;
private final Duration defaultOverdueGracePeriod;
private final Map<String, QuartzTrigger> scheduledTasks = new ConcurrentHashMap<>();
private final Event<SkippedExecution> skippedExecutionEvent;
private final Event<SuccessfulExecution> successExecutionEvent;
private final Event<FailedExecution> failedExecutionEvent;
private final Event<DelayedExecution> delayedExecutionEvent;
private final Event<SchedulerPaused> schedulerPausedEvent;
private final Event<SchedulerResumed> schedulerResumedEvent;
private final Event<ScheduledJobPaused> scheduledJobPausedEvent;
private final Event<ScheduledJobResumed> scheduledJobResumedEvent;
private final QuartzRuntimeConfig runtimeConfig;
private final SchedulerConfig schedulerConfig;
private final Instance<JobInstrumenter> jobInstrumenter;
private final StoreType storeType;
private final ScheduledExecutorService blockingExecutor;

public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport,
SchedulerRuntimeConfig schedulerRuntimeConfig,
Expand All @@ -136,23 +122,14 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
Instance<Job> jobs, Instance<UserTransaction> userTransaction,
Vertx vertx, SchedulerConfig schedulerConfig, Instance<JobInstrumenter> jobInstrumenter,
ScheduledExecutorService blockingExecutor) {
super(vertx, new CronParser(context.getCronType()), schedulerRuntimeConfig.overdueGracePeriod,
new Events(skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, delayedExecutionEvent,
schedulerPausedEvent, schedulerResumedEvent, scheduledJobPausedEvent, scheduledJobResumedEvent),
jobInstrumenter, blockingExecutor);
this.shutdownWaitTime = quartzSupport.getRuntimeConfig().shutdownWaitTime;
this.skippedExecutionEvent = skippedExecutionEvent;
this.successExecutionEvent = successExecutionEvent;
this.failedExecutionEvent = failedExecutionEvent;
this.delayedExecutionEvent = delayedExecutionEvent;
this.schedulerPausedEvent = schedulerPausedEvent;
this.schedulerResumedEvent = schedulerResumedEvent;
this.scheduledJobPausedEvent = scheduledJobPausedEvent;
this.scheduledJobResumedEvent = scheduledJobResumedEvent;
this.runtimeConfig = quartzSupport.getRuntimeConfig();
this.enabled = schedulerRuntimeConfig.enabled;
this.defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod;
this.schedulerConfig = schedulerConfig;
this.jobInstrumenter = jobInstrumenter;
this.storeType = quartzSupport.getBuildTimeConfig().storeType;
this.vertx = vertx;
this.blockingExecutor = blockingExecutor;

StartMode startMode = initStartMode(schedulerRuntimeConfig, runtimeConfig);

Expand Down Expand Up @@ -182,14 +159,12 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
.collect(Collectors.joining(", ")));
}

cronParser = new CronParser(context.getCronType());

JobInstrumenter instrumenter = null;
if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) {
instrumenter = jobInstrumenter.get();
}

if (!enabled) {
if (!schedulerRuntimeConfig.enabled) {
LOGGER.info("Quartz scheduler is disabled by config property and will not be started");
this.scheduler = null;
} else if (!forceStart && context.getScheduledMethods(Scheduled.QUARTZ).isEmpty()
Expand Down Expand Up @@ -240,11 +215,11 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) {
}

ScheduledInvoker invoker = context.createInvoker(method.getInvokerClassName());
invoker = SimpleScheduler.initInvoker(
invoker = initInvoker(
invoker,
skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, delayedExecutionEvent,
events,
scheduled.concurrentExecution(),
SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf()), instrumenter, vertx,
initSkipPredicate(scheduled.skipExecutionIf()), instrumenter, vertx,
invoker.isBlocking() && runtimeConfig.runBlockingScheduledMethodOnQuartzThread,
SchedulerUtils.parseExecutionMaxDelayAsMillis(scheduled), blockingExecutor);

Expand Down Expand Up @@ -346,29 +321,36 @@ public org.quartz.Scheduler getScheduler() {
return scheduler;
}

@Override
public boolean isStarted() {
return scheduler != null;
}

@Override
public String implementation() {
return Scheduled.QUARTZ;
}

@Override
public void pause() {
if (!enabled) {
LOGGER.warn("Quartz Scheduler is disabled and cannot be paused");
} else {
try {
if (scheduler != null) {
scheduler.standby();
Events.fire(schedulerPausedEvent, SchedulerPaused.INSTANCE);
}
} catch (SchedulerException e) {
throw new RuntimeException("Unable to pause scheduler", e);
if (!isStarted()) {
throw notStarted();
}
try {
if (scheduler != null) {
scheduler.standby();
events.fireSchedulerPaused();
}
} catch (SchedulerException e) {
throw new RuntimeException("Unable to pause scheduler", e);
}
}

@Override
public void pause(String identity) {
if (!isStarted()) {
throw notStarted();
}
Objects.requireNonNull(identity, "Cannot pause - identity is null");
if (identity.isEmpty()) {
LOGGER.warn("Cannot pause - identity is empty");
Expand All @@ -379,7 +361,7 @@ public void pause(String identity) {
QuartzTrigger trigger = scheduledTasks.get(parsedIdentity);
if (trigger != null) {
scheduler.pauseJob(new JobKey(parsedIdentity, Scheduler.class.getName()));
Events.fire(scheduledJobPausedEvent, new ScheduledJobPaused(trigger));
events.fireScheduledJobPaused(new ScheduledJobPaused(trigger));
}
} catch (SchedulerException e) {
throw new RuntimeException("Unable to pause job", e);
Expand All @@ -388,6 +370,9 @@ public void pause(String identity) {

@Override
public boolean isPaused(String identity) {
if (!isStarted()) {
throw notStarted();
}
Objects.requireNonNull(identity);
if (identity.isEmpty()) {
return false;
Expand Down Expand Up @@ -417,22 +402,24 @@ public boolean isPaused(String identity) {

@Override
public void resume() {
if (!enabled) {
LOGGER.warn("Quartz Scheduler is disabled and cannot be resumed");
} else {
try {
if (scheduler != null) {
scheduler.start();
Events.fire(schedulerResumedEvent, SchedulerResumed.INSTANCE);
}
} catch (SchedulerException e) {
throw new RuntimeException("Unable to resume scheduler", e);
if (!isStarted()) {
throw notStarted();
}
try {
if (scheduler != null) {
scheduler.start();
events.fireSchedulerResumed();
}
} catch (SchedulerException e) {
throw new RuntimeException("Unable to resume scheduler", e);
}
}

@Override
public void resume(String identity) {
if (!isStarted()) {
throw notStarted();
}
Objects.requireNonNull(identity, "Cannot resume - identity is null");
if (identity.isEmpty()) {
LOGGER.warn("Cannot resume - identity is empty");
Expand All @@ -443,7 +430,7 @@ public void resume(String identity) {
QuartzTrigger trigger = scheduledTasks.get(parsedIdentity);
if (trigger != null) {
scheduler.resumeJob(new JobKey(SchedulerUtils.lookUpPropertyValue(parsedIdentity), Scheduler.class.getName()));
Events.fire(scheduledJobResumedEvent, new ScheduledJobResumed(trigger));
events.fireScheduledJobResumed(new ScheduledJobResumed(trigger));
}
} catch (SchedulerException e) {
throw new RuntimeException("Unable to resume job", e);
Expand All @@ -452,7 +439,7 @@ public void resume(String identity) {

@Override
public boolean isRunning() {
if (!enabled || scheduler == null) {
if (!isStarted()) {
return false;
} else {
try {
Expand All @@ -465,11 +452,17 @@ public boolean isRunning() {

@Override
public List<Trigger> getScheduledJobs() {
if (!isStarted()) {
throw notStarted();
}
return List.copyOf(scheduledTasks.values());
}

@Override
public Trigger getScheduledJob(String identity) {
if (!isStarted()) {
throw notStarted();
}
Objects.requireNonNull(identity);
if (identity.isEmpty()) {
return null;
Expand All @@ -479,6 +472,9 @@ public Trigger getScheduledJob(String identity) {

@Override
public JobDefinition newJob(String identity) {
if (!isStarted()) {
throw notStarted();
}
Objects.requireNonNull(identity);
if (scheduledTasks.containsKey(identity)) {
throw new IllegalStateException("A job with this identity is already scheduled: " + identity);
Expand All @@ -488,6 +484,9 @@ public JobDefinition newJob(String identity) {

@Override
public Trigger unscheduleJob(String identity) {
if (!isStarted()) {
throw notStarted();
}
Objects.requireNonNull(identity);
if (!identity.isEmpty()) {
String parsedIdentity = SchedulerUtils.lookUpPropertyValue(identity);
Expand Down Expand Up @@ -1010,10 +1009,8 @@ public boolean isBlocking() {
if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) {
instrumenter = jobInstrumenter.get();
}
invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent,
failedExecutionEvent, delayedExecutionEvent, scheduled.concurrentExecution(), skipPredicate, instrumenter,
vertx,
task != null && runtimeConfig.runBlockingScheduledMethodOnQuartzThread,
invoker = initInvoker(invoker, events, scheduled.concurrentExecution(), skipPredicate, instrumenter,
vertx, task != null && runtimeConfig.runBlockingScheduledMethodOnQuartzThread,
SchedulerUtils.parseExecutionMaxDelayAsMillis(scheduled), blockingExecutor);
QuartzTrigger quartzTrigger = new QuartzTrigger(trigger.getKey(),
new Function<>() {
Expand Down
Loading

0 comments on commit 08d72d4

Please sign in to comment.