From 3f6d88ed21323d1a7dfddae8a8324c492e976a04 Mon Sep 17 00:00:00 2001 From: Volker Stampa Date: Fri, 16 Dec 2016 09:41:27 +0100 Subject: [PATCH 1/3] Split EventLog implementation Split EventLog into composable components for: - eventsourcing - replication - replication filters - rfc - schema evolution --- .../eventuate/sandbox/EventLog.scala | 212 +++++++++++------- 1 file changed, 130 insertions(+), 82 deletions(-) diff --git a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala index b3cfd53..62f4813 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala @@ -1,5 +1,6 @@ package com.rbmhtechnology.eventuate.sandbox +import akka.actor.Actor.Receive import akka.actor._ import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol._ import com.rbmhtechnology.eventuate.sandbox.EventCompatibility.stopOnUnserializableKeepOthers @@ -13,25 +14,30 @@ import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer import scala.collection.immutable.Seq trait EventLogOps { - // --- EventLog --- - private var _sequenceNr: Long = 0L - private var _versionVector: VectorTime = VectorTime.Zero - private var _deletionVector: VectorTime = VectorTime.Zero + protected def id: String + + protected def versionVector: VectorTime - var eventStore: Vector[EncodedEvent] = Vector.empty + protected def read(fromSequenceNr: Long): Seq[EncodedEvent] - def id: String + protected def write(events: Seq[EncodedEvent], prepare: (EncodedEvent, Long) => EncodedEvent): Seq[EncodedEvent] +} - def sequenceNr: Long = - _sequenceNr +trait InMemoryEventLog extends EventLogOps { + // Implementation + private var _sequenceNr: Long = 0L + private var _versionVector: VectorTime = VectorTime.Zero - def versionVector: VectorTime = + private var eventStore: Vector[EncodedEvent] = Vector.empty + + // API + protected def versionVector: VectorTime = _versionVector - def read(fromSequenceNr: Long): Seq[EncodedEvent] = + protected def read(fromSequenceNr: Long): Seq[EncodedEvent] = eventStore.drop(fromSequenceNr.toInt - 1) - private def write(events: Seq[EncodedEvent], prepare: (EncodedEvent, Long) => EncodedEvent): Seq[EncodedEvent] = { + protected def write(events: Seq[EncodedEvent], prepare: (EncodedEvent, Long) => EncodedEvent): Seq[EncodedEvent] = { var snr = _sequenceNr var cvv = _versionVector var log = eventStore @@ -53,61 +59,35 @@ trait EventLogOps { written } - - // --- Eventsourcing --- - - def emissionWrite(events: Seq[EncodedEvent]): Seq[EncodedEvent] = - write(events, (evt, snr) => evt.emitted(id, snr)) - - // --- Replication --- - - private var progressStore: Map[String, Long] = Map.empty - - def replicationWriteProcessor(sourceLogId: String, currentVersionVector: VectorTime): ReplicationProcessor - def replicationReadProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int): ReplicationProcessor - - def replicationRead(fromSequenceNr: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime): ReplicationProcessResult = - replicationReadProcessor(targetLogId, targetVersionVector, num) - .apply(read(fromSequenceNr), fromSequenceNr) - - def causalityFilter(versionVector: VectorTime): ReplicationFilter = new ReplicationFilter { - override def apply(event: EncodedEvent): Boolean = !event.before(versionVector) - } - - def replicationWrite(events: Seq[EncodedEvent], progress: Long, sourceLogId: String): ReplicationProcessResult = { - replicationWriteProcessor(sourceLogId, versionVector) - .apply(events, progress).right.map { - case (filtered, updatedProgress) => (write(filtered, (evt, snr) => evt.replicated(id, snr)), updatedProgress) - } - } - - def progressWrite(progresses: Map[String, Long]): Unit = - progressStore = progressStore ++ progresses - - def progressRead(logId: String): Long = - progressStore.getOrElse(logId, 0L) } trait EventSubscribers { + // Implementation private var subscribers: Set[ActorRef] = Set.empty - def subscribe(subscriber: ActorRef): Unit = + // API + protected def subscribe(subscriber: ActorRef): Unit = subscribers = subscribers + subscriber - def publish(events: Seq[DecodedEvent]): Unit = for { + protected def publish(events: Seq[DecodedEvent]): Unit = for { e <- events s <- subscribers } s ! e } -class EventLog(val id: String, val sourceFilter: ReplicationFilter) extends Actor with EventLogOps with EventSubscribers { +trait EventLogWithEventsourcing extends EventLogOps { this: Actor with EventSubscribers => + import EventLog._ - import context.system - /* --- Eventsourcing --- */ + // Implementation + private def emissionWrite(events: Seq[EncodedEvent]): Seq[EncodedEvent] = + write(events, (evt, snr) => evt.emitted(id, snr)) + + private implicit val system = context.system - private def eventsourcingReceive: Receive = { + // API + protected def eventsourcingReceive: Receive = { case Subscribe(subscriber) => subscribe(subscriber) case Read(from) => @@ -120,9 +100,50 @@ class EventLog(val id: String, val sourceFilter: ReplicationFilter) extends Acto publish(decoded) } - /* --- Replication --- */ +} + +trait EventLogWithReplication extends EventLogOps { this: Actor with EventSubscribers => + + import EventLogWithReplication._ + import EventLog._ + + // Required + protected def replicationReadDecider(targetLogId: String, targetVersionVector: VectorTime, num: Int): ReplicationDecider + protected def replicationWriteDecider(sourceLogId: String): ReplicationDecider + + // Implementation + private implicit val system = context.system + + private var progressStore: Map[String, Long] = Map.empty + + private def replicationWriteProcessor(sourceLogId: String): ReplicationProcessor = + ReplicationProcessor(replicationWriteDecider(sourceLogId)) + private def replicationReadProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int): ReplicationProcessor = + ReplicationProcessor(replicationReadDecider(targetLogId, targetVersionVector, num) andThen ReplicationDecider(BlockAfter(num))) - private def replicationReceive: Receive = { + private def replicationRead(fromSequenceNr: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime): ReplicationProcessResult = + replicationReadProcessor(targetLogId, targetVersionVector, num) + .apply(read(fromSequenceNr), fromSequenceNr) + + private def causalityFilter(versionVector: VectorTime): ReplicationFilter = new ReplicationFilter { + override def apply(event: EncodedEvent): Boolean = !event.before(versionVector) + } + + private def replicationWrite(events: Seq[EncodedEvent], progress: Long, sourceLogId: String): ReplicationProcessResult = { + replicationWriteProcessor(sourceLogId) + .apply(events, progress).right.map { + case (filtered, updatedProgress) => (write(filtered, (evt, snr) => evt.replicated(id, snr)), updatedProgress) + } + } + + private def progressWrite(progresses: Map[String, Long]): Unit = + progressStore = progressStore ++ progresses + + private def progressRead(logId: String): Long = + progressStore.getOrElse(logId, 0L) + + // API + protected def replicationReceive: Receive = { case ReplicationRead(from, num, tlid, tvv) => replicationRead(from, num, tlid, tvv) match { case Right((processedEvents, progress)) => @@ -144,42 +165,85 @@ class EventLog(val id: String, val sourceFilter: ReplicationFilter) extends Acto sender() ! GetReplicationProgressAndVersionVectorSuccess(progressRead(logId), versionVector) } - /* --- Replication Filters --- */ + protected def replicationReadCausalityDecider(targetVersionVector: VectorTime): ReplicationDecider = + ReplicationDecider(causalityFilter(targetVersionVector)) + + protected def replicationWriteCausalityDecider: ReplicationDecider = + ReplicationDecider(causalityFilter(versionVector)) +} + +object EventLogWithReplication { + class ReplicationStoppedException(reason: BlockReason) + extends IllegalStateException(s"Replication stopped: $reason") +} + +trait EventLogWithReplicationFilters { this: EventLogWithReplication => + + // required + protected def sourceFilter: ReplicationFilter + // Implementation /** Maps target log ids to replication filters used for replication reads */ private var targetFilters: Map[String, ReplicationFilter] = Map.empty - private def replicationFilterReceive: Receive = { + // API + protected def replicationFilterReceive: Receive = { case AddTargetFilter(logId, filter) => targetFilters = targetFilters.updated(logId, filter) } - /* --- RFC --- */ + protected def replicationReadFilterDecider(targetLogId: String): ReplicationDecider = { + val targetFilter = targetFilters.getOrElse(targetLogId, NoFilter) + ReplicationDecider(targetFilter and sourceFilter) + } +} +trait RedundantFilteredConnections { this: EventLogWithReplicationFilters => + // Implementation /** Maps target log ids to [[RedundantFilterConfig]]s used to build [[RfcBlocker]]s for replication reads */ private var redundantFilterConfigs: Map[String, RedundantFilterConfig] = Map.empty - - private def rfcReceive: Receive = { + // API + protected def rfcReceive: Receive = { case AddRedundantFilterConfig(logId, config) => redundantFilterConfigs += logId -> config } - /* --- Scheme evolution --- */ + protected def replicationReadRfcDecider(targetLogId: String, targetVersionVector: VectorTime) = { + val rfcBlocker = redundantFilterConfigs.get(targetLogId).map(_.rfcBlocker(targetVersionVector)).getOrElse(NoBlocker) + ReplicationDecider(rfcBlocker) + } +} +trait EventLogWithSchemaEvolution { this: Actor with EventLogWithReplication => + // Implementation /** Maps source log ids to [[ReplicationDecider]]s used for replication writes */ private var eventCompatibilityDeciders: Map[String, ReplicationDecider] = Map.empty - private def schemaEvolutionReceive: Receive = { + private implicit val system = context.system + + // API + protected def schemaEvolutionReceive: Receive = { case AddEventCompatibilityDecider(sourceLogId, processor) => eventCompatibilityDeciders += sourceLogId -> processor case RemoveEventCompatibilityDecider(sourceLogId) => eventCompatibilityDeciders -= sourceLogId } + protected def replicationWriteSchemaEvolutionDecider(sourceLogId: String) = + eventCompatibilityDeciders.getOrElse(sourceLogId, stopOnUnserializableKeepOthers) +} + +class EventLog(val id: String, val sourceFilter: ReplicationFilter) extends Actor + with EventLogWithEventsourcing with EventSubscribers + with EventLogWithReplication + with EventLogWithReplicationFilters with RedundantFilteredConnections + with EventLogWithSchemaEvolution + with InMemoryEventLog { + override def receive: Receive = eventsourcingReceive orElse replicationReceive orElse @@ -187,27 +251,14 @@ class EventLog(val id: String, val sourceFilter: ReplicationFilter) extends Acto rfcReceive orElse schemaEvolutionReceive - /* --- Replication processors --- */ - - override def replicationWriteProcessor(sourceLogId: String, currentVersionVector: VectorTime) = - ReplicationProcessor( - // replication - ReplicationDecider(causalityFilter(currentVersionVector)) - // schema evolution - .andThen(eventCompatibilityDeciders.getOrElse(sourceLogId, stopOnUnserializableKeepOthers))) + override def replicationWriteDecider(sourceLogId: String) = + replicationWriteCausalityDecider + .andThen(replicationWriteSchemaEvolutionDecider(sourceLogId)) - override def replicationReadProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int) = { - val targetFilter = targetFilters.getOrElse(targetLogId, NoFilter) - val rfcBlocker = redundantFilterConfigs.get(targetLogId).map(_.rfcBlocker(targetVersionVector)).getOrElse(NoBlocker) - ReplicationProcessor( - // replication - ReplicationDecider(causalityFilter(targetVersionVector)) - // replication filters - .andThen(ReplicationDecider(targetFilter and sourceFilter)) - // RFC - .andThen(ReplicationDecider(rfcBlocker)) - // replication - .andThen(ReplicationDecider(BlockAfter(num)))) + override def replicationReadDecider(targetLogId: String, targetVersionVector: VectorTime, num: Int) = { + replicationReadCausalityDecider(targetVersionVector) + .andThen(replicationReadFilterDecider(targetLogId)) + .andThen(replicationReadRfcDecider(targetLogId, targetVersionVector)) } } @@ -223,7 +274,4 @@ object EventLog { def decode(events: Seq[EncodedEvent])(implicit system: ActorSystem): Seq[DecodedEvent] = events.map(e => EventPayloadSerializer.decode(e).get) - - class ReplicationStoppedException(reason: BlockReason) - extends IllegalStateException(s"Replication stopped: $reason") } From 4549962043ebad9ff740247c0ad86aa324cf2cc3 Mon Sep 17 00:00:00 2001 From: Volker Stampa Date: Fri, 16 Dec 2016 09:53:18 +0100 Subject: [PATCH 2/3] Replace Nr by No see also: http://forum.wordreference.com/threads/abbreviation-of-number-n-n%C2%B0-nr-nbr-no.264328/ --- .../eventuate/sandbox/Event.scala | 12 +++++----- .../eventuate/sandbox/EventLog.scala | 24 +++++++++---------- .../sandbox/EventsourcingProtocol.scala | 2 +- .../sandbox/ReplicationEndpoint.scala | 4 ++-- .../sandbox/ReplicationProcessor.scala | 2 +- .../sandbox/ReplicationProtocol.scala | 2 +- 6 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/Event.scala b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/Event.scala index 53b7f21..31dd202 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/Event.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/Event.scala @@ -1,6 +1,6 @@ package com.rbmhtechnology.eventuate.sandbox -case class EventMetadata(emitterId: String, emitterLogId: String, localLogId: String, localSequenceNr: Long, vectorTimestamp: VectorTime) +case class EventMetadata(emitterId: String, emitterLogId: String, localLogId: String, localSequenceNo: Long, vectorTimestamp: VectorTime) case class PayloadVersion(majorVersion: Int, minorVersion: Int) case class EventManifest(schema: String, isStringManifest: Boolean, payloadVersion: Option[PayloadVersion]) case class EventBytes(bytes: Array[Byte], serializerId: Int, manifest: EventManifest) @@ -20,17 +20,17 @@ object DecodedEvent { case class DecodedEvent(metadata: EventMetadata, payload: AnyRef) extends DurableEvent case class EncodedEvent(metadata: EventMetadata, payload: EventBytes) extends DurableEvent { - def emitted(localLogId: String, localSequenceNr: Long): EncodedEvent = { + def emitted(localLogId: String, localSequenceNo: Long): EncodedEvent = { copy(metadata.copy( emitterLogId = localLogId, localLogId = localLogId, - localSequenceNr = localSequenceNr, - vectorTimestamp = metadata.vectorTimestamp.setLocalTime(localLogId, localSequenceNr))) + localSequenceNo = localSequenceNo, + vectorTimestamp = metadata.vectorTimestamp.setLocalTime(localLogId, localSequenceNo))) } - def replicated(localLogId: String, localSequenceNr: Long): EncodedEvent = { + def replicated(localLogId: String, localSequenceNo: Long): EncodedEvent = { copy(metadata.copy( localLogId = localLogId, - localSequenceNr = localSequenceNr)) + localSequenceNo = localSequenceNo)) } } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala index 62f4813..51be194 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala @@ -18,14 +18,14 @@ trait EventLogOps { protected def versionVector: VectorTime - protected def read(fromSequenceNr: Long): Seq[EncodedEvent] + protected def read(fromSequenceNo: Long): Seq[EncodedEvent] protected def write(events: Seq[EncodedEvent], prepare: (EncodedEvent, Long) => EncodedEvent): Seq[EncodedEvent] } trait InMemoryEventLog extends EventLogOps { // Implementation - private var _sequenceNr: Long = 0L + private var _sequenceNo: Long = 0L private var _versionVector: VectorTime = VectorTime.Zero private var eventStore: Vector[EncodedEvent] = Vector.empty @@ -34,18 +34,18 @@ trait InMemoryEventLog extends EventLogOps { protected def versionVector: VectorTime = _versionVector - protected def read(fromSequenceNr: Long): Seq[EncodedEvent] = - eventStore.drop(fromSequenceNr.toInt - 1) + protected def read(fromSequenceNo: Long): Seq[EncodedEvent] = + eventStore.drop(fromSequenceNo.toInt - 1) protected def write(events: Seq[EncodedEvent], prepare: (EncodedEvent, Long) => EncodedEvent): Seq[EncodedEvent] = { - var snr = _sequenceNr + var sno = _sequenceNo var cvv = _versionVector var log = eventStore val written = events.map { event => - snr = snr + 1L + sno = sno + 1L - val prepared = prepare(event, snr) + val prepared = prepare(event, sno) cvv = cvv.merge(prepared.metadata.vectorTimestamp) log = log :+ prepared @@ -53,7 +53,7 @@ trait InMemoryEventLog extends EventLogOps { prepared } - _sequenceNr = snr + _sequenceNo = sno _versionVector = cvv eventStore = log @@ -82,7 +82,7 @@ trait EventLogWithEventsourcing extends EventLogOps { this: Actor with EventSubs // Implementation private def emissionWrite(events: Seq[EncodedEvent]): Seq[EncodedEvent] = - write(events, (evt, snr) => evt.emitted(id, snr)) + write(events, (evt, sno) => evt.emitted(id, sno)) private implicit val system = context.system @@ -121,9 +121,9 @@ trait EventLogWithReplication extends EventLogOps { this: Actor with EventSubscr private def replicationReadProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int): ReplicationProcessor = ReplicationProcessor(replicationReadDecider(targetLogId, targetVersionVector, num) andThen ReplicationDecider(BlockAfter(num))) - private def replicationRead(fromSequenceNr: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime): ReplicationProcessResult = + private def replicationRead(fromSequenceNo: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime): ReplicationProcessResult = replicationReadProcessor(targetLogId, targetVersionVector, num) - .apply(read(fromSequenceNr), fromSequenceNr) + .apply(read(fromSequenceNo), fromSequenceNo) private def causalityFilter(versionVector: VectorTime): ReplicationFilter = new ReplicationFilter { override def apply(event: EncodedEvent): Boolean = !event.before(versionVector) @@ -132,7 +132,7 @@ trait EventLogWithReplication extends EventLogOps { this: Actor with EventSubscr private def replicationWrite(events: Seq[EncodedEvent], progress: Long, sourceLogId: String): ReplicationProcessResult = { replicationWriteProcessor(sourceLogId) .apply(events, progress).right.map { - case (filtered, updatedProgress) => (write(filtered, (evt, snr) => evt.replicated(id, snr)), updatedProgress) + case (filtered, updatedProgress) => (write(filtered, (evt, sno) => evt.replicated(id, sno)), updatedProgress) } } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventsourcingProtocol.scala b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventsourcingProtocol.scala index ff70777..6647c2f 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventsourcingProtocol.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventsourcingProtocol.scala @@ -7,7 +7,7 @@ import scala.collection.immutable.Seq object EventsourcingProtocol { case class Subscribe(subscriber: ActorRef) - case class Read(fromSequenceNr: Long) + case class Read(fromSequenceNo: Long) case class ReadSuccess(events: Seq[DecodedEvent]) case class Write(events: Seq[DecodedEvent]) diff --git a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationEndpoint.scala b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationEndpoint.scala index 06b6136..7619595 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationEndpoint.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationEndpoint.scala @@ -171,8 +171,8 @@ private class Replicator(sourceLogId: String, sourceLog: ActorRef, targetLogId: private def fetch(): Unit = targetLog.ask(GetReplicationProgressAndVersionVector(sourceLogId))(settings.askTimeout).pipeTo(self) - private def read(fromSequenceNr: Long, targetVersionVector: VectorTime): Unit = - sourceLog.ask(ReplicationRead(fromSequenceNr, settings.batchSize, targetLogId, targetVersionVector))(settings.askTimeout).pipeTo(self) + private def read(fromSequenceNo: Long, targetVersionVector: VectorTime): Unit = + sourceLog.ask(ReplicationRead(fromSequenceNo, settings.batchSize, targetLogId, targetVersionVector))(settings.askTimeout).pipeTo(self) private def write(events: Seq[EncodedEvent], progress: Long): Unit = targetLog.ask(ReplicationWrite(events, sourceLogId, progress))(settings.askTimeout).pipeTo(self) diff --git a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationProcessor.scala b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationProcessor.scala index 623518a..92fd667 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationProcessor.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationProcessor.scala @@ -25,7 +25,7 @@ case class ReplicationProcessor(replicationDecider: ReplicationDecider) { case Block(reason) => Either.cond(lastProgress > 0, (out, lastProgress), reason) case decision => - lastProgress = seq.head.metadata.localSequenceNr + lastProgress = seq.head.metadata.localSequenceNo go(seq.tail, if(decision == Filter) out else out :+ seq.head) } } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationProtocol.scala b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationProtocol.scala index 7eeeb20..c883dcc 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationProtocol.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationProtocol.scala @@ -18,7 +18,7 @@ object ReplicationProtocol { case class GetReplicationProgressAndVersionVector(sourceLogId: String) case class GetReplicationProgressAndVersionVectorSuccess(progress: Long, targetVersionVector: VectorTime) - case class ReplicationRead(fromSequenceNr: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime) + case class ReplicationRead(fromSequenceNo: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime) case class ReplicationReadSuccess(events: Seq[EncodedEvent], progress: Long) case class ReplicationReadFailure(cause: Throwable) From d48d81f991012d3c198bb48bf98dc8df442727df Mon Sep 17 00:00:00 2001 From: Volker Stampa Date: Mon, 19 Dec 2016 15:43:41 +0100 Subject: [PATCH 3/3] Support deletion of event up to sequence no - add delete method to ReplicationEndpoint - implement immediate deletion (for unreplicated logs) - implement deferred deletion (for replicated logs): Only events that have been replicated to all connected targets are deleted - Move Location test utility to separate file --- src/main/resources/reference.conf | 4 + .../eventuate/sandbox/EventLog.scala | 110 +++++++++++++-- .../sandbox/EventsourcingProtocol.scala | 2 + .../sandbox/ReplicationEndpoint.scala | 6 +- .../eventuate/sandbox/EventDeletionSpec.scala | 55 ++++++++ .../eventuate/sandbox/EventLogSpec.scala | 17 ++- .../eventuate/sandbox/Location.scala | 133 ++++++++++++++++++ .../RedundantFilteredConnectionsSpec.scala | 127 +---------------- 8 files changed, 317 insertions(+), 137 deletions(-) create mode 100644 src/test/scala/com/rbmhtechnology/eventuate/sandbox/EventDeletionSpec.scala create mode 100644 src/test/scala/com/rbmhtechnology/eventuate/sandbox/Location.scala diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index dfd009e..8eb9cf1 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -2,4 +2,8 @@ sandbox.replication { ask-timeout = 10s retry-delay = 200ms batch-size = 10 +} + +sandbox.log { + delete.retry-delay = 200ms } \ No newline at end of file diff --git a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala index 51be194..20c7ba0 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala @@ -1,17 +1,23 @@ package com.rbmhtechnology.eventuate.sandbox +import java.util.concurrent.TimeUnit + import akka.actor.Actor.Receive import akka.actor._ import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol._ import com.rbmhtechnology.eventuate.sandbox.EventCompatibility.stopOnUnserializableKeepOthers +import com.rbmhtechnology.eventuate.sandbox.EventLogWithDeferredDeletion.DeferredDeletionSettings import com.rbmhtechnology.eventuate.sandbox.ReplicationFilter.NoFilter import com.rbmhtechnology.eventuate.sandbox.ReplicationProcessor.ReplicationProcessResult import com.rbmhtechnology.eventuate.sandbox.ReplicationProtocol._ import com.rbmhtechnology.eventuate.sandbox.ReplicationBlocker.BlockAfter import com.rbmhtechnology.eventuate.sandbox.ReplicationBlocker.NoBlocker import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer +import com.typesafe.config.Config import scala.collection.immutable.Seq +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.DurationLong trait EventLogOps { protected def id: String @@ -21,12 +27,28 @@ trait EventLogOps { protected def read(fromSequenceNo: Long): Seq[EncodedEvent] protected def write(events: Seq[EncodedEvent], prepare: (EncodedEvent, Long) => EncodedEvent): Seq[EncodedEvent] + + protected def deleteWhile(cond: EncodedEvent => Boolean): Long +} + +trait ProgressStoreOps { + + protected def progressWrite(progresses: Map[String, Long]): Unit + + protected def progressRead(logId: String): Long + + protected def versionVectorWrite(logId: String, versionVector: VectorTime): Unit + + protected def versionVectorRead(logId: String): Option[VectorTime] + + protected def versionVectorReadAll: Map[String, VectorTime] } trait InMemoryEventLog extends EventLogOps { // Implementation private var _sequenceNo: Long = 0L private var _versionVector: VectorTime = VectorTime.Zero + private var _deletionVector: VectorTime = VectorTime.Zero private var eventStore: Vector[EncodedEvent] = Vector.empty @@ -34,8 +56,11 @@ trait InMemoryEventLog extends EventLogOps { protected def versionVector: VectorTime = _versionVector + protected def deletionVector: VectorTime = + _deletionVector + protected def read(fromSequenceNo: Long): Seq[EncodedEvent] = - eventStore.drop(fromSequenceNo.toInt - 1) + eventStore.dropWhile(_.metadata.localSequenceNo < fromSequenceNo) protected def write(events: Seq[EncodedEvent], prepare: (EncodedEvent, Long) => EncodedEvent): Seq[EncodedEvent] = { var sno = _sequenceNo @@ -59,6 +84,44 @@ trait InMemoryEventLog extends EventLogOps { written } + + protected def deleteWhile(cond: EncodedEvent => Boolean): Long = { + val (deleted, updatedEventStore) = eventStore.span(cond) + eventStore = updatedEventStore + _deletionVector = deleted.foldLeft(_deletionVector)((dvv, ev) => dvv.merge(ev.metadata.vectorTimestamp)) + deleted.lastOption.map(_.metadata.localSequenceNo).getOrElse(0) + } +} + +trait InMemoryProgressStore extends ProgressStoreOps { + // Implementation + private var progressStore: Map[String, Long] = Map.empty + + /** Maps target log ids to respective version vector */ + private var versionVectorStore: Map[String, VectorTime] = Map.empty + + // API + protected def progressWrite(progresses: Map[String, Long]): Unit = + progressStore = progressStore ++ progresses + + protected def progressRead(logId: String): Long = + progressStore.getOrElse(logId, 0L) + + protected def versionVectorWrite(logId: String, versionVector: VectorTime): Unit = + versionVectorStore += logId -> versionVector + + protected def versionVectorRead(logId: String): Option[VectorTime] = + versionVectorStore.get(logId) + + override protected def versionVectorReadAll: Map[String, VectorTime] = + versionVectorStore +} + +trait EventLogWithImmediateDeletion extends EventLogOps { + protected def deleteReceive(): Receive = { + case Delete(toSequenceNo) => + deleteWhile(_.metadata.localSequenceNo <= toSequenceNo) + } } trait EventSubscribers { @@ -102,7 +165,7 @@ trait EventLogWithEventsourcing extends EventLogOps { this: Actor with EventSubs } -trait EventLogWithReplication extends EventLogOps { this: Actor with EventSubscribers => +trait EventLogWithReplication extends EventLogOps with ProgressStoreOps { this: Actor with EventSubscribers => import EventLogWithReplication._ import EventLog._ @@ -114,8 +177,6 @@ trait EventLogWithReplication extends EventLogOps { this: Actor with EventSubscr // Implementation private implicit val system = context.system - private var progressStore: Map[String, Long] = Map.empty - private def replicationWriteProcessor(sourceLogId: String): ReplicationProcessor = ReplicationProcessor(replicationWriteDecider(sourceLogId)) private def replicationReadProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int): ReplicationProcessor = @@ -136,15 +197,10 @@ trait EventLogWithReplication extends EventLogOps { this: Actor with EventSubscr } } - private def progressWrite(progresses: Map[String, Long]): Unit = - progressStore = progressStore ++ progresses - - private def progressRead(logId: String): Long = - progressStore.getOrElse(logId, 0L) - // API protected def replicationReceive: Receive = { case ReplicationRead(from, num, tlid, tvv) => + versionVectorWrite(tlid, tvv) replicationRead(from, num, tlid, tvv) match { case Right((processedEvents, progress)) => sender() ! ReplicationReadSuccess(processedEvents, progress) @@ -177,6 +233,36 @@ object EventLogWithReplication { extends IllegalStateException(s"Replication stopped: $reason") } +trait EventLogWithDeferredDeletion { this: Actor with EventLogWithReplication => + + private val settings: DeferredDeletionSettings = + new DeferredDeletionSettings(context.system.settings.config) + + private val scheduler = context.system.scheduler + + import context.dispatcher + + private var deletedToSequenceNo: Long = 0L + + protected def deleteReceive(): Receive = { + case Delete(toSequenceNo) => + deletedToSequenceNo = deleteWhile { event => + event.metadata.localSequenceNo <= toSequenceNo && + versionVectorReadAll.values.forall(event.metadata.vectorTimestamp <= _) + } + if(deletedToSequenceNo < toSequenceNo) + scheduler.scheduleOnce(settings.retryDelay, self, Delete(toSequenceNo)) + } +} + +object EventLogWithDeferredDeletion { + class DeferredDeletionSettings(config: Config) { + val retryDelay: FiniteDuration = + config.getDuration("sandbox.log.delete.retry-delay", TimeUnit.MILLISECONDS).millis + } + +} + trait EventLogWithReplicationFilters { this: EventLogWithReplication => // required @@ -239,13 +325,15 @@ trait EventLogWithSchemaEvolution { this: Actor with EventLogWithReplication => class EventLog(val id: String, val sourceFilter: ReplicationFilter) extends Actor with EventLogWithEventsourcing with EventSubscribers + with EventLogWithDeferredDeletion with EventLogWithReplication with EventLogWithReplicationFilters with RedundantFilteredConnections with EventLogWithSchemaEvolution - with InMemoryEventLog { + with InMemoryEventLog with InMemoryProgressStore { override def receive: Receive = eventsourcingReceive orElse + deleteReceive orElse replicationReceive orElse replicationFilterReceive orElse rfcReceive orElse diff --git a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventsourcingProtocol.scala b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventsourcingProtocol.scala index 6647c2f..2969494 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventsourcingProtocol.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventsourcingProtocol.scala @@ -12,4 +12,6 @@ object EventsourcingProtocol { case class Write(events: Seq[DecodedEvent]) case class WriteSuccess(events: Seq[DecodedEvent]) + + case class Delete(toSequenceNo: Long) } diff --git a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationEndpoint.scala b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationEndpoint.scala index 7619595..bc73f52 100644 --- a/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationEndpoint.scala +++ b/src/main/scala/com/rbmhtechnology/eventuate/sandbox/ReplicationEndpoint.scala @@ -5,6 +5,7 @@ import java.util.function.UnaryOperator import akka.actor._ import akka.pattern.{ask, pipe} +import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol.Delete import com.rbmhtechnology.eventuate.sandbox.ReplicationFilter.NoFilter import com.rbmhtechnology.eventuate.sandbox.ReplicationProtocol._ import com.typesafe.config._ @@ -64,7 +65,7 @@ class ReplicationEndpoint( eventCompatibilityDeciders.foreach { case (logName, processor) => eventLogs.get(logName).foreach(_ ! AddEventCompatibilityDecider(logId(reply.endpointId, logName), processor)) } - //TODO make sure processors are added before replicators are started + //TODO make sure deciders are added before replicators are started val replicators = reply.sourceLogs.map { case (logName, sourceLog) => val sourceLogId = logId(reply.endpointId, logName) @@ -82,6 +83,9 @@ class ReplicationEndpoint( } } + def delete(logName: String, toSequenceNo: Long): Unit = + eventLogs.get(logName).foreach(_ ! Delete(toSequenceNo)) + def terminate(): Future[Terminated] = system.terminate() diff --git a/src/test/scala/com/rbmhtechnology/eventuate/sandbox/EventDeletionSpec.scala b/src/test/scala/com/rbmhtechnology/eventuate/sandbox/EventDeletionSpec.scala new file mode 100644 index 0000000..e826bbd --- /dev/null +++ b/src/test/scala/com/rbmhtechnology/eventuate/sandbox/EventDeletionSpec.scala @@ -0,0 +1,55 @@ +package com.rbmhtechnology.eventuate.sandbox + +import akka.util.Timeout +import com.rbmhtechnology.eventuate.sandbox.Location.ExternalEvent +import com.rbmhtechnology.eventuate.sandbox.Location.InternalEvent +import com.rbmhtechnology.eventuate.sandbox.Location.LogName +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterEach +import org.scalatest.Matchers +import org.scalatest.WordSpec +import org.scalatest.concurrent.Eventually +import org.scalatest.time.Millis +import org.scalatest.time.Span + +class EventDeletionSpec extends WordSpec with Matchers with BeforeAndAfterEach with Eventually { + + private val settings = + new ReplicationSettings(ConfigFactory.load()) + + implicit val timeout = + Timeout(settings.askTimeout) + + implicit override val patienceConfig = + PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis), interval = Span(100, Millis)) + + "ReplicationEndpoints" must { + "delete events when replicated" in { + val Seq(a1, a2) = Location.locationMatrix(List("A"), 2).flatten + + val initialEmission = a1.emit(ExternalEvent) + a2.expectPayloads(initialEmission) + + val emitted = a1.emitN(ExternalEvent, 2) + a1.endpoint.delete(LogName, 2) + a2.expectPayloads(emitted) + eventually(a1.storedPayloads shouldBe emitted.tail) + } + "delete events when replication is filtered" ignore { + // fails as the internal event currently cannot be deleted + // as following events are not causally dependent + // and the internal event is not replicated to b2 + // thus version vector of b2 is never >= timestamp(internal event) + val Seq(a1, a2, b1, b2) = Location.locationMatrix(List("A", "B"), 2).flatten + + val internal = a1.emit(InternalEvent) + a2.expectPayloads(internal) + + val external = a2.emit(ExternalEvent) + b2.expectPayloads(external) + + a2.endpoint.delete(LogName, 1) + eventually(a2.storedPayloads shouldBe external) + } + } +} diff --git a/src/test/scala/com/rbmhtechnology/eventuate/sandbox/EventLogSpec.scala b/src/test/scala/com/rbmhtechnology/eventuate/sandbox/EventLogSpec.scala index 1b828b8..8abf9f0 100644 --- a/src/test/scala/com/rbmhtechnology/eventuate/sandbox/EventLogSpec.scala +++ b/src/test/scala/com/rbmhtechnology/eventuate/sandbox/EventLogSpec.scala @@ -8,6 +8,7 @@ import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol._ import com.rbmhtechnology.eventuate.sandbox.ReplicationProtocol._ import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer import org.scalatest._ +import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.Millis import org.scalatest.time.Span @@ -31,7 +32,7 @@ object EventLogSpec { new ExcludePayload(payload) } -class EventLogSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach with BeforeAndAfterAll with ScalaFutures { +class EventLogSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach with BeforeAndAfterAll with ScalaFutures with Eventually { import EventLogSpec._ import EventLog._ @@ -152,5 +153,19 @@ class EventLogSpec extends TestKit(ActorSystem("test")) with WordSpecLike with M probe.expectMsg(DecodedEvent(EventMetadata(EmitterId2, LogId2, LogId1, 1L, VectorTime(LogId2 -> 1L)), "a")) probe.expectMsg(DecodedEvent(EventMetadata(EmitterId2, LogId2, LogId1, 2L, VectorTime(LogId2 -> 2L)), "b")) } + "process Delete" in { + val emitted = Seq( + DecodedEvent(EmitterId1, "a"), + DecodedEvent(EmitterId1, "b")) + + log ! Write(emitted) + log ! Delete(1) + + eventually { + whenReady(log.ask(Read(1L))) { + case ReadSuccess(events) => events.map(_.payload) should be(emitted.tail.map(_.payload)) + } + } + } } } diff --git a/src/test/scala/com/rbmhtechnology/eventuate/sandbox/Location.scala b/src/test/scala/com/rbmhtechnology/eventuate/sandbox/Location.scala new file mode 100644 index 0000000..9e2b416 --- /dev/null +++ b/src/test/scala/com/rbmhtechnology/eventuate/sandbox/Location.scala @@ -0,0 +1,133 @@ +package com.rbmhtechnology.eventuate.sandbox + +import akka.actor.ActorSystem +import akka.pattern.ask +import akka.testkit.TestProbe +import akka.util.Timeout +import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol.Read +import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol.ReadSuccess +import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol.Subscribe +import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol.Write +import com.rbmhtechnology.eventuate.sandbox.ReplicationFilter.NoFilter +import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer +import com.typesafe.config.ConfigFactory + +import scala.collection.immutable.Seq +import scala.concurrent.Await +import scala.concurrent.duration.DurationInt + +object Location { + private val settings = + new ReplicationSettings(ConfigFactory.load()) + + implicit val timeout = + Timeout(settings.askTimeout) + + val LogName = "L" + + case class InternalEvent(s: String) + case class ExternalEvent(s: String) + + def replicationFilter(implicit system: ActorSystem): ReplicationFilter = new ReplicationFilter { + override def apply(event: EncodedEvent) = + EventPayloadSerializer.decode(event).get.payload.isInstanceOf[ExternalEvent] + } + + def payloadEquals(payload: AnyRef): PartialFunction[Any, Any] = { + case DecodedEvent(_, actual) if actual == payload => actual + } + + def bidiConnect(location1: Location, location2: Location): Unit = { + location1.endpoint.connect(location2.endpoint) + location2.endpoint.connect(location1.endpoint) + } + + def disconnect(location1: Location, location2: Location): Unit = { + location1.endpoint.disconnect(location2.endpoint.id) + location2.endpoint.disconnect(location1.endpoint.id) + } + + def bidiConnect( + location1: Location, location2: Location, + outboundFilter1: ReplicationFilter = NoFilter, outboundFilter2: ReplicationFilter = NoFilter, + rfcLocations1: Set[Location]= Set.empty, negate1: Boolean = false, + rfcLocations2: Set[Location]= Set.empty, negate2: Boolean = false + ): Unit = { + if(rfcLocations1.nonEmpty) location1.endpoint.addRedundantFilterConfig(location2.endpoint.id, RedundantFilterConfig(LogName, rfcLocations1.map(_.endpoint.id), !negate1)) + if(rfcLocations2.nonEmpty) location2.endpoint.addRedundantFilterConfig(location1.endpoint.id, RedundantFilterConfig(LogName, rfcLocations2.map(_.endpoint.id), !negate2)) + if(outboundFilter1 ne NoFilter) location1.endpoint.addTargetFilter(location2.endpoint.id, LogName, outboundFilter1) + if(outboundFilter2 ne NoFilter) location2.endpoint.addTargetFilter(location1.endpoint.id, LogName, outboundFilter2) + bidiConnect(location1, location2) + } + + def expectPayloads(payloads: Seq[AnyRef], locs: Location*) = + locs.foreach(_.expectPayloads(payloads)) + + def locationMatrix(applicationNames: Seq[String], nReplicas: Int): Seq[Seq[Location]] = { + val applications = applicationNames.map { applicationName => + (1 to nReplicas).map(replica => new Location(applicationName + replica)) + } + // unfiltered connections between replicas of an application + applications.foreach { application => + application.sliding(2).foreach(connected => bidiConnect(connected.head, connected.last)) + } + // filtered connections between applications + var rfcLocations = Set.empty[Location] + for { + Seq(app1, app2) <- applications.sliding(2) + (location1, location2) <- app1 zip app2 + } { + rfcLocations ++= app1 + bidiConnect( + location1 = location1, outboundFilter1 = location1.filter, rfcLocations1 = rfcLocations, negate1 = true, + location2 = location2, outboundFilter2 = location2.filter, rfcLocations2 = rfcLocations) + } + applications + } +} + +class Location(id: String) { + + import Location._ + + var eventCnt = 0 + var emitted: List[AnyRef] = Nil + val endpoint = new ReplicationEndpoint(s"EP-$id", Set(LogName)) + val probe = TestProbe(s"P-$id")(endpoint.system) + val log = endpoint.eventLogs(LogName) + log ! Subscribe(probe.ref) + + def emit(makePayloads: Function1[String, AnyRef]*): Seq[AnyRef] = { + val payloads = makePayloads.toList.map { makePayload => + eventCnt += 1 + makePayload(s"$id.$eventCnt") + } + log ! Write(payloads.map(DecodedEvent(s"EM-$id", _))) + emitted = payloads.reverse ::: emitted + payloads + } + + def emitN(makePayload: String => AnyRef, n: Int = 1): Seq[AnyRef] = + emit(List.fill(n)(makePayload): _*) + + def emittedInternal = emitted.filter(_.isInstanceOf[InternalEvent]) + def emittedExternal = emitted.filter(_.isInstanceOf[ExternalEvent]) + + def expectPayloads(payloads: Seq[AnyRef]): Unit = + payloads.foreach { payload => + probe.expectMsgPF(hint = s"log ${ReplicationEndpoint.logId(endpoint.id, LogName)} expects $payload")(payloadEquals(payload)) + } + + def expectNoMsg(): Unit = + probe.expectNoMsg(200.millis) + + def storedPayloads: Seq[AnyRef] = { + import scala.concurrent.ExecutionContext.Implicits.global + Await.result(log.ask(Read(0)).mapTo[ReadSuccess].map(_.events.map(_.payload)), timeout.duration) + } + + val filter = replicationFilter(endpoint.system) + + override def toString = s"Loc:$id" +} + diff --git a/src/test/scala/com/rbmhtechnology/eventuate/sandbox/RedundantFilteredConnectionsSpec.scala b/src/test/scala/com/rbmhtechnology/eventuate/sandbox/RedundantFilteredConnectionsSpec.scala index 696c3e6..16025a7 100644 --- a/src/test/scala/com/rbmhtechnology/eventuate/sandbox/RedundantFilteredConnectionsSpec.scala +++ b/src/test/scala/com/rbmhtechnology/eventuate/sandbox/RedundantFilteredConnectionsSpec.scala @@ -1,16 +1,7 @@ package com.rbmhtechnology.eventuate.sandbox import akka.actor.ActorSystem -import akka.pattern.ask -import akka.testkit.TestProbe -import akka.util.Timeout -import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol.Read -import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol.ReadSuccess -import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol.Subscribe -import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol.Write -import com.rbmhtechnology.eventuate.sandbox.ReplicationFilter.NoFilter -import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer -import com.typesafe.config.ConfigFactory +import com.rbmhtechnology.eventuate.sandbox.Location._ import org.scalatest.BeforeAndAfterEach import org.scalatest.Matchers import org.scalatest.WordSpec @@ -18,122 +9,10 @@ import org.scalatest.concurrent.Eventually import org.scalatest.time.Millis import org.scalatest.time.Span -import scala.concurrent.duration.DurationInt -import scala.concurrent.ExecutionContext.Implicits.global import scala.collection.immutable.Seq -import scala.concurrent.Await import scala.util.Random object RedundantFilteredConnectionsSpec { - private val settings = - new ReplicationSettings(ConfigFactory.load()) - - implicit val timeout = - Timeout(settings.askTimeout) - - val LogName = "L" - - case class InternalEvent(s: String) - case class ExternalEvent(s: String) - - def replicationFilter(implicit system: ActorSystem): ReplicationFilter = new ReplicationFilter { - override def apply(event: EncodedEvent) = - EventPayloadSerializer.decode(event).get.payload.isInstanceOf[ExternalEvent] - } - - def payloadEquals(payload: AnyRef): PartialFunction[Any, Any] = { - case DecodedEvent(_, actual) if actual == payload => actual - } - - def bidiConnect(location1: Location, location2: Location): Unit = { - location1.endpoint.connect(location2.endpoint) - location2.endpoint.connect(location1.endpoint) - } - - def disconnect(location1: Location, location2: Location): Unit = { - location1.endpoint.disconnect(location2.endpoint.id) - location2.endpoint.disconnect(location1.endpoint.id) - } - - def bidiConnect( - location1: Location, location2: Location, - outboundFilter1: ReplicationFilter = NoFilter, outboundFilter2: ReplicationFilter = NoFilter, - rfcLocations1: Set[Location]= Set.empty, negate1: Boolean = false, - rfcLocations2: Set[Location]= Set.empty, negate2: Boolean = false - ): Unit = { - if(rfcLocations1.nonEmpty) location1.endpoint.addRedundantFilterConfig(location2.endpoint.id, RedundantFilterConfig(LogName, rfcLocations1.map(_.endpoint.id), !negate1)) - if(rfcLocations2.nonEmpty) location2.endpoint.addRedundantFilterConfig(location1.endpoint.id, RedundantFilterConfig(LogName, rfcLocations2.map(_.endpoint.id), !negate2)) - if(outboundFilter1 ne NoFilter) location1.endpoint.addTargetFilter(location2.endpoint.id, LogName, outboundFilter1) - if(outboundFilter2 ne NoFilter) location2.endpoint.addTargetFilter(location1.endpoint.id, LogName, outboundFilter2) - bidiConnect(location1, location2) - } - - def expectPayloads(payloads: Seq[AnyRef], locs: Location*) = - locs.foreach(_.expectPayloads(payloads)) - - def event(payload: AnyRef): DecodedEvent = DecodedEvent("emitter-id", payload) - - class Location(id: String) { - var eventCnt = 0 - var emitted: List[AnyRef] = Nil - val endpoint = new ReplicationEndpoint(s"EP-$id", Set(LogName)) - val probe = TestProbe(s"P-$id")(endpoint.system) - val log = endpoint.eventLogs(LogName) - log ! Subscribe(probe.ref) - - def emit(makePayloads: Function1[String, AnyRef]*): Seq[AnyRef] = { - val payloads = makePayloads.toList.map { makePayload => - eventCnt += 1 - makePayload(s"$id.$eventCnt") - } - log ! Write(payloads.map(DecodedEvent(s"EM-$id", _))) - emitted = payloads.reverse ::: emitted - payloads - } - - def emitN(makePayload: String => AnyRef, n: Int = 1): Seq[AnyRef] = - emit(List.fill(n)(makePayload): _*) - - def emittedInternal = emitted.filter(_.isInstanceOf[InternalEvent]) - def emittedExternal = emitted.filter(_.isInstanceOf[ExternalEvent]) - - def expectPayloads(payloads: Seq[AnyRef]): Unit = - payloads.foreach { payload => - probe.expectMsgPF(hint = s"${probe.ref} expects $payload")(payloadEquals(payload)) - } - - def expectNoMsg(): Unit = - probe.expectNoMsg(200.millis) - - def storedPayloads: Seq[AnyRef] = - Await.result(log.ask(Read(0)).mapTo[ReadSuccess].map(_.events.map(_.payload)), timeout.duration) - - val filter = replicationFilter(endpoint.system) - - override def toString = s"Loc:$id" - } - - def locationMatrix(applicationNames: Seq[String], nReplicas: Int): Seq[Seq[Location]] = { - val applications = applicationNames.map { applicationName => - (1 to nReplicas).map(replica => new Location(applicationName + replica)) - } - // unfiltered connections between replicas of an application - applications.foreach { application => - application.sliding(2).foreach(connected => bidiConnect(connected.head, connected.last)) - } - // filtered connections between applications - var rfcLocations = Set.empty[Location] - for { - Seq(app1, app2) <- applications.sliding(2) - (location1, location2) <- app1 zip app2 - } { - rfcLocations ++= app1 - bidiConnect( - location1 = location1, outboundFilter1 = location1.filter, rfcLocations1 = rfcLocations, negate1 = true, - location2 = location2, outboundFilter2 = location2.filter, rfcLocations2 = rfcLocations) - } - applications - } def randomDisconnects(disconnected: Vector[(Location, Location)], applications: Seq[Seq[Location]]): Vector[(Location, Location)] = { val Seq(loc1, loc2) = Random.shuffle(Random.shuffle(applications).head.sliding(2).toList).head @@ -152,7 +31,7 @@ class RedundantFilteredConnectionsSpec extends WordSpec with Matchers with Befor import RedundantFilteredConnectionsSpec._ implicit override val patienceConfig = - PatienceConfig(timeout = Span(RedundantFilteredConnectionsSpec.timeout.duration.toMillis, Millis), interval = Span(100, Millis)) + PatienceConfig(timeout = Span(Location.timeout.duration.toMillis, Millis), interval = Span(100, Millis)) private var systems: Seq[ActorSystem] = Nil @@ -248,7 +127,7 @@ class RedundantFilteredConnectionsSpec extends WordSpec with Matchers with Befor val lastEmitted = locations.last.emittedExternal.head var disconnected = Vector.empty[(Location, Location)] var i = 0 - locations.head.probe.fishForMessage(hint = s"${locations.head.endpoint.id} fish $lastEmitted", max = RedundantFilteredConnectionsSpec.timeout.duration) { + locations.head.probe.fishForMessage(hint = s"${locations.head.endpoint.id} fish $lastEmitted", max = Location.timeout.duration) { case ev: DecodedEvent if ev.payload == lastEmitted => true case _ => i += 1