Skip to content

Commit

Permalink
Merge pull request quarkusio#28255 from cescoffier/run-scheduled-meth…
Browse files Browse the repository at this point in the history
…od-on-dc

Executes scheduled methods on duplicated contexts
  • Loading branch information
gsmet authored Sep 30, 2022
2 parents 5234d47 + 963a588 commit 05f50da
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@

import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.ExecutorService;

import com.cronutils.model.CronType;

public interface SchedulerContext {

ExecutorService getExecutor();

CronType getCronType();

List<ScheduledMethodMetadata> getScheduledMethods();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.AnnotationProxyBuildItem;
import io.quarkus.deployment.builditem.ExecutorBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
Expand Down Expand Up @@ -277,7 +276,7 @@ public List<UnremovableBeanBuildItem> unremovableBeans() {
public FeatureBuildItem build(SchedulerConfig config, BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
SchedulerRecorder recorder, List<ScheduledBusinessMethodItem> scheduledMethods,
BuildProducer<GeneratedClassBuildItem> generatedClasses, BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
AnnotationProxyBuildItem annotationProxy, ExecutorBuildItem executor) {
AnnotationProxyBuildItem annotationProxy) {

List<ScheduledMethodMetadata> scheduledMetadata = new ArrayList<>();
ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClasses, new Function<String, String>() {
Expand Down Expand Up @@ -311,7 +310,7 @@ public String apply(String name) {
}

syntheticBeans.produce(SyntheticBeanBuildItem.configure(SchedulerContext.class).setRuntimeInit()
.supplier(recorder.createContext(config, executor.getExecutorProxy(), scheduledMetadata))
.supplier(recorder.createContext(config, scheduledMetadata))
.done());

return new FeatureBuildItem(Feature.SCHEDULER);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.quarkus.scheduler.test;

import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import javax.inject.Inject;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

/**
* Verifies that the @Scheduled method are called on a duplicated context.
*/
public class DuplicatedContextTest {
@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(MyScheduledClass.class));

@Inject
MyScheduledClass scheduled;

@Test
public void testBlocking() {
await()
.atMost(Duration.ofSeconds(3))
.until(() -> scheduled.blockingCalled() > 0);
}

@Test
public void testNonBlocking() {
await()
.atMost(Duration.ofSeconds(3))
.until(() -> scheduled.nonBlockingCalled() > 0);
}

public static class MyScheduledClass {

private final AtomicInteger blockingCalled = new AtomicInteger();
private final AtomicInteger nonBlockingCalled = new AtomicInteger();

@Scheduled(every = "1m")
public void blocking() {
Context context = Vertx.currentContext();
Assertions.assertNotNull(context);
Assertions.assertTrue(VertxContext.isDuplicatedContext(context));
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());

blockingCalled.incrementAndGet();
}

@Scheduled(every = "1m")
public Uni<Void> nonblocking() {
Context context = Vertx.currentContext();
Assertions.assertNotNull(context);
Assertions.assertTrue(VertxContext.isDuplicatedContext(context));
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());

nonBlockingCalled.incrementAndGet();
return Uni.createFrom().voidItem();
}

public int blockingCalled() {
return blockingCalled.get();
}

public int nonBlockingCalled() {
return nonBlockingCalled.get();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.quarkus.scheduler.runtime;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import com.cronutils.model.CronType;
Expand All @@ -13,18 +12,13 @@
@Recorder
public class SchedulerRecorder {

public Supplier<Object> createContext(SchedulerConfig config, ExecutorService executorService,
public Supplier<Object> createContext(SchedulerConfig config,
List<ScheduledMethodMetadata> scheduledMethods) {
return new Supplier<Object>() {
@Override
public Object get() {
return new SchedulerContext() {

@Override
public ExecutorService getExecutor() {
return executorService;
}

@Override
public CronType getCronType() {
return config.cronType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -55,6 +53,7 @@
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

@Typed(Scheduler.class)
Expand All @@ -67,7 +66,6 @@ public class SimpleScheduler implements Scheduler {
private static final long CHECK_PERIOD = 1000L;

private final ScheduledExecutorService scheduledExecutor;
private final ExecutorService executor;
private final Vertx vertx;
private volatile boolean running;
private final List<ScheduledTask> scheduledTasks;
Expand All @@ -79,7 +77,6 @@ public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedule
this.running = true;
this.enabled = schedulerRuntimeConfig.enabled;
this.scheduledTasks = new ArrayList<>();
this.executor = context.getExecutor();
this.vertx = vertx;

if (!schedulerRuntimeConfig.enabled) {
Expand Down Expand Up @@ -155,7 +152,7 @@ void checkTriggers() {
ZonedDateTime now = ZonedDateTime.now();
LOG.tracef("Check triggers at %s", now);
for (ScheduledTask task : scheduledTasks) {
task.execute(now, executor, vertx);
task.execute(now, vertx);
}
}

Expand Down Expand Up @@ -301,26 +298,26 @@ static class ScheduledTask {
this.invoker = invoker;
}

void execute(ZonedDateTime now, ExecutorService executor, Vertx vertx) {
void execute(ZonedDateTime now, Vertx vertx) {
if (!trigger.isRunning()) {
return;
}
ZonedDateTime scheduledFireTime = trigger.evaluate(now);
if (scheduledFireTime != null) {
Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
VertxContextSafetyToggle.setContextSafe(context, true);
if (invoker.isBlocking()) {
try {
executor.execute(new Runnable() {
@Override
public void run() {
context.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> p) {
try {
doInvoke(now, scheduledFireTime);
} finally {
p.complete();
}
});
} catch (RejectedExecutionException e) {
LOG.warnf("Rejected execution of a scheduled task for trigger %s", trigger);
}
}
}, false);
} else {
Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
VertxContextSafetyToggle.setContextSafe(context, true);
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import io.quarkus.scheduler.common.runtime.ScheduledMethodMetadata;
import io.quarkus.scheduler.common.runtime.SchedulerContext;
import io.quarkus.scheduler.common.runtime.util.SchedulerUtils;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -80,39 +82,36 @@ protected void handlePost(RoutingContext ctx, MultiMap form) throws Exception {
SchedulerContext context = Arc.container().instance(SchedulerContext.class).get();
for (ScheduledMethodMetadata metadata : context.getScheduledMethods()) {
if (metadata.getMethodDescription().equals(name)) {
context.getExecutor().execute(new Runnable() {
@Override
public void run() {
final ClassLoader previousCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(currentCl);
ScheduledInvoker invoker = context
.createInvoker(metadata.getInvokerClassName());
if (invoker.isBlocking()) {
Vertx vertx = Arc.container().instance(Vertx.class).get();
Context vdc = VertxContext.getOrCreateDuplicatedContext(vertx);
VertxContextSafetyToggle.setContextSafe(vdc, true);
try {
ScheduledInvoker invoker = context
.createInvoker(metadata.getInvokerClassName());
if (invoker.isBlocking()) {
vdc.executeBlocking(p -> {
try {
invoker.invoke(new DevModeScheduledExecution());
} else {
Vertx vertx = Arc.container().instance(Vertx.class).get();
VertxContext.getOrCreateDuplicatedContext(vertx).runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
try {
invoker.invoke(new DevModeScheduledExecution());
} catch (Exception ignored) {
}
}
});
} catch (Exception ignored) {
} finally {
p.complete();
}
LOG.infof("Invoked scheduled method %s via Dev UI", name);
} catch (Exception e) {
LOG.error(
"Unable to invoke a @Scheduled method: "
+ metadata.getMethodDescription(),
e);
} finally {
Thread.currentThread().setContextClassLoader(previousCl);
}
}, false);
} else {
vdc.runOnContext(x -> {
try {
invoker.invoke(new DevModeScheduledExecution());
} catch (Exception ignored) {
}
});
}
});
LOG.infof("Invoked scheduled method %s via Dev UI", name);
} catch (Exception e) {
LOG.error(
"Unable to invoke a @Scheduled method: "
+ metadata.getMethodDescription(),
e);
}
flashMessage(ctx, "Action invoked");
return;
}
Expand Down

0 comments on commit 05f50da

Please sign in to comment.