Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Use EventId in PersistOnEventRequests
Browse files Browse the repository at this point in the history
Instead of using the in case of disaster recovery potentially
unstable sequence number as id for persist on event requests use a
stable EventId that is composed of the sequence number of the emitter
of the event and the corresponding process id.

Closes #385
  • Loading branch information
volkerstampa committed Mar 29, 2017
1 parent 5a94362 commit dff43d8
Show file tree
Hide file tree
Showing 14 changed files with 320 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ object DurableEventSerializerSpec {
localLogId = "p3",
localSequenceNr = 17L,
deliveryId = Some("x"),
persistOnEventSequenceNr = Some(12L))
persistOnEventSequenceNr = Some(12L),
persistOnEventEventId = Some(EventId("p4", 0L)))
}

class DurableEventSerializerSpec extends WordSpec with Matchers with Inside with BeforeAndAfterAll {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ object SnapshotSerializerSpec {
DeliveryAttempt("4", payload, destination))

def persistOnEventRequests(payload: Any) = Vector(
PersistOnEventRequest(7L, Vector(PersistOnEventInvocation(payload, Set("a"))), 17),
PersistOnEventRequest(8L, Vector(PersistOnEventInvocation(payload, Set("b"))), 17))
PersistOnEventRequest(7L, None, Vector(PersistOnEventInvocation(payload, Set("a"))), 17),
PersistOnEventRequest(8L, Some(EventId("p-a", 3L)), Vector(PersistOnEventInvocation(payload, Set("b"))), 17))

