Skip to content

Commit

Permalink
Explicitly reject invalid aggregate event registrations during publis…
Browse files Browse the repository at this point in the history
…hing.

We now detect that the consumption of the events published during a persistence operation has produced new event instances that would go unpublished and raise an explaining exception. Previously such a scenario would've resulted in a ConcurrentModificationException.

We primarily reject such a scenario as handling the additional event would extend our convenience mechanism over the publishing scope a direct 1:1 replacement with ApplicationEventPublisher would've achieved.

Fixes GH-3116.
  • Loading branch information
odrotbohm committed Aug 6, 2024
1 parent ce62224 commit a976759
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -111,7 +112,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
return result;
}

Iterable<?> arguments = asCollection(invocation.getArguments()[0], invocation.getMethod());
Iterable<?> arguments = asIterable(invocation.getArguments()[0], invocation.getMethod());

eventMethod.publishEventsFrom(arguments, publisher);

Expand Down Expand Up @@ -144,6 +145,9 @@ static class EventPublishingMethod {
private static Map<Class<?>, EventPublishingMethod> cache = new ConcurrentReferenceHashMap<>();
private static @SuppressWarnings("null") EventPublishingMethod NONE = new EventPublishingMethod(Object.class, null,
null);
private static String ILLEGAL_MODIFCATION = "Aggregate's events were modified during event publication. "
+ "Make sure event listeners obtain a fresh instance of the aggregate before adding further events. "
+ "Additional events found: %s.";

private final Class<?> type;
private final Method publishingMethod;
Expand Down Expand Up @@ -188,18 +192,33 @@ public static EventPublishingMethod of(Class<?> type) {
* @param aggregates can be {@literal null}.
* @param publisher must not be {@literal null}.
*/
public void publishEventsFrom(Iterable<?> aggregates, ApplicationEventPublisher publisher) {
public void publishEventsFrom(@Nullable Iterable<?> aggregates, ApplicationEventPublisher publisher) {

if (aggregates == null) {
return;
}

for (Object aggregateRoot : aggregates) {

if (!type.isInstance(aggregateRoot)) {
continue;
}

for (Object event : asCollection(ReflectionUtils.invokeMethod(publishingMethod, aggregateRoot), null)) {
var events = asCollection(ReflectionUtils.invokeMethod(publishingMethod, aggregateRoot));

for (Object event : events) {
publisher.publishEvent(event);
}

var postPublication = asCollection(ReflectionUtils.invokeMethod(publishingMethod, aggregateRoot));

if (events.size() != postPublication.size()) {

postPublication.removeAll(events);

throw new IllegalStateException(ILLEGAL_MODIFCATION.formatted(postPublication));
}

if (clearingMethod != null) {
ReflectionUtils.invokeMethod(clearingMethod, aggregateRoot);
}
Expand Down Expand Up @@ -272,23 +291,34 @@ private static Method getClearingMethod(AnnotationDetectionMethodCallback<?> cle
* one-element collection, {@literal null} will become an empty collection.
*
* @param source can be {@literal null}.
* @return
* @return will never be {@literal null}.
*/
@SuppressWarnings("unchecked")
private static Iterable<Object> asCollection(@Nullable Object source, @Nullable Method method) {
private static Collection<Object> asCollection(@Nullable Object source) {

if (source == null) {
return Collections.emptyList();
}

if (method != null && method.getName().startsWith("saveAll")) {
return (Iterable<Object>) source;
}

if (Collection.class.isInstance(source)) {
return (Collection<Object>) source;
return new ArrayList<>((Collection<Object>) source);
}

return Collections.singletonList(source);
}

/**
* Returns the given source object as {@link Iterable}.
*
* @param source can be {@literal null}.
* @return will never be {@literal null}.
*/
@Nullable
@SuppressWarnings("unchecked")
private static Iterable<Object> asIterable(@Nullable Object source, @Nullable Method method) {

return method != null && method.getName().startsWith("saveAll")
? (Iterable<Object>) source
: asCollection(source);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.*;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -324,6 +325,32 @@ void doesNotEmitEventsFromPrimitiveValue() throws Throwable {
verify(publisher, never()).publishEvent(any());
}

@Test // GH-3116
void rejectsEventAddedDuringProcessing() throws Throwable {

var originalEvent = new SomeEvent();
var eventToBeAdded = new SomeEvent();

var events = new ArrayList<Object>();
events.add(originalEvent);

var aggregate = MultipleEvents.of(events);

doAnswer(invocation -> {

events.add(eventToBeAdded);
return null;

}).when(publisher).publishEvent(any(Object.class));

var method = EventPublishingMethod.of(MultipleEvents.class);

assertThatIllegalStateException()
.isThrownBy(() -> method.publishEventsFrom(List.of(aggregate), publisher))
.withMessageContaining(eventToBeAdded.toString())
.withMessageNotContaining(originalEvent.toString());
}

private static void mockInvocation(MethodInvocation invocation, Method method, Object parameterAndReturnValue)
throws Throwable {

Expand Down

0 comments on commit a976759

Please sign in to comment.