-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #10936 from mkouba/issue-10923
Implement container-managed concurrency for beans
- Loading branch information
Showing
8 changed files
with
542 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 91 additions & 0 deletions
91
extensions/arc/deployment/src/test/java/io/quarkus/arc/test/lock/LockTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package io.quarkus.arc.test.lock; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
import static org.junit.jupiter.api.Assertions.fail; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
import javax.enterprise.context.ApplicationScoped; | ||
import javax.inject.Inject; | ||
|
||
import org.jboss.shrinkwrap.api.ShrinkWrap; | ||
import org.jboss.shrinkwrap.api.spec.JavaArchive; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.RegisterExtension; | ||
|
||
import io.quarkus.arc.Lock; | ||
import io.quarkus.test.QuarkusUnitTest; | ||
|
||
public class LockTest { | ||
|
||
@RegisterExtension | ||
static final QuarkusUnitTest config = new QuarkusUnitTest() | ||
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) | ||
.addClasses(SimpleBean.class)); | ||
|
||
@Inject | ||
SimpleBean bean; | ||
|
||
@Test | ||
public void testLock() throws InterruptedException, ExecutionException { | ||
int count = 2; | ||
ExecutorService executor = Executors.newFixedThreadPool(count); | ||
try { | ||
// Submit the tasks | ||
List<Future<?>> results = new ArrayList<>(); | ||
for (int i = 0; i < count; i++) { | ||
int idx = i; | ||
results.add(executor.submit(() -> { | ||
try { | ||
bean.ping(idx); | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
throw new RuntimeException(e); | ||
} | ||
})); | ||
} | ||
// Wait until the first method invocation is locked | ||
assertTrue(SimpleBean.FIRST_INSIDE_LATCH.await(5, TimeUnit.SECONDS)); | ||
// Verify that no invocation was completed but one started | ||
assertEquals(0, SimpleBean.COMPLETED.get()); | ||
// Count down the "completed" latch -> finish invocation of the first | ||
SimpleBean.MAY_COMPLETE_LATCH.countDown(); | ||
// Wait until all tasks are complete | ||
for (Future<?> future : results) { | ||
future.get(); | ||
} | ||
assertEquals(2, SimpleBean.COMPLETED.get()); | ||
} finally { | ||
executor.shutdownNow(); | ||
} | ||
} | ||
|
||
@ApplicationScoped | ||
static class SimpleBean { | ||
|
||
static final CountDownLatch FIRST_INSIDE_LATCH = new CountDownLatch(1); | ||
static final CountDownLatch MAY_COMPLETE_LATCH = new CountDownLatch(1); | ||
static final AtomicInteger COMPLETED = new AtomicInteger(); | ||
|
||
@Lock | ||
public void ping(int idx) throws InterruptedException { | ||
if (FIRST_INSIDE_LATCH.getCount() == 0 && COMPLETED.get() == 0) { | ||
fail("Locked method invocation not finished yet"); | ||
} | ||
FIRST_INSIDE_LATCH.countDown(); | ||
assertTrue(MAY_COMPLETE_LATCH.await(3, TimeUnit.SECONDS), idx + ":" + MAY_COMPLETE_LATCH.toString()); | ||
COMPLETED.incrementAndGet(); | ||
} | ||
|
||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
67 changes: 67 additions & 0 deletions
67
independent-projects/arc/runtime/src/main/java/io/quarkus/arc/Lock.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
package io.quarkus.arc; | ||
|
||
import static java.lang.annotation.ElementType.METHOD; | ||
import static java.lang.annotation.ElementType.TYPE; | ||
import static java.lang.annotation.RetentionPolicy.RUNTIME; | ||
|
||
import java.lang.annotation.Inherited; | ||
import java.lang.annotation.Retention; | ||
import java.lang.annotation.Target; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.locks.ReadWriteLock; | ||
import javax.enterprise.util.Nonbinding; | ||
import javax.interceptor.InterceptorBinding; | ||
|
||
/** | ||
* Defines a concurrency lock for a bean. | ||
* <p> | ||
* The container provides a built-in interceptor for this interceptor binding. Each interceptor instance associated with a | ||
* contextual instance of an intercepted bean holds a {@link ReadWriteLock} instance with non-fair ordering policy. | ||
*/ | ||
@InterceptorBinding | ||
@Inherited | ||
@Target(value = { TYPE, METHOD }) | ||
@Retention(value = RUNTIME) | ||
public @interface Lock { | ||
|
||
/** | ||
* | ||
* @return the type of the lock | ||
*/ | ||
@Nonbinding | ||
Type value() default Type.WRITE; | ||
|
||
/** | ||
* If it's not possible to acquire the lock in the given time a {@link LockException} is thrown. | ||
* | ||
* @see java.util.concurrent.locks.Lock#tryLock(long, TimeUnit) | ||
* @return the wait time | ||
*/ | ||
@Nonbinding | ||
long time() default -1l; | ||
|
||
/** | ||
* | ||
* @return the wait time unit | ||
*/ | ||
@Nonbinding | ||
TimeUnit unit() default TimeUnit.MILLISECONDS; | ||
|
||
public enum Type { | ||
/** | ||
* Acquires the read lock before the business method is invoked. | ||
*/ | ||
READ, | ||
/** | ||
* Acquires the write (exclusive) lock before the business method is invoked. | ||
*/ | ||
WRITE, | ||
/** | ||
* Acquires no lock. | ||
* <p> | ||
* This could be useful if you need to override the behavior defined by a class-level interceptor binding. | ||
*/ | ||
NONE | ||
} | ||
|
||
} |
15 changes: 15 additions & 0 deletions
15
independent-projects/arc/runtime/src/main/java/io/quarkus/arc/LockException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package io.quarkus.arc; | ||
|
||
/** | ||
* | ||
* @see Lock | ||
*/ | ||
public class LockException extends RuntimeException { | ||
|
||
private static final long serialVersionUID = 4486284740873061615L; | ||
|
||
public LockException(String message) { | ||
super(message); | ||
} | ||
|
||
} |
92 changes: 92 additions & 0 deletions
92
independent-projects/arc/runtime/src/main/java/io/quarkus/arc/impl/LockInterceptor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package io.quarkus.arc.impl; | ||
|
||
import static javax.interceptor.Interceptor.Priority.PLATFORM_BEFORE; | ||
|
||
import io.quarkus.arc.ArcInvocationContext; | ||
import io.quarkus.arc.Lock; | ||
import io.quarkus.arc.LockException; | ||
import java.lang.annotation.Annotation; | ||
import java.util.Set; | ||
import java.util.concurrent.locks.ReadWriteLock; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
import javax.annotation.Priority; | ||
import javax.interceptor.AroundInvoke; | ||
import javax.interceptor.Interceptor; | ||
import javax.interceptor.InvocationContext; | ||
|
||
@Lock | ||
@Interceptor | ||
@Priority(PLATFORM_BEFORE) | ||
public class LockInterceptor { | ||
|
||
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); | ||
|
||
@AroundInvoke | ||
Object lock(InvocationContext ctx) throws Exception { | ||
Lock lock = getLock(ctx); | ||
switch (lock.value()) { | ||
case WRITE: | ||
return writeLock(lock, ctx); | ||
case READ: | ||
return readLock(lock, ctx); | ||
case NONE: | ||
return ctx.proceed(); | ||
} | ||
throw new LockException("Unsupported @Lock type found on business method " + ctx.getMethod()); | ||
} | ||
|
||
private Object writeLock(Lock lock, InvocationContext ctx) throws Exception { | ||
boolean locked = false; | ||
long time = lock.time(); | ||
try { | ||
if (time > 0) { | ||
locked = readWriteLock.writeLock().tryLock(time, lock.unit()); | ||
if (!locked) { | ||
throw new LockException("Write lock not acquired in " + lock.unit().toMillis(time) + " ms"); | ||
} | ||
} else { | ||
readWriteLock.writeLock().lock(); | ||
locked = true; | ||
} | ||
return ctx.proceed(); | ||
} finally { | ||
if (locked) { | ||
readWriteLock.writeLock().unlock(); | ||
} | ||
} | ||
} | ||
|
||
private Object readLock(Lock lock, InvocationContext ctx) throws Exception { | ||
boolean locked = false; | ||
long time = lock.time(); | ||
try { | ||
if (time > 0) { | ||
locked = readWriteLock.readLock().tryLock(time, lock.unit()); | ||
if (!locked) { | ||
throw new LockException("Read lock not acquired in " + lock.unit().toMillis(time) + " ms"); | ||
} | ||
} else { | ||
readWriteLock.readLock().lock(); | ||
locked = true; | ||
} | ||
return ctx.proceed(); | ||
} finally { | ||
if (locked) { | ||
readWriteLock.readLock().unlock(); | ||
} | ||
} | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
Lock getLock(InvocationContext ctx) { | ||
Set<Annotation> bindings = (Set<Annotation>) ctx.getContextData().get(ArcInvocationContext.KEY_INTERCEPTOR_BINDINGS); | ||
for (Annotation annotation : bindings) { | ||
if (annotation.annotationType().equals(Lock.class)) { | ||
return (Lock) annotation; | ||
} | ||
} | ||
// This should never happen | ||
throw new LockException("@Lock binding not found on business method " + ctx.getMethod()); | ||
} | ||
|
||
} |
Oops, something went wrong.