def snapshot(payload: Any, destination: ActorPath) =
Snapshot(payload, "x", last(payload), vectorTime(17, 18), event.localSequenceNr,
Expand Down
6 changes: 6 additions & 0 deletions eventuate-core/src/main/protobuf/DurableEventFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,10 @@ message DurableEventFormat {
optional int64 localSequenceNr = 9;
optional int64 persistOnEventSequenceNr = 10;
optional string deliveryId = 11;
optional EventIdFormat persistOnEventEventId = 12;
}

message EventIdFormat {
optional string processId = 1;
optional int64 sequenceNr = 2;
}
3 changes: 2 additions & 1 deletion eventuate-core/src/main/protobuf/SnapshotFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ message DeliveryAttemptFormat {
}

message PersistOnEventRequestFormat {
optional int64 persistOnEventSequenceNr = 1;
optional int64 sequenceNr = 1;
repeated PersistOnEventInvocationFormat invocation = 2;
optional int32 instanceId = 3;
optional EventIdFormat eventId = 4;
}

message PersistOnEventInvocationFormat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ package com.rbmhtechnology.eventuate

import scala.collection.immutable.Seq

/**
* Unique id of a [[DurableEvent]].
*
* @param processId the id of the event log the initially wrote the event
* @param sequenceNr the initial sequence number in this log. Even if in case of disaster recovery the
* event ends up at a different sequence number in this log the `sequenceNr` of the
* `EventId` remains stable.
*/
case class EventId(processId: String, sequenceNr: Long)

/**
* Provider API.
*
Expand All @@ -43,7 +53,14 @@ import scala.collection.immutable.Seq
* @param deliveryId Delivery id chosen by an application that persisted this event with [[ConfirmedDelivery.persistConfirmation]].
* @param persistOnEventSequenceNr Sequence number of the event that caused the emission of this event in an event handler.
* Defined if an [[EventsourcedActor]] with a [[PersistOnEvent]] mixin emitted this event
* with `persistOnEvent`.
* with `persistOnEvent`. Actually superseded by `persistOnEventEventId`, but still
* has to be maintained for backwards compatibility. It is required for confirmation
* of old [[com.rbmhtechnology.eventuate.PersistOnEvent.PersistOnEventRequest]]s from
* a snapshot that do not have [[com.rbmhtechnology.eventuate.PersistOnEvent.PersistOnEventRequest.eventId]]
* set.
* @param persistOnEventEventId event id of the event that caused the emission of this event in an event handler.
* Defined if an [[EventsourcedActor]] with a [[PersistOnEvent]] mixin emitted this event
* with `persistOnEvent`.
*/
case class DurableEvent(
payload: Any,
Expand All @@ -56,15 +73,16 @@ case class DurableEvent(
localLogId: String = DurableEvent.UndefinedLogId,
localSequenceNr: Long = DurableEvent.UndefinedSequenceNr,
deliveryId: Option[String] = None,
persistOnEventSequenceNr: Option[Long] = None) {
persistOnEventSequenceNr: Option[Long] = None,
persistOnEventEventId: Option[EventId] = None) {

import DurableEvent._

/**
* Unique event identifier.
*/
def id: VectorTime =
vectorTimestamp
val id: EventId =
EventId(processId, vectorTimestamp.localTime(processId))

/**
* Returns `true` if this event did not happen before or at the given `vectorTime`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ trait EventsourcedActor extends EventsourcedView with EventsourcedVersion {
messageStash.unstash()
}
}
case PersistOnEventRequest(persistOnEventSequenceNr: Long, invocations, iid) => if (iid == instanceId) {
case PersistOnEventRequest(persistOnEventSequenceNr, persistOnEventEventId, invocations, iid) => if (iid == instanceId) {
writeOrDelay {
writeHandlers = Vector.fill(invocations.length)(PersistOnEvent.DefaultHandler)
writeRequests = invocations.map {
case PersistOnEventInvocation(event, customDestinationAggregateIds) =>
durableEvent(event, customDestinationAggregateIds, None, Some(persistOnEventSequenceNr))
durableEvent(event, customDestinationAggregateIds, None, Some(persistOnEventSequenceNr), persistOnEventEventId)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ trait EventsourcedVersion extends EventsourcedView {
* Internal API.
*/
private[eventuate] def durableEvent(payload: Any, customDestinationAggregateIds: Set[String],
deliveryId: Option[String] = None, persistOnEventSequenceNr: Option[Long] = None): DurableEvent =
deliveryId: Option[String] = None, persistOnEventSequenceNr: Option[Long] = None, persistOnEventEventId: Option[EventId] = None): DurableEvent =
DurableEvent(
payload = payload,
emitterId = id,
emitterAggregateId = aggregateId,
customDestinationAggregateIds = customDestinationAggregateIds,
vectorTimestamp = currentVersion,
deliveryId = deliveryId,
persistOnEventSequenceNr = persistOnEventSequenceNr)
persistOnEventSequenceNr = persistOnEventSequenceNr,
persistOnEventEventId = persistOnEventEventId)

/**
* Internal API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.rbmhtechnology.eventuate

import scala.collection.immutable.SortedMap

import com.rbmhtechnology.eventuate.EventsourcedView.Handler

import scala.util._
Expand All @@ -30,8 +29,10 @@ object PersistOnEvent {

/**
* A request sent by [[PersistOnEvent]] instances to `self` in order to persist events recorded by `invocations`.
* @param sequenceNr the sequence number of the event that caused this request.
* @param eventId [[EventId]] of the event that caused this request.
*/
case class PersistOnEventRequest(persistOnEventSequenceNr: Long, invocations: Vector[PersistOnEventInvocation], instanceId: Int)
case class PersistOnEventRequest(sequenceNr: Long, eventId: Option[EventId], invocations: Vector[PersistOnEventInvocation], instanceId: Int)

/**
* Default `persist` handler to use when processing [[PersistOnEventRequest]]s in [[EventsourcedActor]].
Expand Down Expand Up @@ -61,7 +62,22 @@ trait PersistOnEvent extends EventsourcedActor {
import PersistOnEvent._

private var invocations: Vector[PersistOnEventInvocation] = Vector.empty
private var requests: SortedMap[Long, PersistOnEventRequest] = SortedMap.empty
/**
* [[PersistOnEventRequest]] by sequence number of the event that caused the persist on event request.
*
* This map keeps the requests in the order they were submitted.
*/
private var requestsBySequenceNr: SortedMap[Long, PersistOnEventRequest] = SortedMap.empty

/**
* [[PersistOnEventRequest]] by [[EventId]] of the event that caused the persist on event request.
*
* This map ensures that requests can be confirmed properly even if the sequence number of the event
* that caused the request changed its local sequence number due to a disaster recovery.
*
* @see https://github.com/RBMHTechnology/eventuate/issues/385
*/
private var requestsByEventId: Map[EventId, PersistOnEventRequest] = Map.empty

/**
* Asynchronously persists the given `event`. Applications that want to handle the persisted event should define
Expand All @@ -77,13 +93,10 @@ trait PersistOnEvent extends EventsourcedActor {
*/
override private[eventuate] def receiveEvent(event: DurableEvent): Unit = {
super.receiveEvent(event)

event.persistOnEventSequenceNr.foreach { persistOnEventSequenceNr =>
if (event.emitterId == id) confirmRequest(persistOnEventSequenceNr)
}
if (event.emitterId == id) findPersistOnEventRequest(event).foreach(confirmRequest)

if (invocations.nonEmpty) {
deliverRequest(PersistOnEventRequest(lastSequenceNr, invocations, instanceId))
deliverRequest(PersistOnEventRequest(lastSequenceNr, Some(lastHandledEvent.id), invocations, instanceId))
invocations = Vector.empty
}
}
Expand All @@ -92,7 +105,7 @@ trait PersistOnEvent extends EventsourcedActor {
* Internal API.
*/
override private[eventuate] def snapshotCaptured(snapshot: Snapshot): Snapshot = {
requests.values.foldLeft(super.snapshotCaptured(snapshot)) {
requestsBySequenceNr.values.foldLeft(super.snapshotCaptured(snapshot)) {
case (s, pr) => s.addPersistOnEventRequest(pr)
}
}
Expand All @@ -103,7 +116,9 @@ trait PersistOnEvent extends EventsourcedActor {
override private[eventuate] def snapshotLoaded(snapshot: Snapshot): Unit = {
super.snapshotLoaded(snapshot)
snapshot.persistOnEventRequests.foreach { pr =>
requests = requests + (pr.persistOnEventSequenceNr -> pr.copy(instanceId = instanceId))
val requestWithUpdatedInstanceId = pr.copy(instanceId = instanceId)
requestsBySequenceNr += (pr.sequenceNr -> requestWithUpdatedInstanceId)
pr.eventId.foreach(requestsByEventId += _ -> requestWithUpdatedInstanceId)
}
}

Expand All @@ -119,18 +134,26 @@ trait PersistOnEvent extends EventsourcedActor {
* Internal API.
*/
private[eventuate] def unconfirmedRequests: Set[Long] =
requests.keySet
requestsBySequenceNr.keySet

private def deliverRequest(request: PersistOnEventRequest): Unit = {
requests = requests + (request.persistOnEventSequenceNr -> request)
requestsBySequenceNr += request.sequenceNr -> request
request.eventId.foreach(requestsByEventId += _ -> request)
if (!recovering) self ! request
}

private def confirmRequest(persistOnEventSequenceNr: Long): Unit = {
requests = requests - persistOnEventSequenceNr
private def confirmRequest(request: PersistOnEventRequest): Unit = {
request.eventId.foreach(requestsByEventId -= _)
requestsBySequenceNr -= request.sequenceNr
}

private def redeliverUnconfirmedRequests(): Unit = requests.foreach {
private def findPersistOnEventRequest(event: DurableEvent) =
event
.persistOnEventEventId.flatMap(requestsByEventId.get)
// Fallback for old events that have no persistOnEventEventId
.orElse(event.persistOnEventSequenceNr.flatMap(requestsBySequenceNr.get))

private def redeliverUnconfirmedRequests(): Unit = requestsBySequenceNr.foreach {
case (_, request) => self ! request
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ abstract class DurableEventSerializer(
durableEvent.persistOnEventSequenceNr.foreach { persistOnEventSequenceNr =>
builder.setPersistOnEventSequenceNr(persistOnEventSequenceNr)
}
durableEvent.persistOnEventEventId.foreach { persistOnEventEventId =>
builder.setPersistOnEventEventId(eventIdFormatBuilder(persistOnEventEventId))
}

durableEvent.emitterAggregateId.foreach { id =>
builder.setEmitterAggregateId(id)
Expand All @@ -114,6 +117,13 @@ abstract class DurableEventSerializer(
builder
}

def eventIdFormatBuilder(eventId: EventId) = {
val builder = EventIdFormat.newBuilder()
builder.setProcessId(eventId.processId)
builder.setSequenceNr(eventId.sequenceNr)
builder
}

// --------------------------------------------------------------------------------
// fromBinary helpers
// --------------------------------------------------------------------------------
Expand All @@ -128,6 +138,7 @@ abstract class DurableEventSerializer(

val deliveryId = if (durableEventFormat.hasDeliveryId) Some(durableEventFormat.getDeliveryId) else None
val persistOnEventSequenceNr = if (durableEventFormat.hasPersistOnEventSequenceNr) Some(durableEventFormat.getPersistOnEventSequenceNr) else None
val persistOnEventEventId = if (durableEventFormat.hasPersistOnEventEventId) Some(eventId(durableEventFormat.getPersistOnEventEventId)) else None

DurableEvent(
payload = payloadSerializer.payload(durableEventFormat.getPayload),
Expand All @@ -140,6 +151,10 @@ abstract class DurableEventSerializer(
localLogId = durableEventFormat.getLocalLogId,
localSequenceNr = durableEventFormat.getLocalSequenceNr,
deliveryId = deliveryId,
persistOnEventSequenceNr = persistOnEventSequenceNr)
persistOnEventSequenceNr = persistOnEventSequenceNr,
persistOnEventEventId = persistOnEventEventId)
}

def eventId(eventId: EventIdFormat) =
EventId(eventId.getProcessId, eventId.getSequenceNr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {

private def persistOnEventRequestFormatBuilder(persistOnEventRequest: PersistOnEventRequest): PersistOnEventRequestFormat.Builder = {
val builder = PersistOnEventRequestFormat.newBuilder
builder.setPersistOnEventSequenceNr(persistOnEventRequest.persistOnEventSequenceNr)
builder.setSequenceNr(persistOnEventRequest.sequenceNr)
persistOnEventRequest.eventId.foreach { eventId =>
builder.setEventId(eventSerializer.eventIdFormatBuilder(eventId))
}
builder.setInstanceId(persistOnEventRequest.instanceId)

persistOnEventRequest.invocations.foreach { invocation =>
Expand Down Expand Up @@ -191,8 +194,11 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {
invocationsBuilder += persistOnEventInvocation(pif)
}

val persistOnEventReference = if (persistOnEventRequestFormat.hasEventId) Some(eventSerializer.eventId(persistOnEventRequestFormat.getEventId)) else None

PersistOnEventRequest(
persistOnEventRequestFormat.getPersistOnEventSequenceNr,
persistOnEventRequestFormat.getSequenceNr,
persistOnEventReference,
invocationsBuilder.result(),
persistOnEventRequestFormat.getInstanceId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected DurableEvent createEvent(final Object payload, final long sequenceNr)
@SuppressWarnings("unchecked")
protected DurableEvent createEvent(final Object payload, final long sequenceNr, final String emitterId, final String logId, final VectorTime timestamp) {
return new DurableEvent(payload, emitterId, Option.empty(), Set$.MODULE$.<String>empty(), 0L,
timestamp, logId, logId, sequenceNr, Option.empty(), Option.empty());
timestamp, logId, logId, sequenceNr, Option.empty(), Option.empty(), Option.empty());
}

protected VectorTime timestamp(final long a, final long b) {
Expand Down
Loading

0 comments on commit dff43d8

Please sign in to comment.