Skip to content

Commit

Permalink
Merge pull request #10451 from mkouba/scheduler-fix-concurrent-exec-t…
Browse files Browse the repository at this point in the history
…ests

Scheduler - make tests for concurrent execution more robust
  • Loading branch information
machi1990 authored Jul 3, 2020
2 parents e50eab4 + 59c78e1 commit 8fbb04d
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 119 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.quarkus.quartz.test;

import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.test.QuarkusUnitTest;

public class ConcurrentExecutionProceedTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Jobs.class));

@Test
public void testExecution() {
try {
// Wait until Jobs#concurrent() is executed 3x and skipped 0x
if (Jobs.START_LATCH.await(10, TimeUnit.SECONDS)) {
// Unblock all executions
Jobs.BLOCKING_LATCH.countDown();
} else {
fail("Jobs were not executed in 10 seconds!");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

static class Jobs {

static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1);

static final CountDownLatch START_LATCH = new CountDownLatch(3);

@Scheduled(every = "1s")
void concurrent() throws InterruptedException {
START_LATCH.countDown();
if (!BLOCKING_LATCH.await(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("concurrent() execution blocked too long...");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.quarkus.quartz.test;

import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.event.Observes;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.SkippedExecution;
import io.quarkus.test.QuarkusUnitTest;

public class ConcurrentExecutionSkipTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Jobs.class));

@Test
public void testExecution() {
try {
// Wait until Jobs#nonconcurrent() is executed 1x and skipped 1x
if (Jobs.SKIPPED_LATCH.await(10, TimeUnit.SECONDS)) {
// Exactly one job is blocked
assertEquals(1, Jobs.COUNTER.get());
// Unblock all executions
Jobs.BLOCKING_LATCH.countDown();
} else {
fail("Jobs were not executed in 10 seconds!");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

static class Jobs {

static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1);

static final AtomicInteger COUNTER = new AtomicInteger(0);
static final CountDownLatch SKIPPED_LATCH = new CountDownLatch(1);

@Scheduled(every = "1s", concurrentExecution = SKIP)
void nonconcurrent() throws InterruptedException {
COUNTER.incrementAndGet();
if (!BLOCKING_LATCH.await(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
}
}

void onSkip(@Observes SkippedExecution event) {
SKIPPED_LATCH.countDown();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.inject.Singleton;
Expand Down Expand Up @@ -47,6 +48,7 @@
import io.quarkus.scheduler.Scheduled.ConcurrentExecution;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.Scheduler;
import io.quarkus.scheduler.SkippedExecution;
import io.quarkus.scheduler.Trigger;
import io.quarkus.scheduler.runtime.ScheduledInvoker;
import io.quarkus.scheduler.runtime.ScheduledMethodMetadata;
Expand Down Expand Up @@ -75,7 +77,7 @@ org.quartz.Scheduler produceQuartzScheduler() {
}

public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, Config config,
SchedulerRuntimeConfig schedulerRuntimeConfig) {
SchedulerRuntimeConfig schedulerRuntimeConfig, Event<SkippedExecution> skippedExecutionEvent) {
enabled = schedulerRuntimeConfig.enabled;
if (!enabled) {
LOGGER.info("Quartz scheduler is disabled by config property and will not be started");
Expand Down Expand Up @@ -117,7 +119,7 @@ public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, Co
}
ScheduledInvoker invoker = context.createInvoker(method.getInvokerClassName());
if (scheduled.concurrentExecution() == ConcurrentExecution.SKIP) {
invoker = new SkipConcurrentExecutionInvoker(invoker);
invoker = new SkipConcurrentExecutionInvoker(invoker, skippedExecutionEvent);
}
invokers.put(identity, invoker);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.quarkus.scheduler.test;

import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.test.QuarkusUnitTest;

public class ConcurrentExecutionProceedTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Jobs.class));

@Test
public void testExecution() {
try {
// Wait until Jobs#concurrent() is executed 3x and skipped 0x
if (Jobs.START_LATCH.await(10, TimeUnit.SECONDS)) {
// Unblock all executions
Jobs.BLOCKING_LATCH.countDown();
} else {
fail("Jobs were not executed in 10 seconds!");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

static class Jobs {

static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1);

static final CountDownLatch START_LATCH = new CountDownLatch(3);

@Scheduled(every = "1s")
void concurrent() throws InterruptedException {
START_LATCH.countDown();
if (!BLOCKING_LATCH.await(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("concurrent() execution blocked too long...");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.quarkus.scheduler.test;

import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.event.Observes;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.SkippedExecution;
import io.quarkus.test.QuarkusUnitTest;

public class ConcurrentExecutionSkipTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Jobs.class));

@Test
public void testExecution() {
try {
// Wait until Jobs#nonconcurrent() is executed 1x and skipped 1x
if (Jobs.SKIPPED_LATCH.await(10, TimeUnit.SECONDS)) {
// Exactly one job is blocked
assertEquals(1, Jobs.COUNTER.get());
// Unblock all executions
Jobs.BLOCKING_LATCH.countDown();
} else {
fail("Jobs were not executed in 10 seconds!");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

static class Jobs {

static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1);

static final AtomicInteger COUNTER = new AtomicInteger(0);
static final CountDownLatch SKIPPED_LATCH = new CountDownLatch(1);

@Scheduled(every = "1s", concurrentExecution = SKIP)
void nonconcurrent() throws InterruptedException {
COUNTER.incrementAndGet();
if (!BLOCKING_LATCH.await(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("nonconcurrent() execution blocked too long...");
}
}

void onSkip(@Observes SkippedExecution event) {
SKIPPED_LATCH.countDown();
}
}
}

This file was deleted.

Loading

0 comments on commit 8fbb04d

Please sign in to comment.