Skip to content

Commit

Permalink
Merge pull request quarkusio#10936 from mkouba/issue-10923
Browse files Browse the repository at this point in the history
Implement container-managed concurrency for beans
  • Loading branch information
stuartwdouglas authored Jul 26, 2020
2 parents 8d892f9 + 29db695 commit a081360
Show file tree
Hide file tree
Showing 8 changed files with 542 additions and 0 deletions.
34 changes: 34 additions & 0 deletions docs/src/main/asciidoc/cdi-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,40 @@ This situation is very common when trying to use CDI with alternative JVM langua

Quarkus however, can overcome these limitations when `quarkus.arc.transform-unproxyable-classes` is set to `true` (which is the default value).

=== Container-managed Concurrency

There is no standard concurrency control mechanism for CDI beans.
Nevertheless, a bean instance can be shared and accessed concurrently from multiple threads.
In that case it should be thread-safe.
You can use standard Java constructs (`volatile`, `synchronized`, `ReadWriteLock`, etc.) or let the container control the concurrent access.
Quarkus provides `@io.quarkus.arc.Lock` and a built-in interceptor for this interceptor binding.
Each interceptor instance associated with a contextual instance of an intercepted bean holds a separate `ReadWriteLock` with non-fair ordering policy.

TIP: `io.quarkus.arc.Lock` is a regular interceptor binding and as such can be used for any bean with any scope. However, it is especially useful for "shared" scopes, e.g. `@Singleton` and `@ApplicationScoped`.

.Container-managed Concurrency Example
[source,java]
----
import io.quarkus.arc.Lock;
@Lock <1>
@ApplicationScoped
class SharedService {
void addAmount(BigDecimal amout) {
// ...changes some internal state of the bean
}
@Lock(value = Lock.Type.READ, time = 1, unit = TimeUnit.SECONDS) <2> <3>
BigDecimal getAmount() {
// ...it is safe to read the value concurrently
}
}
----
<1> `@Lock` (which maps to `@Lock(Lock.Type.WRITE)`) declared on the class instructs the container to lock the bean instance for any invocation of any business method, i.e. the client has "exclusive access" and no concurrent invocations will be allowed.
<2> `@Lock(Lock.Type.READ)` overrides the value specified at class level. It means that any number of clients can invoke the method concurrently, unless the bean instance is locked by `@Lock(Lock.Type.WRITE)`.
<3> You can also specify the "wait time". If it's not possible to acquire the lock in the given time a `LockException` is thrown.

[[build_time_apis]]
== Build Time Extension Points

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.quarkus.arc.test.lock;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Lock;
import io.quarkus.test.QuarkusUnitTest;

public class LockTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(SimpleBean.class));

@Inject
SimpleBean bean;

