From 29db695d30256cb5bc23245a20c0fa7eae8c7a07 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Thu, 23 Jul 2020 15:17:40 +0200 Subject: [PATCH] Implement container-managed concurrency for beans - resolves #10923 --- docs/src/main/asciidoc/cdi-reference.adoc | 34 +++++ .../io/quarkus/arc/test/lock/LockTest.java | 91 +++++++++++ .../quarkus/arc/processor/BeanArchives.java | 4 + .../src/main/java/io/quarkus/arc/Lock.java | 67 ++++++++ .../java/io/quarkus/arc/LockException.java | 15 ++ .../io/quarkus/arc/impl/LockInterceptor.java | 92 +++++++++++ .../arc/test/lock/LockInterceptorTest.java | 144 ++++++++++++++++++ .../quarkus/arc/test/lock/LockWaitTest.java | 95 ++++++++++++ 8 files changed, 542 insertions(+) create mode 100644 extensions/arc/deployment/src/test/java/io/quarkus/arc/test/lock/LockTest.java create mode 100644 independent-projects/arc/runtime/src/main/java/io/quarkus/arc/Lock.java create mode 100644 independent-projects/arc/runtime/src/main/java/io/quarkus/arc/LockException.java create mode 100644 independent-projects/arc/runtime/src/main/java/io/quarkus/arc/impl/LockInterceptor.java create mode 100644 independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/lock/LockInterceptorTest.java create mode 100644 independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/lock/LockWaitTest.java diff --git a/docs/src/main/asciidoc/cdi-reference.adoc b/docs/src/main/asciidoc/cdi-reference.adoc index 55e6520928f8e..8ff45199a1805 100644 --- a/docs/src/main/asciidoc/cdi-reference.adoc +++ b/docs/src/main/asciidoc/cdi-reference.adoc @@ -661,6 +661,40 @@ This situation is very common when trying to use CDI with alternative JVM langua Quarkus however, can overcome these limitations when `quarkus.arc.transform-unproxyable-classes` is set to `true` (which is the default value). +=== Container-managed Concurrency + +There is no standard concurrency control mechanism for CDI beans. +Nevertheless, a bean instance can be shared and accessed concurrently from multiple threads. +In that case it should be thread-safe. +You can use standard Java constructs (`volatile`, `synchronized`, `ReadWriteLock`, etc.) or let the container control the concurrent access. +Quarkus provides `@io.quarkus.arc.Lock` and a built-in interceptor for this interceptor binding. +Each interceptor instance associated with a contextual instance of an intercepted bean holds a separate `ReadWriteLock` with non-fair ordering policy. + +TIP: `io.quarkus.arc.Lock` is a regular interceptor binding and as such can be used for any bean with any scope. However, it is especially useful for "shared" scopes, e.g. `@Singleton` and `@ApplicationScoped`. + +.Container-managed Concurrency Example +[source,java] +---- +import io.quarkus.arc.Lock; + +@Lock <1> +@ApplicationScoped +class SharedService { + + void addAmount(BigDecimal amout) { + // ...changes some internal state of the bean + } + + @Lock(value = Lock.Type.READ, time = 1, unit = TimeUnit.SECONDS) <2> <3> + BigDecimal getAmount() { + // ...it is safe to read the value concurrently + } +} +---- +<1> `@Lock` (which maps to `@Lock(Lock.Type.WRITE)`) declared on the class instructs the container to lock the bean instance for any invocation of any business method, i.e. the client has "exclusive access" and no concurrent invocations will be allowed. +<2> `@Lock(Lock.Type.READ)` overrides the value specified at class level. It means that any number of clients can invoke the method concurrently, unless the bean instance is locked by `@Lock(Lock.Type.WRITE)`. +<3> You can also specify the "wait time". If it's not possible to acquire the lock in the given time a `LockException` is thrown. + [[build_time_apis]] == Build Time Extension Points diff --git a/extensions/arc/deployment/src/test/java/io/quarkus/arc/test/lock/LockTest.java b/extensions/arc/deployment/src/test/java/io/quarkus/arc/test/lock/LockTest.java new file mode 100644 index 0000000000000..45dc95cb8606c --- /dev/null +++ b/extensions/arc/deployment/src/test/java/io/quarkus/arc/test/lock/LockTest.java @@ -0,0 +1,91 @@ +package io.quarkus.arc.test.lock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.arc.Lock; +import io.quarkus.test.QuarkusUnitTest; + +public class LockTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(SimpleBean.class)); + + @Inject + SimpleBean bean; + + @Test + public void testLock() throws InterruptedException, ExecutionException { + int count = 2; + ExecutorService executor = Executors.newFixedThreadPool(count); + try { + // Submit the tasks + List> results = new ArrayList<>(); + for (int i = 0; i < count; i++) { + int idx = i; + results.add(executor.submit(() -> { + try { + bean.ping(idx); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + })); + } + // Wait until the first method invocation is locked + assertTrue(SimpleBean.FIRST_INSIDE_LATCH.await(5, TimeUnit.SECONDS)); + // Verify that no invocation was completed but one started + assertEquals(0, SimpleBean.COMPLETED.get()); + // Count down the "completed" latch -> finish invocation of the first + SimpleBean.MAY_COMPLETE_LATCH.countDown(); + // Wait until all tasks are complete + for (Future future : results) { + future.get(); + } + assertEquals(2, SimpleBean.COMPLETED.get()); + } finally { + executor.shutdownNow(); + } + } + + @ApplicationScoped + static class SimpleBean { + + static final CountDownLatch FIRST_INSIDE_LATCH = new CountDownLatch(1); + static final CountDownLatch MAY_COMPLETE_LATCH = new CountDownLatch(1); + static final AtomicInteger COMPLETED = new AtomicInteger(); + + @Lock + public void ping(int idx) throws InterruptedException { + if (FIRST_INSIDE_LATCH.getCount() == 0 && COMPLETED.get() == 0) { + fail("Locked method invocation not finished yet"); + } + FIRST_INSIDE_LATCH.countDown(); + assertTrue(MAY_COMPLETE_LATCH.await(3, TimeUnit.SECONDS), idx + ":" + MAY_COMPLETE_LATCH.toString()); + COMPLETED.incrementAndGet(); + } + + } + +} diff --git a/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/BeanArchives.java b/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/BeanArchives.java index 584342319a98d..56ce8f311528f 100644 --- a/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/BeanArchives.java +++ b/independent-projects/arc/processor/src/main/java/io/quarkus/arc/processor/BeanArchives.java @@ -1,7 +1,9 @@ package io.quarkus.arc.processor; +import io.quarkus.arc.Lock; import io.quarkus.arc.impl.ActivateRequestContextInterceptor; import io.quarkus.arc.impl.InjectableRequestContextController; +import io.quarkus.arc.impl.LockInterceptor; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Modifier; @@ -64,9 +66,11 @@ private static IndexView buildAdditionalIndex() { index(indexer, Destroyed.class.getName()); index(indexer, Intercepted.class.getName()); index(indexer, Model.class.getName()); + index(indexer, Lock.class.getName()); // Arc built-in beans index(indexer, ActivateRequestContextInterceptor.class.getName()); index(indexer, InjectableRequestContextController.class.getName()); + index(indexer, LockInterceptor.class.getName()); return indexer.complete(); } diff --git a/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/Lock.java b/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/Lock.java new file mode 100644 index 0000000000000..44536cd17afea --- /dev/null +++ b/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/Lock.java @@ -0,0 +1,67 @@ +package io.quarkus.arc; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import javax.enterprise.util.Nonbinding; +import javax.interceptor.InterceptorBinding; + +/** + * Defines a concurrency lock for a bean. + *

