Skip to content

Commit

Permalink
Merge pull request #24665 from manovotn/asyncEventExceptionHandling
Browse files Browse the repository at this point in the history
Arc - make sure exceptions from async events are always among suppressed exceptions and alter existing test to assert that
  • Loading branch information
gsmet authored Mar 31, 2022
2 parents b010b61 + cfa3204 commit 447e216
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ private void handleExceptions(ObserverExceptionHandler handler) {
exception = new CompletionException(handledExceptions.get(0));
} else {
exception = new CompletionException(null);
for (Throwable handledException : handledExceptions) {
exception.addSuppressed(handledException);
}
}
// always add exceptions into suppressed
for (Throwable handledException : handledExceptions) {
exception.addSuppressed(handledException);
}
throw exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import io.quarkus.arc.test.ArcTestContainer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import javax.annotation.Priority;
import javax.enterprise.context.Dependent;
Expand All @@ -31,27 +33,71 @@ public class AsyncObserverExceptionTest {
public ArcTestContainer container = new ArcTestContainer(StringProducer.class, StringObserver.class);

@Test
public void testAsyncObservers() throws InterruptedException, ExecutionException, TimeoutException {
public void testAsyncObserversSingleException() throws InterruptedException {
ArcContainer container = Arc.container();
StringProducer producer = container.instance(StringProducer.class).get();
StringObserver observer = container.instance(StringObserver.class).get();

BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
final List<Throwable> suppressed = new ArrayList<>();
BlockingQueue<Throwable> synchronizer = new LinkedBlockingQueue<>();
producer.produceAsync("pong").exceptionally(ex -> {
Arrays.stream(ex.getSuppressed()).forEach(t -> suppressed.add(t));
synchronizer.add(ex);
return ex.getMessage();
});

Object exception = synchronizer.poll(10, TimeUnit.SECONDS);
Throwable exception = synchronizer.poll(10, TimeUnit.SECONDS);

// assert suppressed exception is always present
assertEquals(1, suppressed.size());
assertTrue(suppressed.get(0) instanceof RuntimeException);

// assert actual exception, always a CompletionException
assertNotNull(exception);
assertTrue(exception instanceof RuntimeException);
assertTrue(exception instanceof CompletionException);
// in case of single exception in event chain, the cause is the exception
assertTrue(exception.getCause() instanceof RuntimeException);

List<String> events = observer.getEvents();
assertEquals(2, events.size());
assertEquals("async1::pong", events.get(0));
assertEquals("async2::pong", events.get(1));
}

@Test
public void testAsyncObserversMultipleExceptions() throws InterruptedException {
ArcContainer container = Arc.container();
StringProducer producer = container.instance(StringProducer.class).get();
StringObserver observer = container.instance(StringObserver.class).get();

final List<Throwable> suppressed = new ArrayList<>();
BlockingQueue<Throwable> synchronizer = new LinkedBlockingQueue<>();
producer.produceAsync("ping").exceptionally(ex -> {
Arrays.stream(ex.getSuppressed()).forEach(t -> suppressed.add(t));
synchronizer.add(ex);
return ex.getMessage();
});

Throwable exception = synchronizer.poll(10, TimeUnit.SECONDS);

// assert all suppressed exceptions are present
assertEquals(2, suppressed.size());
assertTrue(suppressed.get(0) instanceof RuntimeException);
assertTrue(suppressed.get(1) instanceof IllegalStateException);

// assert actual exception, always a CompletionException
assertNotNull(exception);
assertTrue(exception instanceof CompletionException);
// in case of multiple exceptions in event chain, the cause is expected to be null
assertNull(exception.getCause());

List<String> events = observer.getEvents();
assertEquals(3, events.size());
assertEquals("async1::ping", events.get(0));
assertEquals("async2::ping", events.get(1));
assertEquals("async3::ping", events.get(2));
}

@Singleton
static class StringObserver {

Expand All @@ -71,6 +117,13 @@ void observeAsync2(@ObservesAsync @Priority(2) String value) {
events.add("async2::" + value);
}

void observeAsync3(@ObservesAsync @Priority(3) String value) {
if (value.equals("ping")) {
events.add("async3::" + value);
throw new IllegalStateException("nok");
}
}

List<String> getEvents() {
return events;
}
Expand Down

0 comments on commit 447e216

Please sign in to comment.