@Test
public void testLock() throws InterruptedException, ExecutionException {
int count = 2;
ExecutorService executor = Executors.newFixedThreadPool(count);
try {
// Submit the tasks
List<Future<?>> results = new ArrayList<>();
for (int i = 0; i < count; i++) {
int idx = i;
results.add(executor.submit(() -> {
try {
bean.ping(idx);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}));
}
// Wait until the first method invocation is locked
assertTrue(SimpleBean.FIRST_INSIDE_LATCH.await(5, TimeUnit.SECONDS));
// Verify that no invocation was completed but one started
assertEquals(0, SimpleBean.COMPLETED.get());
// Count down the "completed" latch -> finish invocation of the first
SimpleBean.MAY_COMPLETE_LATCH.countDown();
// Wait until all tasks are complete
for (Future<?> future : results) {
future.get();
}
assertEquals(2, SimpleBean.COMPLETED.get());
} finally {
executor.shutdownNow();
}
}

@ApplicationScoped
static class SimpleBean {

static final CountDownLatch FIRST_INSIDE_LATCH = new CountDownLatch(1);
static final CountDownLatch MAY_COMPLETE_LATCH = new CountDownLatch(1);
static final AtomicInteger COMPLETED = new AtomicInteger();

@Lock
public void ping(int idx) throws InterruptedException {
if (FIRST_INSIDE_LATCH.getCount() == 0 && COMPLETED.get() == 0) {
fail("Locked method invocation not finished yet");
}
FIRST_INSIDE_LATCH.countDown();
assertTrue(MAY_COMPLETE_LATCH.await(3, TimeUnit.SECONDS), idx + ":" + MAY_COMPLETE_LATCH.toString());
COMPLETED.incrementAndGet();
}

}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.quarkus.arc.processor;

import io.quarkus.arc.Lock;
import io.quarkus.arc.impl.ActivateRequestContextInterceptor;
import io.quarkus.arc.impl.InjectableRequestContextController;
import io.quarkus.arc.impl.LockInterceptor;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Modifier;
Expand Down Expand Up @@ -64,9 +66,11 @@ private static IndexView buildAdditionalIndex() {
index(indexer, Destroyed.class.getName());
index(indexer, Intercepted.class.getName());
index(indexer, Model.class.getName());
index(indexer, Lock.class.getName());
// Arc built-in beans
index(indexer, ActivateRequestContextInterceptor.class.getName());
index(indexer, InjectableRequestContextController.class.getName());
index(indexer, LockInterceptor.class.getName());
return indexer.complete();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.quarkus.arc;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import javax.enterprise.util.Nonbinding;
import javax.interceptor.InterceptorBinding;

/**
* Defines a concurrency lock for a bean.
* <p>
* The container provides a built-in interceptor for this interceptor binding. Each interceptor instance associated with a
* contextual instance of an intercepted bean holds a {@link ReadWriteLock} instance with non-fair ordering policy.
*/
@InterceptorBinding
@Inherited
@Target(value = { TYPE, METHOD })
@Retention(value = RUNTIME)
public @interface Lock {

/**
*
* @return the type of the lock
*/
@Nonbinding
Type value() default Type.WRITE;

/**
* If it's not possible to acquire the lock in the given time a {@link LockException} is thrown.
*
* @see java.util.concurrent.locks.Lock#tryLock(long, TimeUnit)
* @return the wait time
*/
@Nonbinding
long time() default -1l;

/**
*
* @return the wait time unit
*/
@Nonbinding
TimeUnit unit() default TimeUnit.MILLISECONDS;

public enum Type {
/**
* Acquires the read lock before the business method is invoked.
*/
READ,
/**
* Acquires the write (exclusive) lock before the business method is invoked.
*/
WRITE,
/**
* Acquires no lock.
* <p>
* This could be useful if you need to override the behavior defined by a class-level interceptor binding.
*/
NONE
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.quarkus.arc;

/**
*
* @see Lock
*/
public class LockException extends RuntimeException {

private static final long serialVersionUID = 4486284740873061615L;

public LockException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.quarkus.arc.impl;

import static javax.interceptor.Interceptor.Priority.PLATFORM_BEFORE;

import io.quarkus.arc.ArcInvocationContext;
import io.quarkus.arc.Lock;
import io.quarkus.arc.LockException;
import java.lang.annotation.Annotation;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Priority;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;

@Lock
@Interceptor
@Priority(PLATFORM_BEFORE)
public class LockInterceptor {

private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

@AroundInvoke
Object lock(InvocationContext ctx) throws Exception {
Lock lock = getLock(ctx);
switch (lock.value()) {
case WRITE:
return writeLock(lock, ctx);
case READ:
return readLock(lock, ctx);
case NONE:
return ctx.proceed();
}
throw new LockException("Unsupported @Lock type found on business method " + ctx.getMethod());
}

private Object writeLock(Lock lock, InvocationContext ctx) throws Exception {
boolean locked = false;
long time = lock.time();
try {
if (time > 0) {
locked = readWriteLock.writeLock().tryLock(time, lock.unit());
if (!locked) {
throw new LockException("Write lock not acquired in " + lock.unit().toMillis(time) + " ms");
}
} else {
readWriteLock.writeLock().lock();
locked = true;
}
return ctx.proceed();
} finally {
if (locked) {
readWriteLock.writeLock().unlock();
}
}
}

private Object readLock(Lock lock, InvocationContext ctx) throws Exception {
boolean locked = false;
long time = lock.time();
try {
if (time > 0) {
locked = readWriteLock.readLock().tryLock(time, lock.unit());
if (!locked) {
throw new LockException("Read lock not acquired in " + lock.unit().toMillis(time) + " ms");
}
} else {
readWriteLock.readLock().lock();
locked = true;
}
return ctx.proceed();
} finally {
if (locked) {
readWriteLock.readLock().unlock();
}
}
}

@SuppressWarnings("unchecked")
Lock getLock(InvocationContext ctx) {
Set<Annotation> bindings = (Set<Annotation>) ctx.getContextData().get(ArcInvocationContext.KEY_INTERCEPTOR_BINDINGS);
for (Annotation annotation : bindings) {
if (annotation.annotationType().equals(Lock.class)) {
return (Lock) annotation;
}
}
// This should never happen
throw new LockException("@Lock binding not found on business method " + ctx.getMethod());
}

}
Loading

0 comments on commit a081360

Please sign in to comment.