+ * The container provides a built-in interceptor for this interceptor binding. Each interceptor instance associated with a + * contextual instance of an intercepted bean holds a {@link ReadWriteLock} instance with non-fair ordering policy. + */ +@InterceptorBinding +@Inherited +@Target(value = { TYPE, METHOD }) +@Retention(value = RUNTIME) +public @interface Lock { + + /** + * + * @return the type of the lock + */ + @Nonbinding + Type value() default Type.WRITE; + + /** + * If it's not possible to acquire the lock in the given time a {@link LockException} is thrown. + * + * @see java.util.concurrent.locks.Lock#tryLock(long, TimeUnit) + * @return the wait time + */ + @Nonbinding + long time() default -1l; + + /** + * + * @return the wait time unit + */ + @Nonbinding + TimeUnit unit() default TimeUnit.MILLISECONDS; + + public enum Type { + /** + * Acquires the read lock before the business method is invoked. + */ + READ, + /** + * Acquires the write (exclusive) lock before the business method is invoked. + */ + WRITE, + /** + * Acquires no lock. + *

+ * This could be useful if you need to override the behavior defined by a class-level interceptor binding. + */ + NONE + } + +} diff --git a/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/LockException.java b/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/LockException.java new file mode 100644 index 0000000000000..4bb95d297c753 --- /dev/null +++ b/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/LockException.java @@ -0,0 +1,15 @@ +package io.quarkus.arc; + +/** + * + * @see Lock + */ +public class LockException extends RuntimeException { + + private static final long serialVersionUID = 4486284740873061615L; + + public LockException(String message) { + super(message); + } + +} diff --git a/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/impl/LockInterceptor.java b/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/impl/LockInterceptor.java new file mode 100644 index 0000000000000..d10e560bd9cd1 --- /dev/null +++ b/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/impl/LockInterceptor.java @@ -0,0 +1,92 @@ +package io.quarkus.arc.impl; + +import static javax.interceptor.Interceptor.Priority.PLATFORM_BEFORE; + +import io.quarkus.arc.ArcInvocationContext; +import io.quarkus.arc.Lock; +import io.quarkus.arc.LockException; +import java.lang.annotation.Annotation; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.Priority; +import javax.interceptor.AroundInvoke; +import javax.interceptor.Interceptor; +import javax.interceptor.InvocationContext; + +@Lock +@Interceptor +@Priority(PLATFORM_BEFORE) +public class LockInterceptor { + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + + @AroundInvoke + Object lock(InvocationContext ctx) throws Exception { + Lock lock = getLock(ctx); + switch (lock.value()) { + case WRITE: + return writeLock(lock, ctx); + case READ: + return readLock(lock, ctx); + case NONE: + return ctx.proceed(); + } + throw new LockException("Unsupported @Lock type found on business method " + ctx.getMethod()); + } + + private Object writeLock(Lock lock, InvocationContext ctx) throws Exception { + boolean locked = false; + long time = lock.time(); + try { + if (time > 0) { + locked = readWriteLock.writeLock().tryLock(time, lock.unit()); + if (!locked) { + throw new LockException("Write lock not acquired in " + lock.unit().toMillis(time) + " ms"); + } + } else { + readWriteLock.writeLock().lock(); + locked = true; + } + return ctx.proceed(); + } finally { + if (locked) { + readWriteLock.writeLock().unlock(); + } + } + } + + private Object readLock(Lock lock, InvocationContext ctx) throws Exception { + boolean locked = false; + long time = lock.time(); + try { + if (time > 0) { + locked = readWriteLock.readLock().tryLock(time, lock.unit()); + if (!locked) { + throw new LockException("Read lock not acquired in " + lock.unit().toMillis(time) + " ms"); + } + } else { + readWriteLock.readLock().lock(); + locked = true; + } + return ctx.proceed(); + } finally { + if (locked) { + readWriteLock.readLock().unlock(); + } + } + } + + @SuppressWarnings("unchecked") + Lock getLock(InvocationContext ctx) { + Set bindings = (Set) ctx.getContextData().get(ArcInvocationContext.KEY_INTERCEPTOR_BINDINGS); + for (Annotation annotation : bindings) { + if (annotation.annotationType().equals(Lock.class)) { + return (Lock) annotation; + } + } + // This should never happen + throw new LockException("@Lock binding not found on business method " + ctx.getMethod()); + } + +} diff --git a/independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/lock/LockInterceptorTest.java b/independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/lock/LockInterceptorTest.java new file mode 100644 index 0000000000000..5aa92ae068025 --- /dev/null +++ b/independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/lock/LockInterceptorTest.java @@ -0,0 +1,144 @@ +package io.quarkus.arc.test.lock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.Lock; +import io.quarkus.arc.Lock.Type; +import io.quarkus.arc.impl.LockInterceptor; +import io.quarkus.arc.test.ArcTestContainer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.Dependent; +import javax.inject.Singleton; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class LockInterceptorTest { + + private static int POOL_SIZE = 4; + + static ExecutorService executor; + + @BeforeAll + static void initExecutor() { + executor = Executors.newFixedThreadPool(POOL_SIZE); + } + + @AfterAll + static void shutdownExecutor() { + executor.shutdownNow(); + } + + @RegisterExtension + public ArcTestContainer container = new ArcTestContainer(SimpleSingletonBean.class, SimpleDependentBean.class, + SimpleApplicationScopedBean.class, Lock.class, + LockInterceptor.class); + + @Test + public void testSingletonBean() throws Exception { + assertConcurrentAccess(SimpleSingletonBean.class); + } + + @Test + public void testDependentBean() throws Exception { + assertConcurrentAccess(SimpleDependentBean.class); + } + + @Test + public void testApplicationScopedBean() throws Exception { + assertConcurrentAccess(SimpleApplicationScopedBean.class); + } + + private void assertConcurrentAccess(Class pingClass) + throws InterruptedException, ExecutionException, TimeoutException { + Ping bean = Arc.container().instance(pingClass).get(); + // Reset latches + Ping.reset(); + int numberOfTasks = POOL_SIZE; + List> results = new ArrayList<>(); + for (int i = 0; i < numberOfTasks; i++) { + int idx = i; + results.add(executor.submit(() -> { + try { + bean.ping(idx); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + })); + } + // Wait until the first method invocation starts + assertTrue(Ping.FIRST_INSIDE_LATCH.await(5, TimeUnit.SECONDS)); + // At this time all tasks should be blocked + assertEquals(0, Ping.COMPLETED.get()); + // Count down the completed latch and wait for results + Ping.MAY_COMPLETE_LATCH.countDown(); + for (Future future : results) { + future.get(5, TimeUnit.SECONDS); + } + assertEquals(numberOfTasks, Ping.COMPLETED.get()); + } + + static abstract class Ping { + + static CountDownLatch FIRST_INSIDE_LATCH; + static CountDownLatch MAY_COMPLETE_LATCH; + static AtomicInteger COMPLETED; + + static void reset() { + FIRST_INSIDE_LATCH = new CountDownLatch(1); + MAY_COMPLETE_LATCH = new CountDownLatch(1); + COMPLETED = new AtomicInteger(); + } + + void ping(int idx) throws InterruptedException { + if (FIRST_INSIDE_LATCH.getCount() == 0 && COMPLETED.get() == 0) { + fail("Locked method invocation not finished yet"); + } + FIRST_INSIDE_LATCH.countDown(); + assertTrue(MAY_COMPLETE_LATCH.await(5, TimeUnit.SECONDS), MAY_COMPLETE_LATCH.toString()); + COMPLETED.incrementAndGet(); + } + + } + + @Lock + @Singleton + static class SimpleSingletonBean extends Ping { + + } + + @Dependent + static class SimpleDependentBean extends Ping { + + @Lock + void ping(int idx) throws InterruptedException { + super.ping(idx); + } + + } + + @Lock(Type.READ) + @ApplicationScoped + static class SimpleApplicationScopedBean extends Ping { + + @Lock(Type.WRITE) + void ping(int idx) throws InterruptedException { + super.ping(idx); + } + } +} diff --git a/independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/lock/LockWaitTest.java b/independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/lock/LockWaitTest.java new file mode 100644 index 0000000000000..37e88b9cd46fd --- /dev/null +++ b/independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/lock/LockWaitTest.java @@ -0,0 +1,95 @@ +package io.quarkus.arc.test.lock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.Lock; +import io.quarkus.arc.LockException; +import io.quarkus.arc.impl.LockInterceptor; +import io.quarkus.arc.test.ArcTestContainer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.enterprise.context.ApplicationScoped; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class LockWaitTest { + + @RegisterExtension + public ArcTestContainer container = new ArcTestContainer(SimpleApplicationScopedBean.class, Lock.class, + LockInterceptor.class); + + @Test + public void testLockWait() throws Exception { + ExecutorService executor = Executors.newFixedThreadPool(3); + try { + SimpleApplicationScopedBean bean = Arc.container().instance(SimpleApplicationScopedBean.class).get(); + + // First invocation + Future firstResult = executor.submit(() -> { + try { + bean.ping(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }); + // Wait until the first method invocation starts + assertTrue(SimpleApplicationScopedBean.FIRST_INSIDE_LATCH.await(50, TimeUnit.SECONDS)); + + // Second invocation - should be blocked and fail after 100ms + Future secondResult = executor.submit(() -> { + try { + bean.ping(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }); + + try { + secondResult.get(); + fail(); + } catch (ExecutionException expected) { + assertNotNull(expected.getCause()); + assertEquals(LockException.class, expected.getCause().getClass()); + assertTrue(expected.getCause().getMessage().contains("Write lock not acquired in")); + } + + SimpleApplicationScopedBean.MAY_COMPLETE_LATCH.countDown(); + + firstResult.get(); + assertEquals(1, SimpleApplicationScopedBean.COMPLETED.get()); + + } finally { + executor.shutdownNow(); + } + + } + + @ApplicationScoped + static class SimpleApplicationScopedBean { + + static final CountDownLatch FIRST_INSIDE_LATCH = new CountDownLatch(1); + static final CountDownLatch MAY_COMPLETE_LATCH = new CountDownLatch(1); + static final AtomicInteger COMPLETED = new AtomicInteger(); + + @Lock(time = 100) + void ping() throws InterruptedException { + if (FIRST_INSIDE_LATCH.getCount() == 0 && COMPLETED.get() == 0) { + fail("Locked method invocation not finished yet"); + } + FIRST_INSIDE_LATCH.countDown(); + assertTrue(MAY_COMPLETE_LATCH.await(50, TimeUnit.SECONDS), MAY_COMPLETE_LATCH.toString()); + COMPLETED.incrementAndGet(); + } + } +}