Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Allow EventsourcedProcessor to post-process events #373

Conversation

danbim
Copy link
Contributor

@danbim danbim commented Dec 19, 2016

  • Closes Allow EventsourcedProcessor to emit events with custom aggregation IDs #370 in which an EventsourcedProcessor was not able to define custom routing destination
    aggregate IDs for the events produced by processEvent
  • Introduces method EventsourcedProcessor.postProcessDurableEvent: DurableEvent => DurableEvent
    which defaults to identity and allows manipulation of the DurableEvent instance created as a
    consequence of new events generated by processEvent

@@ -113,7 +120,9 @@ trait EventsourcedProcessor extends EventsourcedWriter[Long, Long] with ActorLog
case payload if processEvent.isDefinedAt(payload) =>
val currentProcessedEvents = processEvent(payload)
if (lastSequenceNr > processingProgress)
processedEvents = processedEvents :+ currentProcessedEvents.map(createEvent(_, lastHandledEvent.customDestinationAggregateIds))
processedEvents = processedEvents :+ currentProcessedEvents.map { e =>
postProcessDurableEvent(createEvent(e, lastHandledEvent.customDestinationAggregateIds))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows implementation classes to overwrite emitterId, vectorTimestamp and processId which must be prevented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking a look at the fields in DurableEvent I'd say there are maybe more than these 3 fields you mentioned that shouldn't be changed:

case class DurableEvent(
  payload: Any,
  emitterId: String = DurableEvent.UndefinedEmittedId,
  emitterAggregateId: Option[String] = None,
  customDestinationAggregateIds: Set[String] = Set(),
  systemTimestamp: Long = 0L,
  vectorTimestamp: VectorTime = VectorTime.Zero,
  processId: String = DurableEvent.UndefinedLogId,
  localLogId: String = DurableEvent.UndefinedLogId,
  localSequenceNr: Long = DurableEvent.UndefinedSequenceNr,
  deliveryId: Option[String] = None,
  persistOnEventSequenceNr: Option[Long] = None)

Maybe I'm missing some use-cases here but I'm not exactly sure which other fields than customDestinationAggregateIds should be allowed to change in the first place:

  • payload is "created" in processEvent, no need to allow to post-process it
  • emitterId could be allowed to change, even though I don't have a use-case in mind now
  • systemTimestamp should be set by the EventLog implementation as I now learned in the context of Allow EventsourcedProcessor to emit events with custom aggregation IDs #370, right? If so, it shouldn't be allowed to be changed here, either.
  • vectorTimestamp should not change as you said
  • processId should not change as you said
  • localLogId probably not !?
  • localSequenceNr probably not !?
  • deliveryId probably not !?
  • persistOnEventSequenceNr probably not !?

The assumption of my initial implementation was "garbage in, garbage out", i.e. the implementor of the processor should know what he/she is doing. Therefore, I also didn't add a test because would be no real assumptions that can be verified (other than e.g. the use-case of changing the customDestinationAggregateId which I could obviously add).

WDYT? If nothing should be allowed to change other than customDestinationAggregateIds (and emitterAggregateId?) maybe it would be sensible to add something specific for that feature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good arguments @danbim, thanks. We definitely need a better specification which DurableEvent fields should be allowed to change and which not. More importantly, both the actor-based and the stream-based event processors should enforce that specification consistently. I created a separate ticket for a common event processing model (#376) so that we can cover that later.

In context of this ticket, I'm fine with your initial proposal that is based on the "garbage in, garbage out" assumption. Let's keep it as an (undocumented) expert feature and document the work results of #376 later. It anyway closely matches what can be done with the stream-based processor at the moment.

Regarding the test, adding one that only changes customDestinationAggregateIds should be sufficient.

@@ -113,7 +120,9 @@ trait EventsourcedProcessor extends EventsourcedWriter[Long, Long] with ActorLog
case payload if processEvent.isDefinedAt(payload) =>
val currentProcessedEvents = processEvent(payload)
if (lastSequenceNr > processingProgress)
processedEvents = processedEvents :+ currentProcessedEvents.map(createEvent(_, lastHandledEvent.customDestinationAggregateIds))
processedEvents = processedEvents :+ currentProcessedEvents.map { e =>
postProcessDurableEvent(createEvent(e, lastHandledEvent.customDestinationAggregateIds))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling postProcessDurableEvent causes the creation of a new function object for each processed event. Function object creation should be done once and only function application should be repeated. Although we don't have a behavior stack here, this is similar to what an EventsourcedView does for onCommand, onEvent and onSnapshot handlers (see https://github.com/RBMHTechnology/eventuate/blob/master/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedView.scala#L113). Although not touched by your PR, the same problem exists for processEvent and should be changed accordingly. Such changes are a prerequisite for adding support for event processing behavior changes later, if needed.

@krasserm
Copy link
Contributor

And please write tests for all changes/additions you make 😃

- Closes RBMHTechnology#370 in which an EventsourcedProcessor was not able to define custom routing destination
  aggregate IDs for the events produced by `processEvent`
- Introduces method EventsourcedProcessor.postProcessDurableEvent: DurableEvent => DurableEvent
  which defaults to identity and allows manipulation of the DurableEvent instance created as a
  consequence of new events generated by `processEvent`
@danbim danbim force-pushed the 370_EventsourcedProcessor_customDestinationAggregateIds branch from 2264feb to e5d3fda Compare January 4, 2017 07:34
@danbim
Copy link
Contributor Author

danbim commented Jan 4, 2017

Hey Martin!

Just added a test. I'm not sure about the "code style" but I think it covers what should be tested.

Cheers
Daniel

Copy link
Contributor

@krasserm krasserm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @danbim

CI error is not related

@krasserm krasserm merged commit 8c6e15b into RBMHTechnology:master Jan 4, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants