diff --git a/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala b/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala index 1ba9417d..b469248d 100644 --- a/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala +++ b/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala @@ -106,7 +106,8 @@ object DurableEventSerializerSpec { localLogId = "p3", localSequenceNr = 17L, deliveryId = Some("x"), - persistOnEventSequenceNr = Some(12L)) + persistOnEventSequenceNr = Some(12L), + persistOnEventEventId = Some(EventId("p4", 0L))) } class DurableEventSerializerSpec extends WordSpec with Matchers with Inside with BeforeAndAfterAll { diff --git a/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala b/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala index 82429f68..34d3b918 100644 --- a/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala +++ b/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala @@ -46,8 +46,8 @@ object SnapshotSerializerSpec { DeliveryAttempt("4", payload, destination)) def persistOnEventRequests(payload: Any) = Vector( - PersistOnEventRequest(7L, Vector(PersistOnEventInvocation(payload, Set("a"))), 17), - PersistOnEventRequest(8L, Vector(PersistOnEventInvocation(payload, Set("b"))), 17)) + PersistOnEventRequest(7L, None, Vector(PersistOnEventInvocation(payload, Set("a"))), 17), + PersistOnEventRequest(8L, Some(EventId("p-a", 3L)), Vector(PersistOnEventInvocation(payload, Set("b"))), 17)) def snapshot(payload: Any, destination: ActorPath) = Snapshot(payload, "x", last(payload), vectorTime(17, 18), event.localSequenceNr, diff --git a/eventuate-core/src/main/protobuf/DurableEventFormats.proto b/eventuate-core/src/main/protobuf/DurableEventFormats.proto index 86e2173d..91d1eec3 100644 --- a/eventuate-core/src/main/protobuf/DurableEventFormats.proto +++ b/eventuate-core/src/main/protobuf/DurableEventFormats.proto @@ -31,4 +31,10 @@ message DurableEventFormat { optional int64 localSequenceNr = 9; optional int64 persistOnEventSequenceNr = 10; optional string deliveryId = 11; + optional EventIdFormat persistOnEventEventId = 12; +} + +message EventIdFormat { + optional string processId = 1; + optional int64 sequenceNr = 2; } diff --git a/eventuate-core/src/main/protobuf/SnapshotFormats.proto b/eventuate-core/src/main/protobuf/SnapshotFormats.proto index 6cad7cf2..b295738d 100644 --- a/eventuate-core/src/main/protobuf/SnapshotFormats.proto +++ b/eventuate-core/src/main/protobuf/SnapshotFormats.proto @@ -37,9 +37,10 @@ message DeliveryAttemptFormat { } message PersistOnEventRequestFormat { - optional int64 persistOnEventSequenceNr = 1; + optional int64 sequenceNr = 1; repeated PersistOnEventInvocationFormat invocation = 2; optional int32 instanceId = 3; + optional EventIdFormat eventId = 4; } message PersistOnEventInvocationFormat { diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala index 626b017b..33a821da 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala @@ -18,6 +18,16 @@ package com.rbmhtechnology.eventuate import scala.collection.immutable.Seq +/** + * Unique id of a [[DurableEvent]]. + * + * @param processId the id of the event log the initially wrote the event + * @param sequenceNr the initial sequence number in this log. Even if in case of disaster recovery the + * event ends up at a different sequence number in this log the `sequenceNr` of the + * `EventId` remains stable. + */ +case class EventId(processId: String, sequenceNr: Long) + /** * Provider API. * @@ -43,7 +53,14 @@ import scala.collection.immutable.Seq * @param deliveryId Delivery id chosen by an application that persisted this event with [[ConfirmedDelivery.persistConfirmation]]. * @param persistOnEventSequenceNr Sequence number of the event that caused the emission of this event in an event handler. * Defined if an [[EventsourcedActor]] with a [[PersistOnEvent]] mixin emitted this event - * with `persistOnEvent`. + * with `persistOnEvent`. Actually superseded by `persistOnEventEventId`, but still + * has to be maintained for backwards compatibility. It is required for confirmation + * of old [[com.rbmhtechnology.eventuate.PersistOnEvent.PersistOnEventRequest]]s from + * a snapshot that do not have [[com.rbmhtechnology.eventuate.PersistOnEvent.PersistOnEventRequest.eventId]] + * set. + * @param persistOnEventEventId event id of the event that caused the emission of this event in an event handler. + * Defined if an [[EventsourcedActor]] with a [[PersistOnEvent]] mixin emitted this event + * with `persistOnEvent`. */ case class DurableEvent( payload: Any, @@ -56,15 +73,16 @@ case class DurableEvent( localLogId: String = DurableEvent.UndefinedLogId, localSequenceNr: Long = DurableEvent.UndefinedSequenceNr, deliveryId: Option[String] = None, - persistOnEventSequenceNr: Option[Long] = None) { + persistOnEventSequenceNr: Option[Long] = None, + persistOnEventEventId: Option[EventId] = None) { import DurableEvent._ /** * Unique event identifier. */ - def id: VectorTime = - vectorTimestamp + val id: EventId = + EventId(processId, vectorTimestamp.localTime(processId)) /** * Returns `true` if this event did not happen before or at the given `vectorTime` diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala index f42cada3..7d58b17d 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala @@ -145,12 +145,12 @@ trait EventsourcedActor extends EventsourcedView with EventsourcedVersion { messageStash.unstash() } } - case PersistOnEventRequest(persistOnEventSequenceNr: Long, invocations, iid) => if (iid == instanceId) { + case PersistOnEventRequest(persistOnEventSequenceNr, persistOnEventEventId, invocations, iid) => if (iid == instanceId) { writeOrDelay { writeHandlers = Vector.fill(invocations.length)(PersistOnEvent.DefaultHandler) writeRequests = invocations.map { case PersistOnEventInvocation(event, customDestinationAggregateIds) => - durableEvent(event, customDestinationAggregateIds, None, Some(persistOnEventSequenceNr)) + durableEvent(event, customDestinationAggregateIds, None, Some(persistOnEventSequenceNr), persistOnEventEventId) } } } diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedVersion.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedVersion.scala index 1038f077..0d80ad92 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedVersion.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedVersion.scala @@ -39,7 +39,7 @@ trait EventsourcedVersion extends EventsourcedView { * Internal API. */ private[eventuate] def durableEvent(payload: Any, customDestinationAggregateIds: Set[String], - deliveryId: Option[String] = None, persistOnEventSequenceNr: Option[Long] = None): DurableEvent = + deliveryId: Option[String] = None, persistOnEventSequenceNr: Option[Long] = None, persistOnEventEventId: Option[EventId] = None): DurableEvent = DurableEvent( payload = payload, emitterId = id, @@ -47,7 +47,8 @@ trait EventsourcedVersion extends EventsourcedView { customDestinationAggregateIds = customDestinationAggregateIds, vectorTimestamp = currentVersion, deliveryId = deliveryId, - persistOnEventSequenceNr = persistOnEventSequenceNr) + persistOnEventSequenceNr = persistOnEventSequenceNr, + persistOnEventEventId = persistOnEventEventId) /** * Internal API. diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/PersistOnEvent.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/PersistOnEvent.scala index 7411dca2..ba60a0c4 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/PersistOnEvent.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/PersistOnEvent.scala @@ -17,7 +17,6 @@ package com.rbmhtechnology.eventuate import scala.collection.immutable.SortedMap - import com.rbmhtechnology.eventuate.EventsourcedView.Handler import scala.util._ @@ -30,8 +29,10 @@ object PersistOnEvent { /** * A request sent by [[PersistOnEvent]] instances to `self` in order to persist events recorded by `invocations`. + * @param sequenceNr the sequence number of the event that caused this request. + * @param eventId [[EventId]] of the event that caused this request. */ - case class PersistOnEventRequest(persistOnEventSequenceNr: Long, invocations: Vector[PersistOnEventInvocation], instanceId: Int) + case class PersistOnEventRequest(sequenceNr: Long, eventId: Option[EventId], invocations: Vector[PersistOnEventInvocation], instanceId: Int) /** * Default `persist` handler to use when processing [[PersistOnEventRequest]]s in [[EventsourcedActor]]. @@ -61,7 +62,22 @@ trait PersistOnEvent extends EventsourcedActor { import PersistOnEvent._ private var invocations: Vector[PersistOnEventInvocation] = Vector.empty - private var requests: SortedMap[Long, PersistOnEventRequest] = SortedMap.empty + /** + * [[PersistOnEventRequest]] by sequence number of the event that caused the persist on event request. + * + * This map keeps the requests in the order they were submitted. + */ + private var requestsBySequenceNr: SortedMap[Long, PersistOnEventRequest] = SortedMap.empty + + /** + * [[PersistOnEventRequest]] by [[EventId]] of the event that caused the persist on event request. + * + * This map ensures that requests can be confirmed properly even if the sequence number of the event + * that caused the request changed its local sequence number due to a disaster recovery. + * + * @see https://github.com/RBMHTechnology/eventuate/issues/385 + */ + private var requestsByEventId: Map[EventId, PersistOnEventRequest] = Map.empty /** * Asynchronously persists the given `event`. Applications that want to handle the persisted event should define @@ -77,13 +93,10 @@ trait PersistOnEvent extends EventsourcedActor { */ override private[eventuate] def receiveEvent(event: DurableEvent): Unit = { super.receiveEvent(event) - - event.persistOnEventSequenceNr.foreach { persistOnEventSequenceNr => - if (event.emitterId == id) confirmRequest(persistOnEventSequenceNr) - } + if (event.emitterId == id) findPersistOnEventRequest(event).foreach(confirmRequest) if (invocations.nonEmpty) { - deliverRequest(PersistOnEventRequest(lastSequenceNr, invocations, instanceId)) + deliverRequest(PersistOnEventRequest(lastSequenceNr, Some(lastHandledEvent.id), invocations, instanceId)) invocations = Vector.empty } } @@ -92,7 +105,7 @@ trait PersistOnEvent extends EventsourcedActor { * Internal API. */ override private[eventuate] def snapshotCaptured(snapshot: Snapshot): Snapshot = { - requests.values.foldLeft(super.snapshotCaptured(snapshot)) { + requestsBySequenceNr.values.foldLeft(super.snapshotCaptured(snapshot)) { case (s, pr) => s.addPersistOnEventRequest(pr) } } @@ -103,7 +116,9 @@ trait PersistOnEvent extends EventsourcedActor { override private[eventuate] def snapshotLoaded(snapshot: Snapshot): Unit = { super.snapshotLoaded(snapshot) snapshot.persistOnEventRequests.foreach { pr => - requests = requests + (pr.persistOnEventSequenceNr -> pr.copy(instanceId = instanceId)) + val requestWithUpdatedInstanceId = pr.copy(instanceId = instanceId) + requestsBySequenceNr += (pr.sequenceNr -> requestWithUpdatedInstanceId) + pr.eventId.foreach(requestsByEventId += _ -> requestWithUpdatedInstanceId) } } @@ -119,18 +134,26 @@ trait PersistOnEvent extends EventsourcedActor { * Internal API. */ private[eventuate] def unconfirmedRequests: Set[Long] = - requests.keySet + requestsBySequenceNr.keySet private def deliverRequest(request: PersistOnEventRequest): Unit = { - requests = requests + (request.persistOnEventSequenceNr -> request) + requestsBySequenceNr += request.sequenceNr -> request + request.eventId.foreach(requestsByEventId += _ -> request) if (!recovering) self ! request } - private def confirmRequest(persistOnEventSequenceNr: Long): Unit = { - requests = requests - persistOnEventSequenceNr + private def confirmRequest(request: PersistOnEventRequest): Unit = { + request.eventId.foreach(requestsByEventId -= _) + requestsBySequenceNr -= request.sequenceNr } - private def redeliverUnconfirmedRequests(): Unit = requests.foreach { + private def findPersistOnEventRequest(event: DurableEvent) = + event + .persistOnEventEventId.flatMap(requestsByEventId.get) + // Fallback for old events that have no persistOnEventEventId + .orElse(event.persistOnEventSequenceNr.flatMap(requestsBySequenceNr.get)) + + private def redeliverUnconfirmedRequests(): Unit = requestsBySequenceNr.foreach { case (_, request) => self ! request } } diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/DelegatingDurableEventSerializer.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/DelegatingDurableEventSerializer.scala index abff6ed8..6f23d93a 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/DelegatingDurableEventSerializer.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/DelegatingDurableEventSerializer.scala @@ -102,6 +102,9 @@ abstract class DurableEventSerializer( durableEvent.persistOnEventSequenceNr.foreach { persistOnEventSequenceNr => builder.setPersistOnEventSequenceNr(persistOnEventSequenceNr) } + durableEvent.persistOnEventEventId.foreach { persistOnEventEventId => + builder.setPersistOnEventEventId(eventIdFormatBuilder(persistOnEventEventId)) + } durableEvent.emitterAggregateId.foreach { id => builder.setEmitterAggregateId(id) @@ -114,6 +117,13 @@ abstract class DurableEventSerializer( builder } + def eventIdFormatBuilder(eventId: EventId) = { + val builder = EventIdFormat.newBuilder() + builder.setProcessId(eventId.processId) + builder.setSequenceNr(eventId.sequenceNr) + builder + } + // -------------------------------------------------------------------------------- // fromBinary helpers // -------------------------------------------------------------------------------- @@ -128,6 +138,7 @@ abstract class DurableEventSerializer( val deliveryId = if (durableEventFormat.hasDeliveryId) Some(durableEventFormat.getDeliveryId) else None val persistOnEventSequenceNr = if (durableEventFormat.hasPersistOnEventSequenceNr) Some(durableEventFormat.getPersistOnEventSequenceNr) else None + val persistOnEventEventId = if (durableEventFormat.hasPersistOnEventEventId) Some(eventId(durableEventFormat.getPersistOnEventEventId)) else None DurableEvent( payload = payloadSerializer.payload(durableEventFormat.getPayload), @@ -140,6 +151,10 @@ abstract class DurableEventSerializer( localLogId = durableEventFormat.getLocalLogId, localSequenceNr = durableEventFormat.getLocalSequenceNr, deliveryId = deliveryId, - persistOnEventSequenceNr = persistOnEventSequenceNr) + persistOnEventSequenceNr = persistOnEventSequenceNr, + persistOnEventEventId = persistOnEventEventId) } + + def eventId(eventId: EventIdFormat) = + EventId(eventId.getProcessId, eventId.getSequenceNr) } diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializer.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializer.scala index 74839060..5ab89d38 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializer.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializer.scala @@ -98,7 +98,10 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { private def persistOnEventRequestFormatBuilder(persistOnEventRequest: PersistOnEventRequest): PersistOnEventRequestFormat.Builder = { val builder = PersistOnEventRequestFormat.newBuilder - builder.setPersistOnEventSequenceNr(persistOnEventRequest.persistOnEventSequenceNr) + builder.setSequenceNr(persistOnEventRequest.sequenceNr) + persistOnEventRequest.eventId.foreach { eventId => + builder.setEventId(eventSerializer.eventIdFormatBuilder(eventId)) + } builder.setInstanceId(persistOnEventRequest.instanceId) persistOnEventRequest.invocations.foreach { invocation => @@ -191,8 +194,11 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { invocationsBuilder += persistOnEventInvocation(pif) } + val persistOnEventReference = if (persistOnEventRequestFormat.hasEventId) Some(eventSerializer.eventId(persistOnEventRequestFormat.getEventId)) else None + PersistOnEventRequest( - persistOnEventRequestFormat.getPersistOnEventSequenceNr, + persistOnEventRequestFormat.getSequenceNr, + persistOnEventReference, invocationsBuilder.result(), persistOnEventRequestFormat.getInstanceId) } diff --git a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/BaseSpec.java b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/BaseSpec.java index cf459ac4..cbc9a843 100644 --- a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/BaseSpec.java +++ b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/BaseSpec.java @@ -105,7 +105,7 @@ protected DurableEvent createEvent(final Object payload, final long sequenceNr) @SuppressWarnings("unchecked") protected DurableEvent createEvent(final Object payload, final long sequenceNr, final String emitterId, final String logId, final VectorTime timestamp) { return new DurableEvent(payload, emitterId, Option.empty(), Set$.MODULE$.empty(), 0L, - timestamp, logId, logId, sequenceNr, Option.empty(), Option.empty()); + timestamp, logId, logId, sequenceNr, Option.empty(), Option.empty(), Option.empty()); } protected VectorTime timestamp(final long a, final long b) { diff --git a/eventuate-core/src/test/scala/com/rbmhtechnology/eventuate/PersistOnEventSpec.scala b/eventuate-core/src/test/scala/com/rbmhtechnology/eventuate/PersistOnEventSpec.scala index 07f80988..cd332b2d 100644 --- a/eventuate-core/src/test/scala/com/rbmhtechnology/eventuate/PersistOnEventSpec.scala +++ b/eventuate-core/src/test/scala/com/rbmhtechnology/eventuate/PersistOnEventSpec.scala @@ -82,8 +82,8 @@ object PersistOnEventSpec { } } - def event(payload: Any, sequenceNr: Long, persistOnEventSequenceNr: Option[Long] = None): DurableEvent = - DurableEvent(payload, emitterIdA, None, Set(), 0L, timestamp(sequenceNr), logIdA, logIdA, sequenceNr, None, persistOnEventSequenceNr) + def event(payload: Any, sequenceNr: Long, persistOnEventEvent: Option[DurableEvent] = None): DurableEvent = + DurableEvent(payload, emitterIdA, None, Set(), 0L, timestamp(sequenceNr), logIdA, logIdA, sequenceNr, None, persistOnEventEvent.map(_.localSequenceNr), persistOnEventEvent.map(_.id)) } class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach { @@ -131,16 +131,17 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike "An EventsourcedActor with PersistOnEvent" must { "support persistence in event handler" in { val actor = recoveredTestActor(stateSync = true) - actor ! Written(event("a", 1L)) + val eventA = event("a", 1L) + actor ! Written(eventA) deliverProbe.expectMsg(Set(1L)) val write = logProbe.expectMsgClass(classOf[Write]) - write.events(0).persistOnEventSequenceNr should be(Some(1L)) - write.events(1).persistOnEventSequenceNr should be(Some(1L)) + write.events(0).persistOnEventEventId should be(Some(eventA.id)) + write.events(1).persistOnEventEventId should be(Some(eventA.id)) logProbe.sender() ! WriteSuccess(Seq( - event(write.events(0).payload, 2L, Some(1L)), - event(write.events(1).payload, 3L, Some(1L))), write.correlationId, instanceId) + event(write.events(0).payload, 2L, Some(eventA)), + event(write.events(1).payload, 3L, Some(eventA))), write.correlationId, instanceId) deliverProbe.expectMsg(Set()) deliverProbe.expectMsg(Set()) @@ -150,47 +151,49 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike } "support cascading persistence" in { val actor = recoveredTestActor(stateSync = true) - actor ! Written(event("b", 1L)) + val eventB = event("b", 1L) + actor ! Written(eventB) deliverProbe.expectMsg(Set(1L)) val write1 = logProbe.expectMsgClass(classOf[Write]) - write1.events(0).persistOnEventSequenceNr should be(Some(1L)) - write1.events(1).persistOnEventSequenceNr should be(Some(1L)) + write1.events(0).persistOnEventEventId should be(Some(eventB.id)) + write1.events(1).persistOnEventEventId should be(Some(eventB.id)) - logProbe.sender() ! WriteSuccess(Seq( - event(write1.events(0).payload, 2L, Some(1L)), - event(write1.events(1).payload, 3L, Some(1L))), write1.correlationId, instanceId) + val eventC = event(write1.events(0).payload, 2L, Some(eventB)) + val eventC2 = event(write1.events(1).payload, 3L, Some(eventB)) + logProbe.sender() ! WriteSuccess(Seq(eventC, eventC2), write1.correlationId, instanceId) deliverProbe.expectMsg(Set(2L)) deliverProbe.expectMsg(Set(2L)) persistProbe.expectMsg("c-1") val write2 = logProbe.expectMsgClass(classOf[Write]) - write2.events(0).persistOnEventSequenceNr should be(Some(2L)) - logProbe.sender() ! WriteSuccess(Seq(event(write2.events(0).payload, 4L, Some(2L))), write2.correlationId, instanceId) + write2.events(0).persistOnEventEventId should be(Some(eventC.id)) + logProbe.sender() ! WriteSuccess(Seq(event(write2.events(0).payload, 4L, Some(eventC))), write2.correlationId, instanceId) deliverProbe.expectMsg(Set()) persistProbe.expectMsg("c-2") } "confirm persistence with self-emitted events only" in { val actor = recoveredTestActor(stateSync = true) - actor ! Written(event("a", 1L)) + val eventA = event("a", 1L) + actor ! Written(eventA) deliverProbe.expectMsg(Set(1L)) val write = logProbe.expectMsgClass(classOf[Write]) - actor ! Written(event("x-1", 2L, Some(1L)).copy(emitterId = emitterIdB)) - actor ! Written(event("x-2", 3L, Some(1L)).copy(emitterId = emitterIdB)) + actor ! Written(event("x-1", 2L, Some(eventA)).copy(emitterId = emitterIdB)) + actor ! Written(event("x-2", 3L, Some(eventA)).copy(emitterId = emitterIdB)) deliverProbe.expectMsg(Set(1L)) deliverProbe.expectMsg(Set(1L)) logProbe.sender() ! WriteSuccess(Seq( - event(write.events(0).payload, 4L, Some(1L)), - event(write.events(1).payload, 5L, Some(1L))), write.correlationId, instanceId) + event(write.events(0).payload, 4L, Some(eventA)), + event(write.events(1).payload, 5L, Some(eventA))), write.correlationId, instanceId) deliverProbe.expectMsg(Set()) deliverProbe.expectMsg(Set()) @@ -200,31 +203,32 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike } "re-attempt persistence on failed write after restart" in { val actor = recoveredTestActor(stateSync = true) - actor ! Written(event("a", 1L)) + val eventA = event("a", 1L) + actor ! Written(eventA) deliverProbe.expectMsg(Set(1L)) val write1 = logProbe.expectMsgClass(classOf[Write]) - write1.events(0).persistOnEventSequenceNr should be(Some(1L)) - write1.events(1).persistOnEventSequenceNr should be(Some(1L)) + write1.events(0).persistOnEventEventId should be(Some(eventA.id)) + write1.events(1).persistOnEventEventId should be(Some(eventA.id)) // application crash and restart logProbe.sender() ! WriteFailure(Seq( - event(write1.events(0).payload, 0L, Some(1L)), - event(write1.events(1).payload, 0L, Some(1L))), TestException, write1.correlationId, instanceId) + event(write1.events(0).payload, 0L, Some(eventA)), + event(write1.events(1).payload, 0L, Some(eventA))), TestException, write1.correlationId, instanceId) processRecover(actor, instanceId + 1, Seq(event("a", 1L))) deliverProbe.expectMsg(Set(1L)) val write2 = logProbe.expectMsgClass(classOf[Write]) - write2.events(0).persistOnEventSequenceNr should be(Some(1L)) - write2.events(1).persistOnEventSequenceNr should be(Some(1L)) + write2.events(0).persistOnEventEventId should be(Some(eventA.id)) + write2.events(1).persistOnEventEventId should be(Some(eventA.id)) logProbe.sender() ! WriteSuccess(Seq( - event(write2.events(0).payload, 2L, Some(1L)), - event(write2.events(1).payload, 3L, Some(1L))), write2.correlationId, instanceId + 1) + event(write2.events(0).payload, 2L, Some(eventA)), + event(write2.events(1).payload, 3L, Some(eventA))), write2.correlationId, instanceId + 1) deliverProbe.expectMsg(Set()) deliverProbe.expectMsg(Set()) @@ -234,18 +238,19 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike } "not re-attempt persistence on successful write after restart" in { val actor = recoveredTestActor(stateSync = true) - actor ! Written(event("a", 1L)) + val eventA = event("a", 1L) + actor ! Written(eventA) deliverProbe.expectMsg(Set(1L)) val write = logProbe.expectMsgClass(classOf[Write]) - write.events(0).persistOnEventSequenceNr should be(Some(1L)) - write.events(1).persistOnEventSequenceNr should be(Some(1L)) + write.events(0).persistOnEventEventId should be(Some(eventA.id)) + write.events(1).persistOnEventEventId should be(Some(eventA.id)) logProbe.sender() ! WriteSuccess(Seq( - event(write.events(0).payload, 2L, Some(1L)), - event(write.events(1).payload, 3L, Some(1L))), write.correlationId, instanceId) + event(write.events(0).payload, 2L, Some(eventA)), + event(write.events(1).payload, 3L, Some(eventA))), write.correlationId, instanceId) deliverProbe.expectMsg(Set()) deliverProbe.expectMsg(Set()) @@ -256,8 +261,41 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike actor ! "boom" processRecover(actor, instanceId + 1, Seq( event("a", 1L), - event(write.events(0).payload, 2L, Some(1L)), - event(write.events(1).payload, 3L, Some(1L)))) + event(write.events(0).payload, 2L, Some(eventA)), + event(write.events(1).payload, 3L, Some(eventA)))) + + deliverProbe.expectMsg(Set(1L)) + deliverProbe.expectMsg(Set()) + deliverProbe.expectMsg(Set()) + + persistProbe.expectNoMsg(timeout) + persistProbe.expectNoMsg(timeout) + } + "not re-attempt persistence on successful write of events without persistOnEventReference after restart" in { + val actor = recoveredTestActor(stateSync = true) + val eventA = event("a", 1L) + actor ! Written(eventA) + + deliverProbe.expectMsg(Set(1L)) + + val write = logProbe.expectMsgClass(classOf[Write]) + + write.events(0).persistOnEventEventId should be(Some(eventA.id)) + write.events(1).persistOnEventEventId should be(Some(eventA.id)) + + val persistedOnA = Seq( + event(write.events(0).payload, 2L, Some(eventA)).copy(persistOnEventEventId = None), + event(write.events(1).payload, 3L, Some(eventA))) + logProbe.sender() ! WriteSuccess(persistedOnA, write.correlationId, instanceId) + + deliverProbe.expectMsg(Set()) + deliverProbe.expectMsg(Set()) + + persistProbe.expectMsg("a-1") + persistProbe.expectMsg("a-2") + + actor ! "boom" + processRecover(actor, instanceId + 1, eventA +: persistedOnA) deliverProbe.expectMsg(Set(1L)) deliverProbe.expectMsg(Set()) @@ -270,12 +308,13 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike val actor = recoveredTestActor(stateSync = true) actor ! Written(event("boom", 1L)) - processRecover(actor, instanceId + 1, Seq(event("a", 1L))) + val eventA = event("a", 1L) + processRecover(actor, instanceId + 1, Seq(eventA)) val write = logProbe.expectMsgClass(classOf[Write]) logProbe.sender() ! WriteSuccess(Seq( - event(write.events(0).payload, 2L, Some(1L)), - event(write.events(1).payload, 3L, Some(1L))), write.correlationId, instanceId + 1) + event(write.events(0).payload, 2L, Some(eventA)), + event(write.events(1).payload, 3L, Some(eventA))), write.correlationId, instanceId + 1) logProbe.expectNoMsg(timeout) } "save a snapshot with persistOnEvent requests" in { @@ -287,7 +326,7 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike actor ! "snap" val save = logProbe.expectMsgClass(classOf[SaveSnapshot]) - val expected = PersistOnEventRequest(1L, Vector( + val expected = PersistOnEventRequest(1L, Some(EventId("logA", 1L)), Vector( PersistOnEventInvocation("x-1", Set("14")), PersistOnEventInvocation("x-2", Set("15"))), instanceId) @@ -317,12 +356,13 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike "recover from a snapshot with persistOnEvent requests whose execution succeeded" in { val actor = recoveredTestActor(stateSync = false) - actor ! Written(event("x", 1L)) + val eventX = event("x", 1L) + actor ! Written(eventX) val write1 = logProbe.expectMsgClass(classOf[Write]) val written = List( - event(write1.events(0).payload, 2L, Some(1L)), - event(write1.events(1).payload, 3L, Some(1L))) + event(write1.events(0).payload, 2L, Some(eventX)), + event(write1.events(1).payload, 3L, Some(eventX))) actor ! "snap" @@ -338,12 +378,38 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike logProbe.sender() ! ReplaySuccess(Nil, 3L, instanceId + 1) logProbe.expectNoMsg(timeout) } + "recover from a snapshot with persistOnEvent requests without persistOnEventReferences whose execution succeeded" in { + val actor = recoveredTestActor(stateSync = false) + + val eventX = event("x", 1L) + actor ! Written(eventX) + + val write1 = logProbe.expectMsgClass(classOf[Write]) + val written = List( + event(write1.events(0).payload, 2L, Some(eventX)), + event(write1.events(1).payload, 3L, Some(eventX))) + + actor ! "snap" + + val save = logProbe.expectMsgClass(classOf[SaveSnapshot]) + val snapshotWithoutReferences = save.snapshot.copy(persistOnEventRequests = save.snapshot.persistOnEventRequests.map(_.copy(eventId = None))) + + actor ! "boom" + + logProbe.expectMsg(LoadSnapshot(emitterIdA, instanceId + 1)) + logProbe.sender() ! LoadSnapshotSuccess(Some(snapshotWithoutReferences), instanceId + 1) + logProbe.expectMsg(Replay(2L, Some(actor), instanceId + 1)) + logProbe.sender() ! ReplaySuccess(written, 3L, instanceId + 1) + logProbe.expectMsg(Replay(4L, None, instanceId + 1)) + logProbe.sender() ! ReplaySuccess(Nil, 3L, instanceId + 1) + logProbe.expectNoMsg(timeout) + } "be tolerant to changing actor paths across incarnations" in { val actor = unrecoveredTestActor(stateSync = false) val path = ActorPath.fromString("akka://test/user/invalid") val requests = Vector( - PersistOnEventRequest(3L, Vector(PersistOnEventInvocation("y", Set())), instanceId), - PersistOnEventRequest(4L, Vector(PersistOnEventInvocation("z", Set())), instanceId)) + PersistOnEventRequest(3L, Some(EventId("p-2", 2L)), Vector(PersistOnEventInvocation("y", Set())), instanceId), + PersistOnEventRequest(4L, None, Vector(PersistOnEventInvocation("z", Set())), instanceId)) val snapshot = Snapshot("foo", emitterIdA, event("x", 2), timestamp(2), 2, persistOnEventRequests = requests) logProbe.expectMsg(LoadSnapshot(emitterIdA, instanceId)) diff --git a/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventWithRecoverySpecLeveldb.scala b/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventWithRecoverySpecLeveldb.scala new file mode 100644 index 00000000..e1bdbd4f --- /dev/null +++ b/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventWithRecoverySpecLeveldb.scala @@ -0,0 +1,96 @@ +/* + * Copyright 2015 - 2016 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.eventuate + +import java.util.UUID + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.testkit.TestProbe +import com.rbmhtechnology.eventuate.ReplicationIntegrationSpec.replicationConnection +import com.rbmhtechnology.eventuate.utilities._ +import org.apache.commons.io.FileUtils +import org.scalatest.Matchers +import org.scalatest.WordSpec + +import scala.concurrent.duration.DurationInt + +object PersistOnEventWithRecoverySpecLeveldb { + class OnBEmitRandomActor(val eventLog: ActorRef, probe: TestProbe) extends EventsourcedActor with PersistOnEvent { + + override def id = getClass.getName + + override def onCommand = Actor.emptyBehavior + + override def onEvent = { + case "A" => + case "B" => persistOnEvent(UUID.randomUUID().toString) + case uuid: String => probe.ref ! uuid + } + } + + def persistOnEventProbe(locationA1: Location, log: ActorRef) = { + val probe = locationA1.probe + locationA1.system.actorOf(Props(new OnBEmitRandomActor(log, probe))) + probe + } + + val noMsgTimeout = 100.millis +} + +class PersistOnEventWithRecoverySpecLeveldb extends WordSpec with Matchers with MultiLocationSpecLeveldb { + import RecoverySpecLeveldb._ + import PersistOnEventWithRecoverySpecLeveldb._ + + override val logFactory: String => Props = + id => SingleLocationSpecLeveldb.TestEventLog.props(id, batching = true) + + "An EventsourcedActor with PersistOnEvent" must { + "not re-attempt persistence on successful write after reordering of events through disaster recovery" in { + val locationB = location("B", customConfig = RecoverySpecLeveldb.config) + def newLocationA = location("A", customConfig = RecoverySpecLeveldb.config) + val locationA1 = newLocationA + + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA1.port))) + def newEndpointA(l: Location, activate: Boolean) = l.endpoint(Set("L1"), Set(replicationConnection(locationB.port)), activate = activate) + val endpointA1 = newEndpointA(locationA1, activate = true) + + val targetA = endpointA1.target("L1") + val logDirA = logDirectory(targetA) + val targetB = endpointB.target("L1") + val a1Probe = persistOnEventProbe(locationA1, targetA.log) + + write(targetA, List("A")) + write(targetB, List("B")) + val event = a1Probe.expectMsgClass(classOf[String]) + assertConvergence(Set("A", "B", event), endpointA1, endpointB) + + locationA1.terminate().await + FileUtils.deleteDirectory(logDirA) + + val locationA2 = newLocationA + val endpointA2 = newEndpointA(locationA2, activate = false) + endpointA2.recover().await + + val a2Probe = persistOnEventProbe(locationA2, endpointA2.logs("L1")) + a2Probe.expectMsg(event) + a2Probe.expectNoMsg(noMsgTimeout) + assertConvergence(Set("A", "B", event), endpointA2, endpointB) + } + } +} diff --git a/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpecLeveldb.scala b/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpecLeveldb.scala index 13c0453b..77f94804 100644 --- a/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpecLeveldb.scala +++ b/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpecLeveldb.scala @@ -49,6 +49,15 @@ object RecoverySpecLeveldb { } } + def assertConvergence(expected: Set[String], endpoints: ReplicationEndpoint*): Unit = { + val probes = endpoints.map { endpoint => + val probe = new TestProbe(endpoint.system) + endpoint.system.actorOf(Props(new ConvergenceView(s"p-${endpoint.id}", endpoint.logs("L1"), expected.size, probe.ref))) + probe + } + probes.foreach(_.expectMsg(expected)) + } + val config = ConfigFactory.parseString( """ |eventuate.log.replication.retry-delay = 1s @@ -96,15 +105,6 @@ class RecoverySpecLeveldb extends WordSpec with Matchers with MultiLocationSpecL override val logFactory: String => Props = id => SingleLocationSpecLeveldb.TestEventLog.props(id, batching = true) - def assertConvergence(expected: Set[String], endpoints: ReplicationEndpoint*): Unit = { - val probes = endpoints.map { endpoint => - val probe = new TestProbe(endpoint.system) - endpoint.system.actorOf(Props(new ConvergenceView(s"p-${endpoint.id}", endpoint.logs("L1"), expected.size, probe.ref))) - probe - } - probes.foreach(_.expectMsg(expected)) - } - "Replication endpoint recovery" must { "disallow activation of endpoint during and after recovery" in { val locationA = location("A", customConfig = RecoverySpecLeveldb.config) @@ -350,7 +350,8 @@ class RecoverySpecLeveldb extends WordSpec with Matchers with MultiLocationSpecL val locationA1 = newLocationA val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA1.port)), applicationVersion = oldVersion) - def newEndpointA(l: Location, version: ApplicationVersion, activate: Boolean) = l.endpoint(Set("L1"), Set(replicationConnection(locationB.port)), activate = activate) + def newEndpointA(l: Location, version: ApplicationVersion, activate: Boolean) = + l.endpoint(Set("L1"), Set(replicationConnection(locationB.port)), applicationVersion = version, activate = activate) val endpointA1 = newEndpointA(locationA1, oldVersion, activate = true) val targetA = endpointA1.target("L1")