Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize completion of publications to issue by-identifier query #258

Closed
odrotbohm opened this issue Aug 9, 2023 · 5 comments
Closed

Optimize completion of publications to issue by-identifier query #258

odrotbohm opened this issue Aug 9, 2023 · 5 comments
Assignees
Labels
in: event publication registry Event publication registry type: enhancement Major enhanvements, new features
Milestone

Comments

@odrotbohm
Copy link
Member

#251 pointed at the fact that the database access to mark event publications as completed suffers from the need to query for event payload and publication target identifier. This stems from the fact that the completion is persisted in an interceptor around the target listener, which does not have access to the original event publication's identifier (as it only sees the listener method's argument).

To optimize the data access by using the primary key, we can provide a user-facing SPI that allows extracting an identifier from the actual event payload, defaulting to support jMolecules Identifiable / @Identity out of the box. Uses should be able to declare custom extractors by declaring instances of the interface as Spring bean.

@odrotbohm odrotbohm added in: event publication registry Event publication registry type: improvement Minor improvements labels Aug 9, 2023
@odrotbohm odrotbohm added this to the 1.0 RC1 milestone Aug 9, 2023
@odrotbohm odrotbohm self-assigned this Aug 9, 2023
@odrotbohm odrotbohm removed this from the 1.0 RC1 milestone Aug 9, 2023
@bozo-ivetic-ananas
Copy link

Hello,
recently we started using Modulith project for event persistence and marking event publication by comparing serialized event is big concern for us.
This requires refactoring considerable amount of our event classes to not use unordered collections as there is a chance that some of the publications won't be marked as completed.

Is there any timeline for this issue?

@odrotbohm
Copy link
Member Author

We have a prototype working that flips the completion to a by-identifier query. It's likely to make it into the next milestone of 1.3, i.e., you should have something to try by the end of August.

@odrotbohm odrotbohm added this to the 1.3 M2 milestone Aug 2, 2024
@odrotbohm odrotbohm changed the title Support extracting event publication identifiers from the event itself Optimize completion of publications to issue by-identifier query Aug 2, 2024
odrotbohm added a commit that referenced this issue Aug 6, 2024
DefaultEventPublicationRegistry now tracks the event publications currently in progress, so that the completion step can use the database identifier to issue an update statement solely based on that.
@odrotbohm
Copy link
Member Author

That's in place now. Feel free to give the snapshots a try.

@odrotbohm odrotbohm added type: enhancement Major enhanvements, new features and removed type: improvement Minor improvements labels Sep 24, 2024
@bozo-ivetic-ananas
Copy link

Hello @odrotbohm,
I finally managed to try this code change and

  1. I ran into a problem of ConcurrentModificationException
    I'm not in some high frequency environment by I generally publish bunch of events in succession. I'm guessing this collection is being modified with new event publications while we wan't to mark this one as completed and therefore we have exception.
  2. Method markCompleted(...) requires new transaction, does this mean we can potentially have case when we mark event as completed but something goes wrong in the outside transaction and we actually never process event but mark it as completed
  3. Can you maybe explain what is the reason for using equals when getting inProgress publications?
"java.util.ConcurrentModificationException: null
	at java.base/java.util.HashMap$KeySpliterator.tryAdvance(HashMap.java:1730)
	at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
	at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
	at org.springframework.modulith.events.core.DefaultEventPublicationRegistry$PublicationsInProgress.getPublication(DefaultEventPublicationRegistry.java:319)
	at org.springframework.modulith.events.core.DefaultEventPublicationRegistry$PublicationsInProgress.unregister(DefaultEventPublicationRegistry.java:300)
	at org.springframework.modulith.events.core.DefaultEventPublicationRegistry.lambda$markCompleted$3(DefaultEventPublicationRegistry.java:126)
	at java.base/java.util.Optional.ifPresentOrElse(Optional.java:196)
	at org.springframework.modulith.events.core.DefaultEventPublicationRegistry.markCompleted(DefaultEventPublicationRegistry.java:130)
	at jdk.internal.reflect.GeneratedMethodAccessor534.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:354)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
	at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:392)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
	at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:89)
	at rs.ananas.os.common.configuration.aop.TransactionReadonlyAspect.proceed(TransactionReadonlyAspect.java:34)
	at jdk.internal.reflect.GeneratedMethodAccessor245.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:637)
	at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:627)
	at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:71)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:173)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
	at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:720)
	at org.springframework.modulith.events.core.DefaultEventPublicationRegistry$$SpringCGLIB$$0.markCompleted(<generated>)
	at org.springframework.modulith.events.support.CompletionRegisteringAdvisor$CompletionRegisteringMethodInterceptor.markCompleted(CompletionRegisteringAdvisor.java:224)
	at org.springframework.modulith.events.support.CompletionRegisteringAdvisor$CompletionRegisteringMethodInterceptor.invoke(CompletionRegisteringAdvisor.java:193)

@odrotbohm
Copy link
Member Author

  1. I've created Properly deregister in-progress publication on failed resubmission attempts #891 as a follow-up for a related reason but fixed the concurrent access problem along the lines.
  2. markCompleted(…) is called after the listener has executed. I.e., it's executed on the thread that asynchronously executes the listener invocation. That means there's a rare chance for the opposite. The listener might succeed, but the publication might end up not being marked as completed. We do not consider this an issue, as listeners have to be implemented idempotently anyway or be able to gracefully handle repeated invocations.
  3. The consequence of 2. is that the completion happens from a context that only has access to the event and target identifier. Thus, we need to match on equality of those when looking up the TargetEventPublication to unregister.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
in: event publication registry Event publication registry type: enhancement Major enhanvements, new features
Projects
None yet
Development

No branches or pull requests

2 participants