Skip to content

Commit

Permalink
Fix termination of request context for non-blocking scheduled methods
Browse files Browse the repository at this point in the history
- fix the problem mentioned in
quarkusio#23739 (comment)
  • Loading branch information
mkouba authored and miador committed Sep 6, 2022
1 parent 1294ba4 commit 261fb8d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.CompletionStage;

import io.quarkus.arc.Arc;
import io.quarkus.arc.InjectableContext.ContextState;
import io.quarkus.arc.ManagedContext;
import io.quarkus.scheduler.ScheduledExecution;

Expand All @@ -14,15 +15,26 @@ public CompletionStage<Void> invoke(ScheduledExecution execution) throws Excepti
if (requestContext.isActive()) {
return invokeBean(execution);
} else {
// 1. Activate the context
// 2. Capture the state (which is basically a shared Map instance)
// 3. Destroy the context correctly when the returned stage completes
requestContext.activate();
final ContextState state = requestContext.getState();
try {
requestContext.activate();
return invokeBean(execution);
} finally {
return invokeBean(execution).whenComplete((v, t) -> {
requestContext.destroy(state);
});
} catch (RuntimeException e) {
// Just terminate the context and rethrow the exception if something goes really wrong
requestContext.terminate();
throw e;
} finally {
// Always deactivate the context
requestContext.deactivate();
}
}
}

protected abstract CompletionStage<Void> invokeBean(ScheduledExecution execution) throws Exception;
protected abstract CompletionStage<Void> invokeBean(ScheduledExecution execution);

}
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,7 @@ private String generateInvoker(ScheduledBusinessMethodItem scheduledMethod, Clas
ScheduledExecution.class.getName(), SchedulerDotNames.CONTINUATION.toString());
} else {
// The descriptor is: CompletionStage invoke(ScheduledExecution execution)
invoke = invokerCreator.getMethodCreator("invokeBean", CompletionStage.class, ScheduledExecution.class)
.addException(Exception.class);
invoke = invokerCreator.getMethodCreator("invokeBean", CompletionStage.class, ScheduledExecution.class);
}

// Use a try-catch block and return failed future if an exception is thrown
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.scheduler.test.nonblocking;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand All @@ -10,13 +11,19 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PreDestroy;
import javax.enterprise.context.RequestScoped;
import javax.enterprise.event.Observes;
import javax.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.arc.Unremovable;
import io.quarkus.scheduler.FailedExecution;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.SuccessfulExecution;
Expand All @@ -30,7 +37,7 @@ public class NonBlockingScheduledMethodTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root.addClasses(Jobs.class, JobWasExecuted.class));
.withApplicationRoot(root -> root.addClasses(Jobs.class, JobWasExecuted.class, Naeb.class));

@Test
public void testVoid() throws InterruptedException {
Expand All @@ -46,6 +53,10 @@ public void testUni() throws InterruptedException {
assertTrue(Jobs.UNI_ON_EVENT_LOOP.get());
assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS));
assertEvents("every_uni");
assertTrue(Jobs.UNI_FAIL_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.FAILED_LATCH.await(5, TimeUnit.SECONDS));
// the bean should be destroyed twice
assertEquals(2, Naeb.DESTROYED_COUNTER.get());
}

@Test
Expand All @@ -57,19 +68,20 @@ public void testCompletionStage() throws InterruptedException {
}

private void assertEvents(String id) {
for (SuccessfulExecution exec : Jobs.events) {
for (SuccessfulExecution exec : Jobs.successEvents) {
if (exec.getExecution().getTrigger().getId().equals(id)) {
return;
}
}
fail("No SuccessfulExecution event fired for " + id + ": " + Jobs.events);
fail("No SuccessfulExecution event fired for " + id + ": " + Jobs.successEvents);
}

static class Jobs {

// jobs executed
static final CountDownLatch VOID_LATCH = new CountDownLatch(1);
static final CountDownLatch UNI_LATCH = new CountDownLatch(1);
static final CountDownLatch UNI_FAIL_LATCH = new CountDownLatch(1);
static final CountDownLatch CS_LATCH = new CountDownLatch(1);

// jobs executed on the event loop
Expand All @@ -79,13 +91,22 @@ static class Jobs {

// successful events
static final CountDownLatch SUCCESS_LATCH = new CountDownLatch(3);
static final List<SuccessfulExecution> events = new CopyOnWriteArrayList<>();
static final List<SuccessfulExecution> successEvents = new CopyOnWriteArrayList<>();

// failed events
static final CountDownLatch FAILED_LATCH = new CountDownLatch(1);
static final List<FailedExecution> failedEvents = new CopyOnWriteArrayList<>();

static void onSuccess(@Observes SuccessfulExecution event) {
events.add(event);
successEvents.add(event);
SUCCESS_LATCH.countDown();
}

static void onFailure(@Observes FailedExecution event) {
failedEvents.add(event);
FAILED_LATCH.countDown();
}

@NonBlocking
@Scheduled(every = "0.5s", identity = "every_void", skipExecutionIf = JobWasExecuted.class)
void everySecond() {
Expand All @@ -97,7 +118,18 @@ void everySecond() {
Uni<Void> everySecondUni() {
UNI_ON_EVENT_LOOP.set(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext());
UNI_LATCH.countDown();
return Uni.createFrom().voidItem();
return Uni.createFrom().voidItem().invoke(Void -> {
// this callback is executed (and the bean instance is created) after the scheduled method completes
Arc.container().instance(Naeb.class).get().doSomething();
});
}

@Scheduled(every = "0.5s", identity = "every_uni_fail", skipExecutionIf = JobWasExecuted.class)
Uni<Void> everySecondUniFailure() {
UNI_FAIL_LATCH.countDown();
// the bean instance is created before the scheduled method completes
Arc.container().instance(Naeb.class).get().doSomething();
throw new IllegalStateException("FAIL!");
}

@Scheduled(every = "0.5s", identity = "every_cs", skipExecutionIf = JobWasExecuted.class)
Expand All @@ -108,6 +140,7 @@ CompletionStage<Void> everySecondCompletionStage() {
ret.complete(null);
return ret;
}

}

@Singleton
Expand All @@ -120,6 +153,8 @@ public boolean test(ScheduledExecution execution) {
return Jobs.VOID_LATCH.getCount() == 0;
case "every_uni":
return Jobs.UNI_LATCH.getCount() == 0;
case "every_uni_fail":
return Jobs.UNI_FAIL_LATCH.getCount() == 0;
case "every_cs":
return Jobs.CS_LATCH.getCount() == 0;
default:
Expand All @@ -129,4 +164,19 @@ public boolean test(ScheduledExecution execution) {

}

@Unremovable // this bean is "unused"
@RequestScoped
static class Naeb {

static final AtomicInteger DESTROYED_COUNTER = new AtomicInteger();

void doSomething() {
}

@PreDestroy
void destroy() {
DESTROYED_COUNTER.incrementAndGet();
}
}

}

0 comments on commit 261fb8d

Please sign in to comment.