From 1644e718a1e242290b4c26ae6b08dd5878657913 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Wed, 2 Mar 2016 12:15:02 +0100 Subject: [PATCH] Dynamic and custom ports for integration tests - including a major refactoring of all log-based integration tests - closes #226 --- .../eventuate/DeleteEventsSpec.scala | 107 +++---- .../EventsourcedActorCausalitySpec.scala | 103 +++---- .../EventsourcedActorIntegrationSpec.scala | 12 +- .../EventsourcedActorThroughputSpec.scala | 10 +- ...EventsourcedProcessorIntegrationSpec.scala | 11 +- ...ationConfig.scala => LocationConfig.scala} | 6 +- .../eventuate/LocationSpec.scala | 195 +++++++++++++ .../eventuate/LocationSpecCassandra.scala | 151 ++++++++++ .../eventuate/LocationSpecLeveldb.scala | 71 +++++ .../PersistOnEventIntegrationSpec.scala | 11 +- .../eventuate/RecoverySpec.scala | 274 +++++++++-------- .../eventuate/ReplicationCycleSpec.scala | 89 ++---- .../ReplicationIntegrationSpec.scala | 205 ++++++------- .../eventuate/ReplicationNode.scala | 89 ------ .../eventuate/crdt/CRDTChaosSpec.scala | 64 ++-- .../eventuate/crdt/CRDTRecoverySpec.scala | 14 +- .../eventuate/crdt/CRDTServiceSpec.scala | 4 +- .../log/CircuitBreakerIntregrationSpec.scala | 2 +- .../eventuate/log/EventLogLifecycle.scala | 276 ------------------ .../log/EventLogPartitioningSpec.scala | 3 +- .../eventuate/log/EventLogSpec.scala | 21 +- .../serializer/CRDTSerializerSpec.scala | 12 +- .../DurableEventSerializerSpec.scala | 21 +- .../ReplicationFilterSerializerSpec.scala | 32 +- .../ReplicationProtocolSerializerSpec.scala | 16 +- ...pport.scala => SerializationContext.scala} | 21 +- .../serializer/SnapshotSerializerSpec.scala | 12 +- 27 files changed, 892 insertions(+), 940 deletions(-) rename src/it/scala/com/rbmhtechnology/eventuate/{ReplicationConfig.scala => LocationConfig.scala} (91%) create mode 100644 src/it/scala/com/rbmhtechnology/eventuate/LocationSpec.scala create mode 100644 src/it/scala/com/rbmhtechnology/eventuate/LocationSpecCassandra.scala create mode 100644 src/it/scala/com/rbmhtechnology/eventuate/LocationSpecLeveldb.scala delete mode 100644 src/it/scala/com/rbmhtechnology/eventuate/ReplicationNode.scala delete mode 100644 src/it/scala/com/rbmhtechnology/eventuate/log/EventLogLifecycle.scala rename src/it/scala/com/rbmhtechnology/eventuate/serializer/{SerializerSpecSupport.scala => SerializationContext.scala} (70%) diff --git a/src/it/scala/com/rbmhtechnology/eventuate/DeleteEventsSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/DeleteEventsSpec.scala index 638bd7c0..fffcdf1c 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/DeleteEventsSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/DeleteEventsSpec.scala @@ -18,10 +18,8 @@ package com.rbmhtechnology.eventuate import akka.actor._ -import com.rbmhtechnology.eventuate.ReplicationIntegrationSpec.replicationConnection -import com.rbmhtechnology.eventuate.log.EventLogCleanupLeveldb -import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb.TestEventLog import com.rbmhtechnology.eventuate.utilities.AwaitHelper +import com.typesafe.config.ConfigFactory import org.scalatest.Matchers import org.scalatest.WordSpec @@ -30,19 +28,7 @@ import scala.util.Failure import scala.util.Success object DeleteEventsSpecLeveldb { - val L1 = "L1" - - val portA = 2552 - val connectedToA = replicationConnection(portA) - - val portB = 2553 - val connectedToB = replicationConnection(portB) - - val portC = 2554 - val connectedToC = replicationConnection(portC) - class Emitter(locationId: String, val eventLog: ActorRef) extends EventsourcedActor with ActorLogging { - override def id = s"${locationId}_Emitter" override def onEvent = Actor.emptyBehavior @@ -55,77 +41,76 @@ object DeleteEventsSpecLeveldb { } } - def emitter(node: ReplicationNode, logName: String) = - node.system.actorOf(Props(new Emitter(node.id, node.logs(logName)))) -} - -class DeleteEventsSpecLeveldb extends WordSpec with Matchers with ReplicationNodeRegistry with EventLogCleanupLeveldb { - import DeleteEventsSpecLeveldb._ - - val logFactory: String => Props = id => TestEventLog.props(id, batching = true) - - var ctr: Int = 0 - - override def beforeEach(): Unit = - ctr += 1 + def emitter(endpoint: ReplicationEndpoint, logName: String) = + endpoint.system.actorOf(Props(new Emitter(endpoint.id, endpoint.logs(logName)))) - def config = - ReplicationConfig.create() + val config = ConfigFactory.parseString( + """ + |eventuate.log.replication.retry-delay = 1s + |eventuate.log.replication.remote-read-timeout = 2s + |eventuate.disaster-recovery.remote-operation-retry-max = 10 + |eventuate.disaster-recovery.remote-operation-retry-delay = 1s + |eventuate.disaster-recovery.remote-operation-timeout = 1s + """.stripMargin) - def nodeId(node: String): String = - s"${node}_${ctr}" + val L1 = "L1" +} - def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection], customConfig: String = "", activate: Boolean = false): ReplicationNode = - register(new ReplicationNode(nodeId(nodeName), logNames, logFactory, connections, port = port, customConfig = RecoverySpecLeveldb.config + customConfig, activate = activate)) +class DeleteEventsSpecLeveldb extends WordSpec with Matchers with MultiLocationSpecLeveldb { + import ReplicationIntegrationSpec.replicationConnection + import DeleteEventsSpecLeveldb._ "Deleting events" must { "not replay deleted events on restart" in { - val nodeA = newNodeA(Set(L1)) - val emitterA = emitter(nodeA, L1) - val listenerA = nodeA.eventListener(L1) + def newLocationA = location("A", customConfig = DeleteEventsSpecLeveldb.config) + def newEndpointA(l: Location) = l.endpoint(Set(L1), Set(), activate = false) + + val locationA1 = newLocationA + val endpointA1 = newEndpointA(locationA1) + + val listenerA = locationA1.listener(endpointA1.logs(L1)) + val emitterA = emitter(endpointA1, L1) (0 to 5).foreach(emitterA ! _) listenerA.waitForMessage(5) - nodeA.endpoint.delete(L1, 3, Set.empty).await shouldBe 3 + endpointA1.delete(L1, 3, Set.empty).await shouldBe 3 + locationA1.terminate().await - nodeA.terminate().await + val locationA2 = newLocationA + def endpointA2 = newEndpointA(locationA2) - val restartedA = newNodeA(Set(L1)) - restartedA.eventListener(L1).expectMsgAllOf(3 to 5: _*) + locationA2.listener(endpointA2.logs(L1)).expectMsgAllOf(3 to 5: _*) } } "Conditionally deleting events" must { "keep event available for corresponding remote log" in { - val nodeA = newNodeA(Set(L1), Set(connectedToB, connectedToC)) - val nodeB = newNodeB(Set(L1), Set(connectedToA)) - val nodeC = newNodeC(Set(L1), Set(connectedToA)) - val emitterA = emitter(nodeA, L1) - val listenerA = nodeA.eventListener(L1) - val listenerB = nodeB.eventListener(L1) - val listenerC = nodeC.eventListener(L1) + val locationA = location("A", customConfig = DeleteEventsSpecLeveldb.config) + val locationB = location("B", customConfig = DeleteEventsSpecLeveldb.config) + val locationC = location("C", customConfig = DeleteEventsSpecLeveldb.config) + + val endpointA = locationA.endpoint(Set(L1), Set(replicationConnection(locationB.port), replicationConnection(locationC.port)), activate = false) + val endpointB = locationB.endpoint(Set(L1), Set(replicationConnection(locationA.port)), activate = false) + val endpointC = locationC.endpoint(Set(L1), Set(replicationConnection(locationA.port)), activate = false) + + val emitterA = emitter(endpointA, L1) + + val listenerA = locationA.listener(endpointA.logs(L1)) + val listenerB = locationB.listener(endpointB.logs(L1)) + val listenerC = locationC.listener(endpointC.logs(L1)) (0 to 5).foreach(emitterA ! _) listenerA.waitForMessage(5) - nodeA.endpoint.delete(L1, 3, Set(nodeB.endpoint.id, nodeC.endpoint.id)).await shouldBe 3 + endpointA.delete(L1, 3, Set(endpointB.id, endpointC.id)).await shouldBe 3 - nodeA.endpoint.activate() - nodeB.endpoint.activate() + endpointA.activate() + endpointB.activate() listenerB.expectMsgAllOf(0 to 5: _*) - nodeC.endpoint.activate() + endpointC.activate() listenerC.expectMsgAllOf(0 to 5: _*) } } - - def newNodeA(logNames: Set[String], connections: Set[ReplicationConnection] = Set.empty) = - node("A", logNames, portA, connections) - - def newNodeB(logNames: Set[String], connections: Set[ReplicationConnection] = Set.empty) = - node("B", logNames, portB, connections) - - def newNodeC(logNames: Set[String], connections: Set[ReplicationConnection] = Set.empty) = - node("C", logNames, portC, connections) } diff --git a/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorCausalitySpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorCausalitySpec.scala index 298b953a..8beaf809 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorCausalitySpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorCausalitySpec.scala @@ -19,10 +19,6 @@ package com.rbmhtechnology.eventuate import akka.actor._ import akka.testkit.TestProbe -import com.rbmhtechnology.eventuate.log._ -import com.rbmhtechnology.eventuate.log.cassandra.Cassandra - -import org.cassandraunit.utils.EmbeddedCassandraServerHelper import org.scalatest._ import scala.collection.immutable.Seq @@ -44,26 +40,10 @@ object EventsourcedActorCausalitySpec { } } -abstract class EventsourcedActorCausalitySpec extends WordSpec with Matchers with ReplicationNodeRegistry { +abstract class EventsourcedActorCausalitySpec extends WordSpec with Matchers with MultiLocationSpec { import ReplicationIntegrationSpec.replicationConnection import EventsourcedActorCausalitySpec._ - def logFactory: String => Props - - var ctr: Int = 0 - - override def beforeEach(): Unit = - ctr += 1 - - def config = - ReplicationConfig.create() - - def nodeId(node: String): String = - s"${node}_${ctr}" - - def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection]): ReplicationNode = - register(new ReplicationNode(nodeId(nodeName), logNames, logFactory, connections, port = port)) - def assertPartialOrder[A](events: Seq[A], sample: A*): Unit = { val indices = sample.map(events.indexOf) assert(indices == indices.sorted) @@ -74,24 +54,27 @@ abstract class EventsourcedActorCausalitySpec extends WordSpec with Matchers wit "track causality by default (sharedClockEntry = true)" in { val logName = "L1" - val nodeA = node("A", Set(logName), 2552, Set(replicationConnection(2553))) - val nodeB = node("B", Set(logName), 2553, Set(replicationConnection(2552))) + val locationA = location("A") + val locationB = location("B") - val logA = nodeA.logs(logName) - val logB = nodeB.logs(logName) + val endpointA = locationA.endpoint(Set(logName), Set(replicationConnection(locationB.port))) + val endpointB = locationB.endpoint(Set(logName), Set(replicationConnection(locationA.port))) - val logIdA = nodeA.endpoint.logId(logName) - val logIdB = nodeB.endpoint.logId(logName) + val logA = endpointA.logs(logName) + val logB = endpointB.logs(logName) - val probeA1 = new TestProbe(nodeA.system) - val probeA2 = new TestProbe(nodeA.system) - val probeA3 = new TestProbe(nodeA.system) - val probeB = new TestProbe(nodeB.system) + val logIdA = endpointA.logId(logName) + val logIdB = endpointB.logId(logName) - val actorA1 = nodeA.system.actorOf(Props(new Collaborator("pa1", logA, sharedClockEntry = true, Set("e1", "e2", "e5"), probeA1.ref))) - val actorA2 = nodeA.system.actorOf(Props(new Collaborator("pa2", logA, sharedClockEntry = true, Set("e3", "e5", "e6"), probeA2.ref))) - val actorA3 = nodeA.system.actorOf(Props(new Collaborator("pa3", logA, sharedClockEntry = true, Set("e4"), probeA3.ref))) - val actorB = nodeB.system.actorOf(Props(new Collaborator("pb", logB, sharedClockEntry = true, Set("e1", "e6"), probeB.ref))) + val probeA1 = new TestProbe(locationA.system) + val probeA2 = new TestProbe(locationA.system) + val probeA3 = new TestProbe(locationA.system) + val probeB = new TestProbe(locationB.system) + + val actorA1 = locationA.system.actorOf(Props(new Collaborator("pa1", logA, sharedClockEntry = true, Set("e1", "e2", "e5"), probeA1.ref))) + val actorA2 = locationA.system.actorOf(Props(new Collaborator("pa2", logA, sharedClockEntry = true, Set("e3", "e5", "e6"), probeA2.ref))) + val actorA3 = locationA.system.actorOf(Props(new Collaborator("pa3", logA, sharedClockEntry = true, Set("e4"), probeA3.ref))) + val actorB = locationB.system.actorOf(Props(new Collaborator("pb", logB, sharedClockEntry = true, Set("e1", "e6"), probeB.ref))) def vectorTime(a: Long, b: Long) = (a, b) match { case (0L, 0L) => VectorTime() @@ -131,54 +114,46 @@ abstract class EventsourcedActorCausalitySpec extends WordSpec with Matchers wit "located at the same location" can { "track causality if enabled (sharedClockEntry = false)" in { val logName = "L1" - val logNode = this.node("A", Set(logName), 2552, Set()) - val log = logNode.logs(logName) - val logId = logNode.endpoint.logId(logName) + + val locationA = location("A") + val endpointA = locationA.endpoint(Set(logName), Set()) + + val logA = endpointA.logs(logName) + val logIdA = endpointA.logId(logName) val actorIdA = "PA" val actorIdB = "PB" val actorIdC = "PC" - val probeA = new TestProbe(logNode.system) - val probeB = new TestProbe(logNode.system) - val probeC = new TestProbe(logNode.system) + val probeA = new TestProbe(locationA.system) + val probeB = new TestProbe(locationA.system) + val probeC = new TestProbe(locationA.system) - val actorA = logNode.system.actorOf(Props(new Collaborator(actorIdA, log, sharedClockEntry = false, Set("e1", "e3"), probeA.ref))) - val actorB = logNode.system.actorOf(Props(new Collaborator(actorIdB, log, sharedClockEntry = false, Set("e2", "e3"), probeB.ref))) - val actorC = logNode.system.actorOf(Props(new Collaborator(actorIdC, log, sharedClockEntry = true, Set("e1", "e2", "e3"), probeC.ref))) + val actorA = locationA.system.actorOf(Props(new Collaborator(actorIdA, logA, sharedClockEntry = false, Set("e1", "e3"), probeA.ref))) + val actorB = locationA.system.actorOf(Props(new Collaborator(actorIdB, logA, sharedClockEntry = false, Set("e2", "e3"), probeB.ref))) + val actorC = locationA.system.actorOf(Props(new Collaborator(actorIdC, logA, sharedClockEntry = true, Set("e1", "e2", "e3"), probeC.ref))) actorA ! "e1" probeA.expectMsg(("e1", VectorTime(actorIdA -> 1L), VectorTime(actorIdA -> 1L))) - probeC.expectMsg(("e1", VectorTime(actorIdA -> 1L), VectorTime(actorIdA -> 1L, logId -> 1L))) + probeC.expectMsg(("e1", VectorTime(actorIdA -> 1L), VectorTime(actorIdA -> 1L, logIdA -> 1L))) actorB ! "e2" probeB.expectMsg(("e2", VectorTime(actorIdB -> 1L), VectorTime(actorIdB -> 1L))) - probeC.expectMsg(("e2", VectorTime(actorIdB -> 1L), VectorTime(actorIdA -> 1L, actorIdB -> 1L, logId -> 2L))) + probeC.expectMsg(("e2", VectorTime(actorIdB -> 1L), VectorTime(actorIdA -> 1L, actorIdB -> 1L, logIdA -> 2L))) actorC ! "e3" - probeA.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logId -> 3L), VectorTime(actorIdA -> 2L, actorIdB -> 1L, logId -> 3L))) - probeB.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logId -> 3L), VectorTime(actorIdA -> 1L, actorIdB -> 2L, logId -> 3L))) - probeC.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logId -> 3L), VectorTime(actorIdA -> 1L, actorIdB -> 1L, logId -> 3L))) + probeA.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logIdA -> 3L), VectorTime(actorIdA -> 2L, actorIdB -> 1L, logIdA -> 3L))) + probeB.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logIdA -> 3L), VectorTime(actorIdA -> 1L, actorIdB -> 2L, logIdA -> 3L))) + probeC.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logIdA -> 3L), VectorTime(actorIdA -> 1L, actorIdB -> 1L, logIdA -> 3L))) } } } } -class EventsourcedActorCausalitySpecLeveldb extends EventsourcedActorCausalitySpec with EventLogCleanupLeveldb { - override val logFactory: String => Props = id => EventLogLifecycleLeveldb.TestEventLog.props(id, batching = true) +class EventsourcedActorCausalitySpecLeveldb extends EventsourcedActorCausalitySpec with MultiLocationSpecLeveldb { + override val logFactory: String => Props = id => SingleLocationSpecLeveldb.TestEventLog.props(id, batching = true) } -class EventsourcedActorCausalitySpecCassandra extends EventsourcedActorCausalitySpec with EventLogCleanupCassandra { - override val logFactory: String => Props = id => EventLogLifecycleCassandra.TestEventLog.props(id, batching = true) - - override def beforeAll(): Unit = { - super.beforeAll() - EmbeddedCassandraServerHelper.startEmbeddedCassandra(60000) - } - - override def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection]): ReplicationNode = { - val node = super.node(nodeName, logNames, port, connections) - Cassandra(node.system) // enforce keyspace/schema setup - node - } +class EventsourcedActorCausalitySpecCassandra extends EventsourcedActorCausalitySpec with MultiLocationSpecCassandra { + override val logFactory: String => Props = id => SingleLocationSpecCassandra.TestEventLog.props(id, batching = true) } diff --git a/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorIntegrationSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorIntegrationSpec.scala index 951689a9..342da101 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorIntegrationSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorIntegrationSpec.scala @@ -21,9 +21,6 @@ import scala.util._ import akka.actor._ import akka.testkit._ -import com.rbmhtechnology.eventuate.log.EventLogLifecycleCassandra -import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb - import org.scalatest._ object EventsourcedActorIntegrationSpec { @@ -234,12 +231,9 @@ object EventsourcedActorIntegrationSpec { } } -abstract class EventsourcedActorIntegrationSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach { +abstract class EventsourcedActorIntegrationSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with SingleLocationSpec { import EventsourcedActorIntegrationSpec._ - def log: ActorRef - def logId: String - var probe: TestProbe = _ override def beforeEach(): Unit = { @@ -430,10 +424,10 @@ abstract class EventsourcedActorIntegrationSpec extends TestKit(ActorSystem("tes } } -class EventsourcedActorIntegrationSpecLeveldb extends EventsourcedActorIntegrationSpec with EventLogLifecycleLeveldb { +class EventsourcedActorIntegrationSpecLeveldb extends EventsourcedActorIntegrationSpec with SingleLocationSpecLeveldb { override def batching = false } -class EventsourcedActorIntegrationSpecCassandra extends EventsourcedActorIntegrationSpec with EventLogLifecycleCassandra { +class EventsourcedActorIntegrationSpecCassandra extends EventsourcedActorIntegrationSpec with SingleLocationSpecCassandra { override def batching = false } \ No newline at end of file diff --git a/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorThroughputSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorThroughputSpec.scala index 039c2a4f..c0eec212 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorThroughputSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedActorThroughputSpec.scala @@ -22,9 +22,6 @@ import scala.util._ import akka.actor._ import akka.testkit._ -import com.rbmhtechnology.eventuate.log.EventLogLifecycleCassandra -import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb - import org.scalatest._ object EventsourcedActorThroughputSpec { @@ -88,10 +85,9 @@ object EventsourcedActorThroughputSpec { } } -abstract class EventsourcedActorThroughputSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach { +abstract class EventsourcedActorThroughputSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with SingleLocationSpec { import EventsourcedActorThroughputSpec._ - def log: ActorRef var probe: TestProbe = _ override def beforeEach(): Unit = { @@ -154,5 +150,5 @@ abstract class EventsourcedActorThroughputSpec extends TestKit(ActorSystem("test } } -class EventsourcedActorThroughputSpecLeveldb extends EventsourcedActorThroughputSpec with EventLogLifecycleLeveldb -class EventsourcedActorThroughputSpecCassandra extends EventsourcedActorThroughputSpec with EventLogLifecycleCassandra +class EventsourcedActorThroughputSpecLeveldb extends EventsourcedActorThroughputSpec with SingleLocationSpecLeveldb +class EventsourcedActorThroughputSpecCassandra extends EventsourcedActorThroughputSpec with SingleLocationSpecCassandra diff --git a/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorIntegrationSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorIntegrationSpec.scala index 9196f8c4..21a58735 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorIntegrationSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/EventsourcedProcessorIntegrationSpec.scala @@ -19,9 +19,6 @@ package com.rbmhtechnology.eventuate import akka.actor._ import akka.testkit._ -import com.rbmhtechnology.eventuate.log.EventLogLifecycleCassandra -import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb - import org.scalatest._ import scala.util._ @@ -69,11 +66,9 @@ object EventsourcedProcessorIntegrationSpec { extends StatelessSampleProcessor(id, eventLog, targetEventLog, eventProbe, progressProbe) with StatefulProcessor } -abstract class EventsourcedProcessorIntegrationSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach { +abstract class EventsourcedProcessorIntegrationSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with SingleLocationSpec { import EventsourcedProcessorIntegrationSpec._ - def log: ActorRef - def logId: String def logProps(logId: String): Props def sourceLog = log @@ -260,14 +255,14 @@ abstract class EventsourcedProcessorIntegrationSpec extends TestKit(ActorSystem( } } -class EventsourcedProcessorIntegrationSpecLeveldb extends EventsourcedProcessorIntegrationSpec with EventLogLifecycleLeveldb { +class EventsourcedProcessorIntegrationSpecLeveldb extends EventsourcedProcessorIntegrationSpec with SingleLocationSpecLeveldb { override def beforeEach(): Unit = { super.beforeEach() init() } } -class EventsourcedProcessorIntegrationSpecCassandra extends EventsourcedProcessorIntegrationSpec with EventLogLifecycleCassandra { +class EventsourcedProcessorIntegrationSpecCassandra extends EventsourcedProcessorIntegrationSpec with SingleLocationSpecCassandra { override def beforeEach(): Unit = { super.beforeEach() init() diff --git a/src/it/scala/com/rbmhtechnology/eventuate/ReplicationConfig.scala b/src/it/scala/com/rbmhtechnology/eventuate/LocationConfig.scala similarity index 91% rename from src/it/scala/com/rbmhtechnology/eventuate/ReplicationConfig.scala rename to src/it/scala/com/rbmhtechnology/eventuate/LocationConfig.scala index 0eed20e0..c40a8667 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/ReplicationConfig.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/LocationConfig.scala @@ -18,8 +18,8 @@ package com.rbmhtechnology.eventuate import com.typesafe.config._ -object ReplicationConfig { - def create(port: Int = 2552, customConfig: String = ""): Config = { +object LocationConfig { + def create(port: Int = 0, customConfig: Config = ConfigFactory.empty()): Config = { val defaultConfig = ConfigFactory.parseString( s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" @@ -41,6 +41,6 @@ object ReplicationConfig { |eventuate.snapshot.filesystem.dir = target/test-snapshot """.stripMargin) - ConfigFactory.parseString(customConfig).withFallback(defaultConfig) + customConfig.withFallback(defaultConfig) } } diff --git a/src/it/scala/com/rbmhtechnology/eventuate/LocationSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/LocationSpec.scala new file mode 100644 index 00000000..32f81a47 --- /dev/null +++ b/src/it/scala/com/rbmhtechnology/eventuate/LocationSpec.scala @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2015 - 2016 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.eventuate + +import java.io.File + +import akka.actor._ +import akka.testkit.{TestProbe, TestKit} + +import com.rbmhtechnology.eventuate.ReplicationEndpoint._ +import com.rbmhtechnology.eventuate.log._ +import com.typesafe.config._ + +import org.apache.commons.io.FileUtils +import org.scalatest._ + +import scala.collection.immutable.Seq +import scala.concurrent._ +import scala.concurrent.duration._ + +trait LocationCleanup extends Suite with BeforeAndAfterAll { + def config: Config + + def storageLocations: List[File] + + override def beforeAll(): Unit = { + storageLocations.foreach(FileUtils.deleteDirectory) + storageLocations.foreach(_.mkdirs()) + } + + override def afterAll(): Unit = { + storageLocations.foreach(FileUtils.deleteDirectory) + } +} + +object SingleLocationSpec { + val ErrorSequenceNr = -1L + val IgnoreDeletedSequenceNr = -2L + + trait TestEventLog extends EventLog { + override def currentSystemTime: Long = 0L + + abstract override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int): Future[BatchReadResult] = + if (fromSequenceNr == ErrorSequenceNr) Future.failed(boom) else super.read(fromSequenceNr, toSequenceNr, max) + + abstract override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int, aggregateId: String): Future[BatchReadResult] = + if (fromSequenceNr == ErrorSequenceNr) Future.failed(boom) else super.read(fromSequenceNr, toSequenceNr, max, aggregateId) + + abstract override def replicationRead(fromSequenceNr: Long, toSequenceNr: Long, max: Int, filter: (DurableEvent) => Boolean): Future[BatchReadResult] = + if (fromSequenceNr == ErrorSequenceNr) Future.failed(boom) else super.replicationRead(fromSequenceNr, toSequenceNr, max, filter) + + abstract override def write(events: Seq[DurableEvent], partition: Long, clock: EventLogClock): Unit = + if (events.map(_.payload).contains("boom")) throw boom else super.write(events, partition, clock) + + override private[eventuate] def adjustFromSequenceNr(seqNr: Long) = seqNr match { + case ErrorSequenceNr => seqNr + case IgnoreDeletedSequenceNr => 0 + case s => super.adjustFromSequenceNr(s) + } + } +} + +trait SingleLocationSpec extends LocationCleanup with BeforeAndAfterEach { + private var _logCtr: Int = 0 + + override def beforeEach(): Unit = { + super.beforeEach() + _logCtr += 1 + } + + override def afterEach(): Unit = { + system.stop(log) + super.afterEach() + } + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + super.afterAll() + } + + def config: Config = + system.settings.config + + def batching: Boolean = + true + + def logId: String = + _logCtr.toString + + def log: ActorRef + + def system: ActorSystem +} + +trait MultiLocationSpec extends LocationCleanup with BeforeAndAfterEach { + private var locations: List[Location] = Nil + private var ctr: Int = 0 + + override def config = + LocationConfig.create(port = 0) + + override def beforeEach(): Unit = + ctr += 1 + + override def afterEach(): Unit = + locations.foreach(location => Await.result(location.terminate(), 10.seconds)) + + def logFactory: String => Props + + def location(name: String, customPort: Int = 0, customConfig: Config = ConfigFactory.empty()): Location = { + val location = new Location(locationId(name), logFactory, customPort, customConfig) + registerLocation(location) + location + } + + private def registerLocation(location: Location): Location = { + locations = location :: locations + location + } + + private def locationId(locationName: String): String = + s"${locationName}_${ctr}" +} + +class Location(val id: String, logFactory: String => Props, customPort: Int, customConfig: Config) { + import Location._ + + val system: ActorSystem = + ActorSystem(ReplicationConnection.DefaultRemoteSystemName, LocationConfig.create(customPort, customConfig)) + + val port: Int = + if (customPort != 0) customPort else system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get + + val probe: TestProbe = + new TestProbe(system) + + def listener(eventLog: ActorRef): EventListener = + new EventListener(id, eventLog)(system) + + def endpoint( + logNames: Set[String], + connections: Set[ReplicationConnection], + applicationName: String = DefaultApplicationName, + applicationVersion: ApplicationVersion = DefaultApplicationVersion, + activate: Boolean = true): ReplicationEndpoint = { + + val endpoint = new ReplicationEndpoint(id, logNames, logFactory, connections, applicationName, applicationVersion)(system) + if (activate) endpoint.activate() + endpoint + } + + def terminate(): Future[Terminated] = + system.terminate() +} + +object Location { + class EventListener(locationId: String, eventLog: ActorRef)(implicit system: ActorSystem) extends TestProbe(system, s"EventListener-$locationId") { listener => + private class EventListenerView extends EventsourcedView { + override val id = + testActorName + + override val eventLog = + listener.eventLog + + override def onCommand = + Actor.emptyBehavior + + override def onEvent = { + case event => ref ! event + } + } + + system.actorOf(Props(new EventListenerView)) + + def waitForMessage(msg: Any): Any = + fishForMessage(hint = msg.toString) { + case `msg` => true + case _ => false + } + } +} diff --git a/src/it/scala/com/rbmhtechnology/eventuate/LocationSpecCassandra.scala b/src/it/scala/com/rbmhtechnology/eventuate/LocationSpecCassandra.scala new file mode 100644 index 00000000..ab0741e5 --- /dev/null +++ b/src/it/scala/com/rbmhtechnology/eventuate/LocationSpecCassandra.scala @@ -0,0 +1,151 @@ +/* + * Copyright (C) 2015 - 2016 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.eventuate + +import java.io.File + +import akka.actor._ +import akka.testkit._ + +import com.rbmhtechnology.eventuate.log._ +import com.rbmhtechnology.eventuate.log.cassandra._ +import com.rbmhtechnology.eventuate.log.cassandra.CassandraIndex.AggregateEvents +import com.typesafe.config._ + +import org.cassandraunit.utils.EmbeddedCassandraServerHelper + +import scala.concurrent._ + +trait LocationCleanupCassandra extends LocationCleanup { + override def storageLocations: List[File] = + List("eventuate.snapshot.filesystem.dir").map(s => new File(config.getString(s))) + + override def afterAll(): Unit = { + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra() + super.afterAll() + } +} + +object SingleLocationSpecCassandra { + case class TestFailureSpec( + failOnClockRead: Boolean = false, + failBeforeIndexIncrementWrite: Boolean = false, + failAfterIndexIncrementWrite: Boolean = false) + + object TestEventLog { + def props(logId: String, batching: Boolean): Props = + props(logId, TestFailureSpec(), None, batching) + + def props(logId: String, failureSpec: TestFailureSpec, indexProbe: ActorRef, batching: Boolean): Props = + props(logId, failureSpec, Some(indexProbe), batching) + + def props(logId: String, failureSpec: TestFailureSpec, indexProbe: Option[ActorRef], batching: Boolean): Props = { + val logProps = Props(new TestEventLog(logId, failureSpec, indexProbe)).withDispatcher("eventuate.log.dispatchers.write-dispatcher") + Props(new CircuitBreaker(logProps, batching)) + } + } + + class TestEventLog(id: String, failureSpec: TestFailureSpec, indexProbe: Option[ActorRef]) + extends CassandraEventLog(id) with SingleLocationSpec.TestEventLog { + + override def currentSystemTime: Long = 0L + + override def unhandled(message: Any): Unit = message match { + case "boom" => + throw boom + case _ => + super.unhandled(message) + } + + private[eventuate] override def createIndexStore(cassandra: Cassandra, logId: String) = + new TestIndexStore(cassandra, logId, failureSpec) + + private[eventuate] override def onIndexEvent(event: Any): Unit = + indexProbe.foreach(_ ! event) + } + + class TestIndexStore(cassandra: Cassandra, logId: String, failureSpec: TestFailureSpec) extends CassandraIndexStore(cassandra, logId) { + private var writeIncrementFailed = false + private var readClockFailed = false + + override def writeAsync(aggregateEvents: AggregateEvents, clock: EventLogClock)(implicit executor: ExecutionContext): Future[EventLogClock] = + if (failureSpec.failBeforeIndexIncrementWrite && !writeIncrementFailed) { + writeIncrementFailed = true + Future.failed(boom) + } else if (failureSpec.failAfterIndexIncrementWrite && !writeIncrementFailed) { + writeIncrementFailed = true + for { + _ <- super.writeAsync(aggregateEvents, clock) + r <- Future.failed(boom) + } yield r + } else super.writeAsync(aggregateEvents, clock) + + override def readEventLogClockAsync(implicit executor: ExecutionContext): Future[EventLogClock] = + if (failureSpec.failOnClockRead && !readClockFailed) { + readClockFailed = true + Future.failed(boom) + } else super.readEventLogClockAsync + } +} + +trait SingleLocationSpecCassandra extends SingleLocationSpec with LocationCleanupCassandra { + import SingleLocationSpecCassandra._ + + private var _indexProbe: TestProbe = _ + private var _log: ActorRef = _ + + override def beforeEach(): Unit = { + super.beforeEach() + _indexProbe = new TestProbe(system) + _log = createLog(TestFailureSpec(), indexProbe.ref) + } + + override def beforeAll(): Unit = { + super.beforeAll() + EmbeddedCassandraServerHelper.startEmbeddedCassandra(60000) + } + + def createLog(failureSpec: TestFailureSpec, indexProbe: ActorRef): ActorRef = + system.actorOf(logProps(logId, failureSpec, indexProbe)) + + def indexProbe: TestProbe = + _indexProbe + + def log: ActorRef = + _log + + def logProps(logId: String): Props = + logProps(logId, TestFailureSpec(), system.deadLetters) + + def logProps(logId: String, failureSpec: TestFailureSpec, indexProbe: ActorRef): Props = + TestEventLog.props(logId, failureSpec, indexProbe, batching) +} + +trait MultiLocationSpecCassandra extends MultiLocationSpec with LocationCleanupCassandra { + override val logFactory: String => Props = id => CassandraEventLog.props(id) + + override def beforeAll(): Unit = { + super.beforeAll() + EmbeddedCassandraServerHelper.startEmbeddedCassandra(60000) + } + + override def location(name: String, customPort: Int = 0, customConfig: Config = ConfigFactory.empty()): Location = { + val location = super.location(name, customPort, customConfig) + Cassandra(location.system) // enforce keyspace/schema setup + location + } +} diff --git a/src/it/scala/com/rbmhtechnology/eventuate/LocationSpecLeveldb.scala b/src/it/scala/com/rbmhtechnology/eventuate/LocationSpecLeveldb.scala new file mode 100644 index 00000000..8d6432ed --- /dev/null +++ b/src/it/scala/com/rbmhtechnology/eventuate/LocationSpecLeveldb.scala @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2015 - 2016 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.eventuate + +import java.io.File + +import akka.actor._ + +import com.rbmhtechnology.eventuate.log._ +import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLog +import com.rbmhtechnology.eventuate.utilities.RestarterActor + +trait LocationCleanupLeveldb extends LocationCleanup { + override def storageLocations: List[File] = + List("eventuate.log.leveldb.dir", "eventuate.snapshot.filesystem.dir").map(s => new File(config.getString(s))) +} + +object SingleLocationSpecLeveldb { + object TestEventLog { + def props(logId: String, batching: Boolean): Props = { + val logProps = Props(new TestEventLog(logId)).withDispatcher("eventuate.log.dispatchers.write-dispatcher") + if (batching) Props(new BatchingLayer(logProps)) else logProps + } + } + + class TestEventLog(id: String) extends LeveldbEventLog(id, "log-test") with SingleLocationSpec.TestEventLog { + override def unhandled(message: Any): Unit = message match { + case "boom" => + throw boom + case "dir" => + sender() ! logDir + case _ => + super.unhandled(message) + } + } +} + +trait SingleLocationSpecLeveldb extends SingleLocationSpec with LocationCleanupLeveldb { + import SingleLocationSpecLeveldb._ + + private var _log: ActorRef = _ + + override def beforeEach(): Unit = { + super.beforeEach() + _log = system.actorOf(logProps(logId)) + } + + def log: ActorRef = + _log + + def logProps(logId: String): Props = + RestarterActor.props(TestEventLog.props(logId, batching)) +} + +trait MultiLocationSpecLeveldb extends MultiLocationSpec with LocationCleanupLeveldb { + override val logFactory: String => Props = id => LeveldbEventLog.props(id) +} diff --git a/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventIntegrationSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventIntegrationSpec.scala index 41c6c6ba..d91fa1be 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventIntegrationSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventIntegrationSpec.scala @@ -20,8 +20,6 @@ import akka.actor._ import akka.testkit._ import com.rbmhtechnology.eventuate.EventsourcedView.Handler -import com.rbmhtechnology.eventuate.log.EventLogLifecycleCassandra -import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb import org.scalatest._ @@ -49,12 +47,9 @@ object PersistOnEventIntegrationSpec { } } -abstract class PersistOnEventIntegrationSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach { +abstract class PersistOnEventIntegrationSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with SingleLocationSpec { import PersistOnEventIntegrationSpec._ - def log: ActorRef - def logId: String - var probe: TestProbe = _ override def beforeEach(): Unit = { @@ -74,5 +69,5 @@ abstract class PersistOnEventIntegrationSpec extends TestKit(ActorSystem("test") } } -class PersistOnEventIntegrationSpecLeveldb extends PersistOnEventIntegrationSpec with EventLogLifecycleLeveldb -class PersistOnEventIntegrationSpecCassandra extends PersistOnEventIntegrationSpec with EventLogLifecycleCassandra \ No newline at end of file +class PersistOnEventIntegrationSpecLeveldb extends PersistOnEventIntegrationSpec with SingleLocationSpecLeveldb +class PersistOnEventIntegrationSpecCassandra extends PersistOnEventIntegrationSpec with SingleLocationSpecCassandra \ No newline at end of file diff --git a/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpec.scala index f912a518..32de3aaa 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpec.scala @@ -23,9 +23,9 @@ import akka.pattern.ask import akka.testkit.TestProbe import akka.util.Timeout -import com.rbmhtechnology.eventuate.log._ import com.rbmhtechnology.eventuate.log.leveldb.LeveldbEventLogSettings import com.rbmhtechnology.eventuate.utilities._ +import com.typesafe.config.ConfigFactory import org.apache.commons.io.FileUtils import org.scalatest._ @@ -47,14 +47,14 @@ object RecoverySpecLeveldb { } } - val config = + val config = ConfigFactory.parseString( """ |eventuate.log.replication.retry-delay = 1s |eventuate.log.replication.remote-read-timeout = 2s |eventuate.disaster-recovery.remote-operation-retry-max = 10 |eventuate.disaster-recovery.remote-operation-retry-delay = 1s |eventuate.disaster-recovery.remote-operation-timeout = 1s - """.stripMargin + """.stripMargin) def rootDirectory(target: ReplicationTarget): File = new File(new LeveldbEventLogSettings(target.endpoint.system.settings.config).rootDir) @@ -65,30 +65,20 @@ object RecoverySpecLeveldb { } } -class RecoverySpecLeveldb extends WordSpec with Matchers with ReplicationNodeRegistry with EventLogCleanupLeveldb { +class RecoverySpecLeveldb extends WordSpec with Matchers with MultiLocationSpec with LocationCleanupLeveldb { import ReplicationIntegrationSpec.replicationConnection import RecoverySpecLeveldb._ - val logFactory: String => Props = id => EventLogLifecycleLeveldb.TestEventLog.props(id, batching = true) + val customPort: Int = + 2555 - var ctr: Int = 0 + val logFactory: String => Props = + id => SingleLocationSpecLeveldb.TestEventLog.props(id, batching = true) - override def beforeEach(): Unit = - ctr += 1 - - def config = - ReplicationConfig.create() - - def nodeId(node: String): String = - s"${node}_${ctr}" - - def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection], customConfig: String = "", activate: Boolean = false): ReplicationNode = - register(new ReplicationNode(nodeId(nodeName), logNames, logFactory, connections, port = port, customConfig = RecoverySpecLeveldb.config + customConfig, activate = activate)) - - def assertConvergence(expected: Set[String], nodes: ReplicationNode *): Unit = { - val probes = nodes.map { node => - val probe = new TestProbe(node.system) - node.system.actorOf(Props(new ConvergenceView(s"p-${node.id}", node.logs("L1"), expected.size, probe.ref))) + def assertConvergence(expected: Set[String], endpoints: ReplicationEndpoint *): Unit = { + val probes = endpoints.map { endpoint => + val probe = new TestProbe(endpoint.system) + endpoint.system.actorOf(Props(new ConvergenceView(s"p-${endpoint.id}", endpoint.logs("L1"), expected.size, probe.ref))) probe } probes.foreach(_.expectMsg(expected)) @@ -96,55 +86,68 @@ class RecoverySpecLeveldb extends WordSpec with Matchers with ReplicationNodeReg "Replication endpoint recovery" must { "disallow activation of endpoint during and after recovery" in { - node("B", Set("L1"), 2553, Set(replicationConnection(2552))).endpoint.activate() - val endpoint = node("A", Set("L1"), 2552, Set(replicationConnection(2553))).endpoint + val locationA = location("A", customConfig = RecoverySpecLeveldb.config) + val locationB = location("B", customConfig = RecoverySpecLeveldb.config) - val recovery = endpoint.recover() + val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(locationB.port)), activate = false) + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA.port))) - an [IllegalStateException] shouldBe thrownBy(endpoint.activate()) + val recovery = endpointA.recover() + + an [IllegalStateException] shouldBe thrownBy(endpointA.activate()) recovery.await - an [IllegalStateException] shouldBe thrownBy(endpoint.activate()) + an [IllegalStateException] shouldBe thrownBy(endpointA.activate()) } "fail when connected endpoint is unavailable" in { - val endpoint = node("A", Set("L1"), 2552, Set(replicationConnection(2553)), - customConfig = "eventuate.disaster-recovery.remote-operation-retry-max = 0") - .endpoint + val locationA = location("A", customConfig = ConfigFactory.parseString("eventuate.disaster-recovery.remote-operation-retry-max = 0").withFallback(RecoverySpecLeveldb.config)) + val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(customPort))) - an [Exception] shouldBe thrownBy(endpoint.recover().await) + an [Exception] shouldBe thrownBy(endpointA.recover().await) } "succeed normally if the endpoint was healthy (but not convergent yet)" in { - def newNodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553))) - val nodeA = newNodeA - val nodeB = node("B", Set("L1"), 2553, Set(replicationConnection(2552))) + val locationB = location("B", customConfig = RecoverySpecLeveldb.config) + def newLocationA = location("A", customConfig = RecoverySpecLeveldb.config, customPort = customPort) + val locationA1 = newLocationA + + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA1.port)), activate = false) + def newEndpointA(l: Location) = l.endpoint(Set("L1"), Set(replicationConnection(locationB.port)), activate = false) + val endpointA1 = newEndpointA(locationA1) - val targetA = nodeA.endpoint.target("L1") - val targetB = nodeB.endpoint.target("L1") + val targetA = endpointA1.target("L1") + val targetB = endpointB.target("L1") write(targetA, List("a1", "a2")) write(targetB, List("b1", "b2")) replicate(targetA, targetB, 1) replicate(targetB, targetA, 1) - nodeA.terminate().await - val restartedA = newNodeA + locationA1.terminate().await + + val locationA2 = newLocationA + val endpointA2 = newEndpointA(locationA2) - nodeB.endpoint.activate() - restartedA.endpoint.recover().await + endpointB.activate() + endpointA2.recover().await - assertConvergence(Set("a1", "a2", "b1", "b2"), restartedA, nodeB) + assertConvergence(Set("a1", "a2", "b1", "b2"), endpointA2, endpointB) } "repair inconsistencies of an endpoint that has lost all events" in { - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2555))) - val nodeB = node("B", Set("L1"), 2553, Set(replicationConnection(2555))) - val nodeC = node("C", Set("L1"), 2554, Set(replicationConnection(2555))) - def nodeD = node("D", Set("L1"), 2555, Set(replicationConnection(2552), replicationConnection(2553), replicationConnection(2554))) - - val nodeD1 = nodeD - - val targetA = nodeA.endpoint.target("L1") - val targetB = nodeB.endpoint.target("L1") - val targetC = nodeC.endpoint.target("L1") - val targetD1 = nodeD1.endpoint.target("L1") + val locationA = location("A", customConfig = RecoverySpecLeveldb.config) + val locationB = location("B", customConfig = RecoverySpecLeveldb.config) + val locationC = location("C", customConfig = RecoverySpecLeveldb.config) + def newLocationD = location("D", customConfig = RecoverySpecLeveldb.config, customPort = customPort) + val locationD1 = newLocationD + + val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(locationD1.port)), activate = false) + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationD1.port)), activate = false) + val endpointC = locationC.endpoint(Set("L1"), Set(replicationConnection(locationD1.port)), activate = false) + def newEndpointD(l: Location) = l.endpoint(Set("L1"), Set(replicationConnection(locationA.port), replicationConnection(locationB.port), replicationConnection(locationC.port)), activate = false) + val endpointD1 = newEndpointD(locationD1) + + val targetA = endpointA.target("L1") + val targetB = endpointB.target("L1") + val targetC = endpointC.target("L1") + val targetD1 = endpointD1.target("L1") val logDirD = logDirectory(targetD1) @@ -163,66 +166,40 @@ class RecoverySpecLeveldb extends WordSpec with Matchers with ReplicationNodeReg replicate(targetD1, targetC) // what a disaster ... - nodeD1.terminate().await + locationD1.terminate().await FileUtils.deleteDirectory(logDirD) - nodeA.endpoint.activate() - nodeB.endpoint.activate() - nodeC.endpoint.activate() + endpointA.activate() + endpointB.activate() + endpointC.activate() // start node D again (no backup available) - val nodeD2 = nodeD + val locationD2 = newLocationD + val endpointD2 = newEndpointD(locationD2) - nodeD2.endpoint.recover().await + endpointD2.recover().await // disclose bug #152 (writing new events is allowed after successful recovery) - write(nodeD2.endpoint.target("L1"), List("d1")) - - assertConvergence(Set("a", "b", "c", "d", "d1"), nodeA, nodeB, nodeC, nodeD2) - } - "repair inconsistencies if recovery was stopped during event recovery and restarted" in { - def newNodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553)), customConfig = "eventuate.log.write-batch-size = 1") - val nodeA = newNodeA - val nodeB = node("B", Set("L1"), 2553, Set(replicationConnection(2552))) - - nodeA.endpoint.activate() - nodeB.endpoint.activate() - val targetA = nodeA.endpoint.target("L1") - val logDirA = logDirectory(targetA) - val targetB = nodeB.endpoint.target("L1") - - val as = (0 to 5).map("A" + _) - val bs = (0 to 5).map("B" + _) - val all = as.toSet ++ bs.toSet - write(targetA, as) - write(targetB, bs) - assertConvergence(all, nodeA, nodeB) - - nodeA.terminate().await - FileUtils.deleteDirectory(logDirA) - - val restartedA = newNodeA - restartedA.endpoint.recover() - restartedA.eventListener("L1").waitForMessage("A1") - - restartedA.terminate().await - - val restartedA2 = newNodeA - restartedA2.endpoint.recover().await + write(endpointD2.target("L1"), List("d1")) - assertConvergence(all, restartedA2, nodeB) + assertConvergence(Set("a", "b", "c", "d", "d1"), endpointA, endpointB, endpointC, endpointD2) } "repair inconsistencies of an endpoint that has lost all events but has been partially recovered from a storage backup" in { - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2555))) - val nodeB = node("B", Set("L1"), 2553, Set(replicationConnection(2555))) - val nodeC = node("C", Set("L1"), 2554, Set(replicationConnection(2555))) - def nodeD = node("D", Set("L1"), 2555, Set(replicationConnection(2552), replicationConnection(2553), replicationConnection(2554))) - - val nodeD1 = nodeD - - val targetA = nodeA.endpoint.target("L1") - val targetB = nodeB.endpoint.target("L1") - val targetC = nodeC.endpoint.target("L1") - val targetD1 = nodeD1.endpoint.target("L1") + val locationA = location("A", customConfig = RecoverySpecLeveldb.config) + val locationB = location("B", customConfig = RecoverySpecLeveldb.config) + val locationC = location("C", customConfig = RecoverySpecLeveldb.config) + def newLocationD = location("D", customConfig = RecoverySpecLeveldb.config, customPort = customPort) + val locationD1 = newLocationD + + val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(locationD1.port)), activate = false) + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationD1.port)), activate = false) + val endpointC = locationC.endpoint(Set("L1"), Set(replicationConnection(locationD1.port)), activate = false) + def newEndpointD(l: Location) = l.endpoint(Set("L1"), Set(replicationConnection(locationA.port), replicationConnection(locationB.port), replicationConnection(locationC.port)), activate = false) + val endpointD1 = newEndpointD(locationD1) + + val targetA = endpointA.target("L1") + val targetB = endpointB.target("L1") + val targetC = endpointC.target("L1") + val targetD1 = endpointD1.target("L1") val rootDirD = rootDirectory(targetD1) val logDirD = logDirectory(targetD1) @@ -236,11 +213,12 @@ class RecoverySpecLeveldb extends WordSpec with Matchers with ReplicationNodeReg write(targetC, List("c")) replicate(targetB, targetD1) - nodeD1.terminate().await + locationD1.terminate().await FileUtils.copyDirectory(logDirD, bckDirD) - val nodeD2 = nodeD - val targetD2 = nodeD2.endpoint.target("L1") + val locationD2 = newLocationD + val endpointD2 = newEndpointD(locationD2) + val targetD2 = endpointD2.target("L1") replicate(targetC, targetD2) replicate(targetD2, targetB) @@ -250,56 +228,102 @@ class RecoverySpecLeveldb extends WordSpec with Matchers with ReplicationNodeReg replicate(targetD2, targetC) // what a disaster ... - nodeD2.terminate().await + locationD2.terminate().await FileUtils.deleteDirectory(logDirD) // install a backup FileUtils.copyDirectory(bckDirD, logDirD) - nodeA.endpoint.activate() - nodeB.endpoint.activate() - nodeC.endpoint.activate() + endpointA.activate() + endpointB.activate() + endpointC.activate() // start node D again (with backup available) - val nodeD3 = nodeD + val locationD3 = newLocationD + val endpointD3 = newEndpointD(locationD3) + + endpointD3.recover().await + + assertConvergence(Set("a", "b", "c", "d"), endpointA, endpointB, endpointC, endpointD3) + } + "repair inconsistencies if recovery was stopped during event recovery and restarted" in { + val config = ConfigFactory.parseString("eventuate.log.write-batch-size = 1").withFallback(RecoverySpecLeveldb.config) + + val locationB = location("B", customConfig = config) + def newLocationA = location("A", customConfig = config, customPort = customPort) + val locationA1 = newLocationA + + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA1.port)), activate = true) + def newEndpointA(l: Location) = l.endpoint(Set("L1"), Set(replicationConnection(locationB.port)), activate = false) + val endpointA1 = newEndpointA(locationA1) + + val targetA = endpointA1.target("L1") + val logDirA = logDirectory(targetA) + val targetB = endpointB.target("L1") + + val as = (0 to 5).map("A" + _) + val bs = (0 to 5).map("B" + _) + val all = as.toSet ++ bs.toSet + + endpointA1.activate() + + write(targetA, as) + write(targetB, bs) + assertConvergence(all, endpointA1, endpointB) - nodeD3.endpoint.recover().await + locationA1.terminate().await + FileUtils.deleteDirectory(logDirA) + + val locationA2 = newLocationA + val endpointA2 = newEndpointA(locationA2) + + endpointA2.recover() + locationA2.listener(endpointA2.logs("L1")).waitForMessage("A1") + locationA2.terminate().await - assertConvergence(Set("a", "b", "c", "d"), nodeA, nodeB, nodeC, nodeD3) + val locationA3 = newLocationA + val endpointA3 = newEndpointA(locationA3) + + endpointA3.recover().await + + assertConvergence(all, endpointA3, endpointB) } } "A replication endpoint" must { + def createEndpoint: ReplicationEndpoint = + location("A", customConfig = RecoverySpecLeveldb.config).endpoint(Set("L1"), Set(replicationConnection(customPort)), activate = false) + "not allow concurrent recoveries" in { - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553))) + val endpoint = createEndpoint - nodeA.endpoint.recover() + endpoint.recover() intercept[IllegalStateException] { - nodeA.endpoint.recover().await + endpoint.recover().await } } "not allow concurrent recovery and activation" in { - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553))) + val endpoint = createEndpoint - nodeA.endpoint.recover() + endpoint.recover() intercept[IllegalStateException] { - nodeA.endpoint.activate() + endpoint.activate() } } "not allow activated endpoints to be recovered" in { - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553))) + val endpoint = createEndpoint - nodeA.endpoint.activate() + endpoint.activate() intercept[IllegalStateException] { - nodeA.endpoint.recover().await + endpoint.recover().await } } "not allow multiple activations" in { - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553))) + val endpoint = createEndpoint - nodeA.endpoint.activate() + endpoint.activate() intercept[IllegalStateException] { - nodeA.endpoint.activate() + endpoint.activate() } } } diff --git a/src/it/scala/com/rbmhtechnology/eventuate/ReplicationCycleSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/ReplicationCycleSpec.scala index c9d74bad..ca687f34 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/ReplicationCycleSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/ReplicationCycleSpec.scala @@ -17,44 +17,29 @@ package com.rbmhtechnology.eventuate import akka.actor._ -import akka.testkit.TestProbe -import com.rbmhtechnology.eventuate.log._ -import com.rbmhtechnology.eventuate.log.cassandra._ -import com.rbmhtechnology.eventuate.log.leveldb._ - -import org.cassandraunit.utils.EmbeddedCassandraServerHelper import org.scalatest._ import scala.concurrent.Future -abstract class ReplicationCycleSpec extends WordSpec with Matchers with ReplicationNodeRegistry { +abstract class ReplicationCycleSpec extends WordSpec with Matchers with MultiLocationSpec { import ReplicationIntegrationSpec._ - def logFactory: String => Props - - var ctr: Int = 0 - - override def beforeEach(): Unit = - ctr += 1 - - def config = - ReplicationConfig.create() + var locationA: Location = _ + var locationB: Location = _ + var locationC: Location = _ - def nodeId(node: String): String = - s"${node}_${ctr}" - - def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection]): ReplicationNode = - register(new ReplicationNode(nodeId(nodeName), logNames, logFactory, connections, port = port, customConfig = "eventuate.log.write-batch-size = 50")) - - def testReplication(nodeA: ReplicationNode, nodeB: ReplicationNode, nodeC: ReplicationNode): Unit = { - val probeA = new TestProbe(nodeA.system) - val probeB = new TestProbe(nodeB.system) - val probeC = new TestProbe(nodeC.system) + override def beforeEach(): Unit = { + super.beforeEach() + locationA = location("A") + locationB = location("B") + locationC = location("C") + } - val actorA = nodeA.system.actorOf(Props(new ReplicatedActor("pa", nodeA.logs("L1"), probeA.ref))) - val actorB = nodeB.system.actorOf(Props(new ReplicatedActor("pb", nodeB.logs("L1"), probeB.ref))) - val actorC = nodeB.system.actorOf(Props(new ReplicatedActor("pc", nodeC.logs("L1"), probeC.ref))) + def testReplication(endpointA: ReplicationEndpoint, endpointB: ReplicationEndpoint, endpointC: ReplicationEndpoint): Unit = { + val actorA = locationA.system.actorOf(Props(new ReplicatedActor("pa", endpointA.logs("L1"), locationA.probe.ref))) + val actorB = locationB.system.actorOf(Props(new ReplicatedActor("pb", endpointB.logs("L1"), locationB.probe.ref))) + val actorC = locationC.system.actorOf(Props(new ReplicatedActor("pc", endpointC.logs("L1"), locationC.probe.ref))) actorA ! "a1" actorB ! "b1" @@ -62,9 +47,9 @@ abstract class ReplicationCycleSpec extends WordSpec with Matchers with Replicat val expected = List("a1", "b1", "c1") - val eventsA = probeA.expectMsgAllOf(expected: _*) - val eventsB = probeB.expectMsgAllOf(expected: _*) - val eventsC = probeC.expectMsgAllOf(expected: _*) + val eventsA = locationA.probe.expectMsgAllOf(expected: _*) + val eventsB = locationB.probe.expectMsgAllOf(expected: _*) + val eventsC = locationC.probe.expectMsgAllOf(expected: _*) val num = 100 val expectedA = 2 to num map { i => s"a$i"} @@ -72,49 +57,33 @@ abstract class ReplicationCycleSpec extends WordSpec with Matchers with Replicat val expectedC = 2 to num map { i => s"c$i"} val all = expectedA ++ expectedB ++ expectedC - import nodeA.system.dispatcher + import endpointA.system.dispatcher Future(expectedA.foreach(s => actorA ! s)) Future(expectedB.foreach(s => actorB ! s)) Future(expectedC.foreach(s => actorC ! s)) - probeA.expectMsgAllOf(all: _*) - probeB.expectMsgAllOf(all: _*) - probeC.expectMsgAllOf(all: _*) + locationA.probe.expectMsgAllOf(all: _*) + locationB.probe.expectMsgAllOf(all: _*) + locationC.probe.expectMsgAllOf(all: _*) } "Event log replication" must { "support bi-directional cyclic replication networks" in { testReplication( - node("A", Set("L1"), 2552, Set(replicationConnection(2553), replicationConnection(2554))), - node("B", Set("L1"), 2553, Set(replicationConnection(2552), replicationConnection(2554))), - node("C", Set("L1"), 2554, Set(replicationConnection(2552), replicationConnection(2553)))) + locationA.endpoint(Set("L1"), Set(replicationConnection(locationB.port), replicationConnection(locationC.port))), + locationB.endpoint(Set("L1"), Set(replicationConnection(locationA.port), replicationConnection(locationC.port))), + locationC.endpoint(Set("L1"), Set(replicationConnection(locationA.port), replicationConnection(locationB.port)))) } "support uni-directional cyclic replication networks" in { testReplication( - node("A", Set("L1"), 2552, Set(replicationConnection(2553))), - node("B", Set("L1"), 2553, Set(replicationConnection(2554))), - node("C", Set("L1"), 2554, Set(replicationConnection(2552)))) + locationA.endpoint(Set("L1"), Set(replicationConnection(locationB.port))), + locationB.endpoint(Set("L1"), Set(replicationConnection(locationC.port))), + locationC.endpoint(Set("L1"), Set(replicationConnection(locationA.port)))) } } } -class ReplicationCycleSpecLeveldb extends ReplicationCycleSpec with EventLogCleanupLeveldb { - override val logFactory: String => Props = id => LeveldbEventLog.props(id) -} - -class ReplicationCycleSpecCassandra extends ReplicationCycleSpec with EventLogCleanupCassandra { - override val logFactory: String => Props = id => CassandraEventLog.props(id) - - override def beforeAll(): Unit = { - super.beforeAll() - EmbeddedCassandraServerHelper.startEmbeddedCassandra(60000) - } - - override def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection]): ReplicationNode = { - val node = super.node(nodeName, logNames, port, connections) - Cassandra(node.system) // enforce keyspace/schema setup - node - } -} +class ReplicationCycleSpecLeveldb extends ReplicationCycleSpec with MultiLocationSpecLeveldb +class ReplicationCycleSpecCassandra extends ReplicationCycleSpec with MultiLocationSpecCassandra diff --git a/src/it/scala/com/rbmhtechnology/eventuate/ReplicationIntegrationSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/ReplicationIntegrationSpec.scala index 4f4e7de7..cce63e3a 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/ReplicationIntegrationSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/ReplicationIntegrationSpec.scala @@ -19,14 +19,9 @@ package com.rbmhtechnology.eventuate import akka.actor._ import akka.testkit.TestProbe -import com.rbmhtechnology.eventuate.ReplicationEndpoint._ import com.rbmhtechnology.eventuate.ReplicationFilter.NoFilter import com.rbmhtechnology.eventuate.ReplicationProtocol._ -import com.rbmhtechnology.eventuate.log._ -import com.rbmhtechnology.eventuate.log.cassandra._ -import com.rbmhtechnology.eventuate.log.leveldb._ -import org.cassandraunit.utils.EmbeddedCassandraServerHelper import org.scalatest._ import scala.collection.immutable.Seq @@ -60,28 +55,10 @@ object ReplicationIntegrationSpec { ReplicationConnection("127.0.0.1", port, filters) } -abstract class ReplicationIntegrationSpec extends WordSpec with Matchers with ReplicationNodeRegistry { +abstract class ReplicationIntegrationSpec extends WordSpec with Matchers with MultiLocationSpec { import ReplicationIntegrationSpec._ - def logFactory: String => Props - - var ctr: Int = 0 - - override def beforeEach(): Unit = - ctr += 1 - - def config = - ReplicationConfig.create() - - def nodeId(node: String): String = - s"${node}_${ctr}" - - def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection], - applicationName: String = DefaultApplicationName, - applicationVersion: ApplicationVersion = DefaultApplicationVersion): ReplicationNode = { - - register(new ReplicationNode(nodeId(nodeName), logNames, logFactory, connections, applicationName, applicationVersion, port)) - } + def customPort: Int def assertPartialOrder[A](events: Seq[A], sample: A*): Unit = { val indices = sample.map(events.indexOf) @@ -90,17 +67,17 @@ abstract class ReplicationIntegrationSpec extends WordSpec with Matchers with Re "Event log replication" must { "replicate all events by default" in { - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553))) - val nodeB = node("B", Set("L1"), 2553, Set(replicationConnection(2552), replicationConnection(2554))) - val nodeC = node("C", Set("L1"), 2554, Set(replicationConnection(2553))) + val locationA = location("A") + val locationB = location("B") + val locationC = location("C") - val probeA = new TestProbe(nodeA.system) - val probeB = new TestProbe(nodeB.system) - val probeC = new TestProbe(nodeC.system) + val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(locationB.port))) + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA.port), replicationConnection(locationC.port))) + val endpointC = locationC.endpoint(Set("L1"), Set(replicationConnection(locationB.port))) - val actorA = nodeA.system.actorOf(Props(new ReplicatedActor("pa", nodeA.logs("L1"), probeA.ref))) - val actorB = nodeB.system.actorOf(Props(new ReplicatedActor("pb", nodeB.logs("L1"), probeB.ref))) - val actorC = nodeC.system.actorOf(Props(new ReplicatedActor("pc", nodeC.logs("L1"), probeC.ref))) + val actorA = locationA.system.actorOf(Props(new ReplicatedActor("pa", endpointA.logs("L1"), locationA.probe.ref))) + val actorB = locationB.system.actorOf(Props(new ReplicatedActor("pb", endpointB.logs("L1"), locationB.probe.ref))) + val actorC = locationC.system.actorOf(Props(new ReplicatedActor("pc", endpointC.logs("L1"), locationC.probe.ref))) actorA ! "a1" actorA ! "a2" @@ -116,9 +93,9 @@ abstract class ReplicationIntegrationSpec extends WordSpec with Matchers with Re val expected = List("a1", "a2", "a3", "b1", "b2", "b3", "c1", "c2", "c3") - val eventsA = probeA.expectMsgAllOf(expected: _*) - val eventsB = probeB.expectMsgAllOf(expected: _*) - val eventsC = probeC.expectMsgAllOf(expected: _*) + val eventsA = locationA.probe.expectMsgAllOf(expected: _*) + val eventsB = locationB.probe.expectMsgAllOf(expected: _*) + val eventsC = locationC.probe.expectMsgAllOf(expected: _*) def assertPartialOrderOnAllReplicas(sample: String*): Unit = { assertPartialOrder(eventsA, sample) @@ -131,14 +108,14 @@ abstract class ReplicationIntegrationSpec extends WordSpec with Matchers with Re assertPartialOrderOnAllReplicas("c1", "c2", "c3") } "replicate events based on filter criteria" in { - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553, Map("L1" -> new PayloadEqualityFilter("b2"))))) - val nodeB = node("B", Set("L1"), 2553, Set(replicationConnection(2552, Map("L1" -> new PayloadEqualityFilter("a2"))))) + val locationA = location("A") + val locationB = location("B") - val probeA = new TestProbe(nodeA.system) - val probeB = new TestProbe(nodeB.system) + val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(locationB.port, Map("L1" -> new PayloadEqualityFilter("b2"))))) + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA.port, Map("L1" -> new PayloadEqualityFilter("a2"))))) - val actorA = nodeA.system.actorOf(Props(new ReplicatedActor("pa", nodeA.logs("L1"), probeA.ref))) - val actorB = nodeB.system.actorOf(Props(new ReplicatedActor("pb", nodeB.logs("L1"), probeB.ref))) + val actorA = locationA.system.actorOf(Props(new ReplicatedActor("pa", endpointA.logs("L1"), locationA.probe.ref))) + val actorB = locationB.system.actorOf(Props(new ReplicatedActor("pb", endpointB.logs("L1"), locationB.probe.ref))) actorA ! "a1" actorA ! "a2" @@ -148,76 +125,83 @@ abstract class ReplicationIntegrationSpec extends WordSpec with Matchers with Re actorB ! "b2" actorB ! "b3" - val eventsA = probeA.expectMsgAllOf("a1", "a2", "a3", "b2") - val eventsB = probeB.expectMsgAllOf("b1", "b2", "b3", "a2") + val eventsA = locationA.probe.expectMsgAllOf("a1", "a2", "a3", "b2") + val eventsB = locationB.probe.expectMsgAllOf("b1", "b2", "b3", "a2") } "immediately attempt next batch if last replicated batch was not empty" in { - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553))) - val nodeB = node("B", Set("L1"), 2553, Set(replicationConnection(2552))) + val locationA = location("A") + val locationB = location("B") - val probeB = new TestProbe(nodeB.system) + val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(locationB.port))) + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA.port))) - val actorA = nodeA.system.actorOf(Props(new ReplicatedActor("pa", nodeA.logs("L1"), nodeA.system.deadLetters))) - val actorB = nodeB.system.actorOf(Props(new ReplicatedActor("pb", nodeB.logs("L1"), probeB.ref))) + val actorA = locationA.system.actorOf(Props(new ReplicatedActor("pa", endpointA.logs("L1"), locationA.system.deadLetters))) + val actorB = locationB.system.actorOf(Props(new ReplicatedActor("pb", endpointB.logs("L1"), locationB.probe.ref))) val num = 100 1 to num foreach { i => actorA ! s"a${i}" } - 1 to num foreach { i => probeB.expectMsg(s"a${i}") } + 1 to num foreach { i => locationB.probe.expectMsg(s"a${i}") } } "detect replication server availability" in { import ReplicationEndpoint._ - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553))) - val nodeB = node("B", Set("L1"), 2553, Set(replicationConnection(2552))) + val locationA = location("A") + val locationB = location("B") - val probeA = new TestProbe(nodeA.system) - val probeB = new TestProbe(nodeB.system) + val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(locationB.port))) + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA.port))) - nodeA.system.eventStream.subscribe(probeA.ref, classOf[Available]) - nodeB.system.eventStream.subscribe(probeB.ref, classOf[Available]) + endpointA.system.eventStream.subscribe(locationA.probe.ref, classOf[Available]) + endpointB.system.eventStream.subscribe(locationB.probe.ref, classOf[Available]) - probeA.expectMsg(Available(nodeId("B"), "L1")) - probeB.expectMsg(Available(nodeId("A"), "L1")) + locationA.probe.expectMsg(Available(endpointB.id, "L1")) + locationB.probe.expectMsg(Available(endpointA.id, "L1")) } "detect replication server unavailability" in { import ReplicationEndpoint._ - val nodeA = node("A", Set("L1"), 2552, Set(replicationConnection(2553))) - val nodeB1 = node("B", Set("L1"), 2553, Set(replicationConnection(2552))) + val locationA = location("A") + val locationB1 = location("B", customPort = customPort) + + val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(locationB1.port))) + val endpointB1 = locationB1.endpoint(Set("L1"), Set(replicationConnection(locationA.port))) - val probeAvailable1 = new TestProbe(nodeA.system) - val probeAvailable2 = new TestProbe(nodeA.system) - val probeUnavailable = new TestProbe(nodeA.system) + val probeAvailable1 = new TestProbe(locationA.system) + val probeAvailable2 = new TestProbe(locationA.system) + val probeUnavailable = new TestProbe(locationA.system) - nodeA.system.eventStream.subscribe(probeAvailable1.ref, classOf[Available]) - nodeA.system.eventStream.subscribe(probeUnavailable.ref, classOf[Unavailable]) + locationA.system.eventStream.subscribe(probeAvailable1.ref, classOf[Available]) + locationA.system.eventStream.subscribe(probeUnavailable.ref, classOf[Unavailable]) - probeAvailable1.expectMsg(Available(nodeId("B"), "L1")) - Await.result(nodeB1.terminate(), 10.seconds) - probeUnavailable.expectMsg(Unavailable(nodeId("B"), "L1")) + probeAvailable1.expectMsg(Available(endpointB1.id, "L1")) + Await.result(locationB1.terminate(), 10.seconds) + probeUnavailable.expectMsg(Unavailable(endpointB1.id, "L1")) - // start replication node B again - node("B", Set("L1"), 2553, Set(replicationConnection(2552))) + val locationB2 = location("B", customPort = customPort) + val endpointB2 = locationB2.endpoint(Set("L1"), Set(replicationConnection(locationA.port))) - nodeA.system.eventStream.subscribe(probeAvailable2.ref, classOf[Available]) - probeAvailable2.expectMsg(Available(nodeId("B"), "L1")) + locationA.system.eventStream.subscribe(probeAvailable2.ref, classOf[Available]) + probeAvailable2.expectMsg(Available(endpointB2.id, "L1")) } "support multiple logs per replication endpoint" in { val logNames = Set("L1", "L2") - val nodeA = node("A", logNames, 2552, Set(replicationConnection(2553))) - val nodeB = node("B", logNames, 2553, Set(replicationConnection(2552))) + val locationA = location("A") + val locationB = location("B") - val probeAL1 = new TestProbe(nodeA.system) - val probeAL2 = new TestProbe(nodeA.system) - val probeBL1 = new TestProbe(nodeB.system) - val probeBL2 = new TestProbe(nodeB.system) + val endpointA = locationA.endpoint(logNames, Set(replicationConnection(locationB.port))) + val endpointB = locationB.endpoint(logNames, Set(replicationConnection(locationA.port))) - val actorAL1 = nodeA.system.actorOf(Props(new ReplicatedActor("pa1", nodeA.logs("L1"), probeAL1.ref))) - val actorAL2 = nodeA.system.actorOf(Props(new ReplicatedActor("pa2", nodeA.logs("L2"), probeAL2.ref))) - val actorBL1 = nodeB.system.actorOf(Props(new ReplicatedActor("pb1", nodeB.logs("L1"), probeBL1.ref))) - val actorBL2 = nodeB.system.actorOf(Props(new ReplicatedActor("pb2", nodeB.logs("L2"), probeBL2.ref))) + val probeAL1 = new TestProbe(locationA.system) + val probeAL2 = new TestProbe(locationA.system) + val probeBL1 = new TestProbe(locationB.system) + val probeBL2 = new TestProbe(locationB.system) + + val actorAL1 = locationA.system.actorOf(Props(new ReplicatedActor("pa1", endpointA.logs("L1"), probeAL1.ref))) + val actorAL2 = locationA.system.actorOf(Props(new ReplicatedActor("pa2", endpointA.logs("L2"), probeAL2.ref))) + val actorBL1 = locationB.system.actorOf(Props(new ReplicatedActor("pb1", endpointB.logs("L1"), probeBL1.ref))) + val actorBL2 = locationB.system.actorOf(Props(new ReplicatedActor("pb2", endpointB.logs("L2"), probeBL2.ref))) actorAL1 ! "a" actorBL1 ! "b" @@ -235,18 +219,21 @@ abstract class ReplicationIntegrationSpec extends WordSpec with Matchers with Re val logNamesA = Set("L1", "L2") val logNamesB = Set("L2", "L3") - val nodeA = node("A", logNamesA, 2552, Set(replicationConnection(2553))) - val nodeB = node("B", logNamesB, 2553, Set(replicationConnection(2552))) + val locationA = location("A") + val locationB = location("B") + + val endpointA = locationA.endpoint(logNamesA, Set(replicationConnection(locationB.port))) + val endpointB = locationB.endpoint(logNamesB, Set(replicationConnection(locationA.port))) - val probeAL1 = new TestProbe(nodeA.system) - val probeAL2 = new TestProbe(nodeA.system) - val probeBL2 = new TestProbe(nodeB.system) - val probeBL3 = new TestProbe(nodeB.system) + val probeAL1 = new TestProbe(locationA.system) + val probeAL2 = new TestProbe(locationA.system) + val probeBL2 = new TestProbe(locationB.system) + val probeBL3 = new TestProbe(locationB.system) - val actorAL1 = nodeA.system.actorOf(Props(new ReplicatedActor("pa1", nodeA.logs("L1"), probeAL1.ref))) - val actorAL2 = nodeA.system.actorOf(Props(new ReplicatedActor("pa2", nodeA.logs("L2"), probeAL2.ref))) - val actorBL2 = nodeB.system.actorOf(Props(new ReplicatedActor("pb2", nodeB.logs("L2"), probeBL2.ref))) - val actorBL3 = nodeB.system.actorOf(Props(new ReplicatedActor("pb3", nodeB.logs("L3"), probeBL3.ref))) + val actorAL1 = locationA.system.actorOf(Props(new ReplicatedActor("pa1", endpointA.logs("L1"), probeAL1.ref))) + val actorAL2 = locationA.system.actorOf(Props(new ReplicatedActor("pa2", endpointA.logs("L2"), probeAL2.ref))) + val actorBL2 = locationB.system.actorOf(Props(new ReplicatedActor("pb2", endpointB.logs("L2"), probeBL2.ref))) + val actorBL3 = locationB.system.actorOf(Props(new ReplicatedActor("pb3", endpointB.logs("L3"), probeBL3.ref))) actorAL1 ! "a" actorAL2 ! "b" @@ -289,35 +276,21 @@ abstract class ReplicationIntegrationSpec extends WordSpec with Matchers with Re targetApplicationName: String, targetApplicationVersion: ApplicationVersion): TestProbe = { - val endpoint = node("A", Set("L1"), 2552, Set(), sourceApplicationName, sourceApplicationVersion).endpoint - val probe = new TestProbe(endpoint.system) + val locationA = location("A") + val endpointA = locationA.endpoint(Set("L1"), Set(), sourceApplicationName, sourceApplicationVersion) - val message = ReplicationRead(1, Int.MaxValue, NoFilter, DurableEvent.UndefinedLogId, endpoint.system.deadLetters, VectorTime()) + val message = ReplicationRead(1, Int.MaxValue, NoFilter, DurableEvent.UndefinedLogId, locationA.system.deadLetters, VectorTime()) val envelope = ReplicationReadEnvelope(message, "L1", targetApplicationName, targetApplicationVersion) - endpoint.acceptor.tell(envelope, probe.ref) - probe + endpointA.acceptor.tell(envelope, locationA.probe.ref) + locationA.probe } } -class ReplicationIntegrationSpecLeveldb extends ReplicationIntegrationSpec with EventLogCleanupLeveldb { - override val logFactory: String => Props = id => LeveldbEventLog.props(id) +class ReplicationIntegrationSpecLeveldb extends ReplicationIntegrationSpec with MultiLocationSpecLeveldb { + def customPort = 2553 } -class ReplicationIntegrationSpecCassandra extends ReplicationIntegrationSpec with EventLogCleanupCassandra { - override val logFactory: String => Props = id => CassandraEventLog.props(id) - - override def beforeAll(): Unit = { - super.beforeAll() - EmbeddedCassandraServerHelper.startEmbeddedCassandra(60000) - } - - override def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection], - applicationName: String = DefaultApplicationName, - applicationVersion: ApplicationVersion = DefaultApplicationVersion): ReplicationNode = { - - val node = super.node(nodeName, logNames, port, connections, applicationName, applicationVersion) - Cassandra(node.system) // enforce keyspace/schema setup - node - } +class ReplicationIntegrationSpecCassandra extends ReplicationIntegrationSpec with MultiLocationSpecCassandra { + def customPort = 2554 } diff --git a/src/it/scala/com/rbmhtechnology/eventuate/ReplicationNode.scala b/src/it/scala/com/rbmhtechnology/eventuate/ReplicationNode.scala deleted file mode 100644 index 7cd4bbf8..00000000 --- a/src/it/scala/com/rbmhtechnology/eventuate/ReplicationNode.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (C) 2015 - 2016 Red Bull Media House GmbH - all rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.rbmhtechnology.eventuate - -import akka.actor._ -import akka.testkit.TestProbe - -import com.rbmhtechnology.eventuate.ReplicationEndpoint._ -import com.rbmhtechnology.eventuate.ReplicationNode.EventListener - -import org.scalatest._ - -import scala.concurrent.{Await, Future} -import scala.concurrent.duration._ - -object ReplicationNode { - class EventListener(locationId: String, eventLog: ActorRef)(implicit system: ActorSystem) extends TestProbe(system, s"${locationId}_EventListener") { - - private class ListenerView(val id: String, val eventLog: ActorRef, probe: ActorRef) extends EventsourcedView { - override def onEvent = { - case event => probe ! event - } - override def onCommand = Actor.emptyBehavior - } - - system.actorOf(Props(new ListenerView(testActorName, eventLog, ref))) - - def waitForMessage(msg: Any) = fishForMessage(hint = msg.toString) { - case `msg` => true - case _ => false - } - } -} - -class ReplicationNode(val id: String, - logNames: Set[String], - logFactory: String => Props, - connections: Set[ReplicationConnection], - applicationName: String = DefaultApplicationName, - applicationVersion: ApplicationVersion = DefaultApplicationVersion, - port: Int = 2552, customConfig: String = "", activate: Boolean = true) { - - val system: ActorSystem = - ActorSystem(ReplicationConnection.DefaultRemoteSystemName, ReplicationConfig.create(port, customConfig)) - - val endpoint: ReplicationEndpoint = { - val endpoint = new ReplicationEndpoint(id, logNames, logFactory, connections, applicationName, applicationVersion)(system) - if (activate) endpoint.activate() - endpoint - } - - def logs: Map[String, ActorRef] = - endpoint.logs - - def terminate(): Future[Terminated] = { - system.terminate() - } - - def eventListener(logName: String) = new EventListener(id, logs(logName))(system) - - private def localEndpoint(port: Int) = - s"127.0.0.1:${port}" -} - -trait ReplicationNodeRegistry extends BeforeAndAfterEach { this: Suite => - var nodes: List[ReplicationNode] = Nil - - override def afterEach(): Unit = - nodes.foreach(node => Await.result(node.terminate(), 10.seconds)) - - def register(node: ReplicationNode): ReplicationNode = { - nodes = node :: nodes - node - } -} \ No newline at end of file diff --git a/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTChaosSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTChaosSpec.scala index a1ed1598..6853db55 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTChaosSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTChaosSpec.scala @@ -27,8 +27,8 @@ import com.rbmhtechnology.eventuate.crdt.CRDTService.ValueUpdated import com.rbmhtechnology.eventuate.log._ import com.rbmhtechnology.eventuate.log.cassandra._ import com.rbmhtechnology.eventuate.log.leveldb._ +import com.typesafe.config.ConfigFactory -import org.cassandraunit.utils.EmbeddedCassandraServerHelper import org.scalatest._ import scala.collection.immutable.Seq @@ -41,27 +41,21 @@ object CRDTChaosSpec { ThreadLocalRandom.current.nextInt(1, 10).toString } -abstract class CRDTChaosSpec extends WordSpec with Matchers with ReplicationNodeRegistry { +abstract class CRDTChaosSpec extends WordSpec with Matchers with MultiLocationSpec { import ReplicationIntegrationSpec.replicationConnection import CRDTChaosSpec._ - def logFactory: String => Props + val customConfig = ConfigFactory.parseString( + """ + |eventuate.log.write-batch-size = 3 + |eventuate.log.replication.retry-delay = 100ms + """.stripMargin) - def config = - ReplicationConfig.create() - - def node(nodeName: String, port: Int, connections: Set[ReplicationConnection]): ReplicationNode = - register(new ReplicationNode(nodeName, Set(ReplicationEndpoint.DefaultLogName), logFactory, connections, port = port, customConfig = - """ - |eventuate.log.write-batch-size = 3 - |eventuate.log.replication.retry-delay = 100ms - """.stripMargin)) - - def service(node: ReplicationNode): (ORSetService[String], TestProbe) = { - implicit val system = node.system + def service(endpoint: ReplicationEndpoint): (ORSetService[String], TestProbe) = { + implicit val system = endpoint.system val probe = TestProbe() - val service = new ORSetService[String](node.id, node.logs(ReplicationEndpoint.DefaultLogName)) { + val service = new ORSetService[String](endpoint.id, endpoint.logs("L1")) { val startCounter = new AtomicInteger() val stopCounter = new AtomicInteger() @@ -91,15 +85,20 @@ abstract class CRDTChaosSpec extends WordSpec with Matchers with ReplicationNode "converge under concurrent updates and write failures" in { val numUpdates = 100 - val nodeA = node("A", 2552, Set(replicationConnection(2553), replicationConnection(2554), replicationConnection(2555))) - val nodeB = node("B", 2553, Set(replicationConnection(2552))) - val nodeC = node("C", 2554, Set(replicationConnection(2552))) - val nodeD = node("D", 2555, Set(replicationConnection(2552))) + val locationA = location("A", customConfig = customConfig) + val locationB = location("B", customConfig = customConfig) + val locationC = location("C", customConfig = customConfig) + val locationD = location("D", customConfig = customConfig) - val (serviceA, probeA) = service(nodeA) - val (serviceB, probeB) = service(nodeB) - val (serviceC, probeC) = service(nodeC) - val (serviceD, probeD) = service(nodeD) + val endpointA = locationA.endpoint(Set("L1"), Set(replicationConnection(locationB.port), replicationConnection(locationC.port), replicationConnection(locationD.port))) + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA.port))) + val endpointC = locationC.endpoint(Set("L1"), Set(replicationConnection(locationA.port))) + val endpointD = locationD.endpoint(Set("L1"), Set(replicationConnection(locationA.port))) + + val (serviceA, probeA) = service(endpointA) + val (serviceB, probeB) = service(endpointB) + val (serviceC, probeC) = service(endpointC) + val (serviceD, probeD) = service(endpointD) serviceA.add(crdtId, s"start-${serviceA.serviceId}") serviceB.add(crdtId, s"start-${serviceB.serviceId}") @@ -143,7 +142,7 @@ abstract class CRDTChaosSpec extends WordSpec with Matchers with ReplicationNode } } -class CRDTChaosSpecLeveldb extends CRDTChaosSpec with EventLogCleanupLeveldb { +class CRDTChaosSpecLeveldb extends CRDTChaosSpec with MultiLocationSpecLeveldb { import CRDTChaosSpec._ class TestEventLog(id: String) extends LeveldbEventLog(id, "log-test") { @@ -151,7 +150,7 @@ class CRDTChaosSpecLeveldb extends CRDTChaosSpec with EventLogCleanupLeveldb { if (events.map(_.payload).contains(ValueUpdated(crdtId, AddOp(randomNr())))) throw boom else super.write(events, partition, clock) } - val logFactory: String => Props = + override val logFactory: String => Props = id => logProps(id) def logProps(logId: String, batching: Boolean = true): Props = { @@ -160,7 +159,7 @@ class CRDTChaosSpecLeveldb extends CRDTChaosSpec with EventLogCleanupLeveldb { } } -class CRDTChaosSpecCassandra extends CRDTChaosSpec with EventLogCleanupCassandra { +class CRDTChaosSpecCassandra extends CRDTChaosSpec with MultiLocationSpecCassandra { import CRDTChaosSpec._ class TestEventLog(id: String) extends CassandraEventLog(id) { @@ -171,17 +170,6 @@ class CRDTChaosSpecCassandra extends CRDTChaosSpec with EventLogCleanupCassandr override val logFactory: String => Props = id => logProps(id) - override def beforeAll(): Unit = { - super.beforeAll() - EmbeddedCassandraServerHelper.startEmbeddedCassandra(60000) - } - - override def node(nodeName: String, port: Int, connections: Set[ReplicationConnection]): ReplicationNode = { - val node = super.node(nodeName, port, connections) - Cassandra(node.system) // enforce keyspace/schema setup - node - } - def logProps(logId: String, batching: Boolean = true): Props = { val logProps = Props(new TestEventLog(logId)).withDispatcher("eventuate.log.dispatchers.write-dispatcher") Props(new CircuitBreaker(logProps, batching)) diff --git a/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTRecoverySpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTRecoverySpec.scala index 619cf092..aec7e797 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTRecoverySpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTRecoverySpec.scala @@ -19,22 +19,22 @@ package com.rbmhtechnology.eventuate.crdt import akka.actor._ import akka.testkit._ -import com.rbmhtechnology.eventuate.log._ +import com.rbmhtechnology.eventuate._ import com.rbmhtechnology.eventuate.utilities._ import org.scalatest._ -abstract class CRDTRecoverySpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach { +abstract class CRDTRecoverySpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with SingleLocationSpec { var probe: TestProbe = _ - def log: ActorRef - def service(serviceId: String) = new ORSetService[Int](serviceId, log) { override def onChange(crdt: ORSet[Int], operation: Any): Unit = probe.ref ! crdt.value } - override protected def beforeEach(): Unit = + override def beforeEach(): Unit = { + super.beforeEach() probe = TestProbe() + } "A CRDTService" must { "recover CRDT instances" in { @@ -80,5 +80,5 @@ abstract class CRDTRecoverySpec extends TestKit(ActorSystem("test")) with WordSp } } -class CRDTRecoverySpecLeveldb extends CRDTRecoverySpec with EventLogLifecycleLeveldb -class CRDTRecoverySpecCassandra extends CRDTRecoverySpec with EventLogLifecycleCassandra +class CRDTRecoverySpecLeveldb extends CRDTRecoverySpec with SingleLocationSpecLeveldb +class CRDTRecoverySpecCassandra extends CRDTRecoverySpec with SingleLocationSpecCassandra diff --git a/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTServiceSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTServiceSpec.scala index f84cd6dd..5d8f8220 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTServiceSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/crdt/CRDTServiceSpec.scala @@ -19,12 +19,12 @@ package com.rbmhtechnology.eventuate.crdt import akka.actor._ import akka.testkit._ -import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb +import com.rbmhtechnology.eventuate.SingleLocationSpecLeveldb import com.rbmhtechnology.eventuate.utilities._ import org.scalatest._ -class CRDTServiceSpecLeveldb extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with EventLogLifecycleLeveldb { +class CRDTServiceSpecLeveldb extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with SingleLocationSpecLeveldb { "A CRDTService" must { "manage multiple CRDTs identified by name" in { val service = new CounterService[Int]("a", log) diff --git a/src/it/scala/com/rbmhtechnology/eventuate/log/CircuitBreakerIntregrationSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/log/CircuitBreakerIntregrationSpec.scala index e21b2cd1..b5e78979 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/log/CircuitBreakerIntregrationSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/log/CircuitBreakerIntregrationSpec.scala @@ -99,7 +99,7 @@ object CircuitBreakerIntregrationSpecCassandra { } class CircuitBreakerIntregrationSpecCassandra extends TestKit(ActorSystem("test", CircuitBreakerIntregrationSpecCassandra.config)) - with WordSpecLike with Matchers with BeforeAndAfterEach with EventLogCleanupCassandra with Eventually { + with WordSpecLike with Matchers with BeforeAndAfterEach with LocationCleanupCassandra with Eventually { import CircuitBreakerIntregrationSpecCassandra._ import system.dispatcher diff --git a/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogLifecycle.scala b/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogLifecycle.scala deleted file mode 100644 index 6eabc8c5..00000000 --- a/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogLifecycle.scala +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Copyright (C) 2015 - 2016 Red Bull Media House GmbH - all rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.rbmhtechnology.eventuate.log - -import java.io.{Closeable, File} - -import akka.actor._ -import akka.testkit.{TestProbe, TestKit} - -import com.rbmhtechnology.eventuate._ -import com.rbmhtechnology.eventuate.log.EventLogLifecycle.ErrorSequenceNr -import com.rbmhtechnology.eventuate.log.cassandra._ -import com.rbmhtechnology.eventuate.log.cassandra.CassandraIndex._ -import com.rbmhtechnology.eventuate.log.leveldb._ -import com.rbmhtechnology.eventuate.utilities.RestarterActor -import com.typesafe.config.Config - -import org.apache.commons.io.FileUtils -import org.cassandraunit.utils.EmbeddedCassandraServerHelper -import org.scalatest._ - -import scala.collection.immutable.Seq -import scala.concurrent.{ExecutionContext, Future} - -trait EventLogCleanupLeveldb extends Suite with BeforeAndAfterAll { - def config: Config - - def storageLocations: List[File] = - List("eventuate.log.leveldb.dir", "eventuate.snapshot.filesystem.dir").map(s => new File(config.getString(s))) - - override def beforeAll(): Unit = { - storageLocations.foreach(FileUtils.deleteDirectory) - storageLocations.foreach(_.mkdirs()) - } - - override def afterAll(): Unit = { - storageLocations.foreach(FileUtils.deleteDirectory) - } -} - -trait EventLogLifecycleLeveldb extends EventLogCleanupLeveldb with BeforeAndAfterEach { - import EventLogLifecycleLeveldb._ - - private var _logCtr: Int = 0 - private var _log: ActorRef = _ - - override def beforeEach(): Unit = { - super.beforeEach() - - _logCtr += 1 - _log = system.actorOf(logProps(logId)) - } - - override def afterEach(): Unit = { - system.stop(log) - super.afterEach() - } - - override def afterAll(): Unit = { - TestKit.shutdownActorSystem(system) - super.afterAll() - } - - def system: ActorSystem - - def config: Config = - system.settings.config - - def batching: Boolean = - true - - def log: ActorRef = - _log - - def logId: String = - _logCtr.toString - - def logProps(logId: String): Props = - RestarterActor.props(TestEventLog.props(logId, batching)) -} - -object EventLogLifecycleLeveldb { - object TestEventLog { - def props(logId: String, batching: Boolean): Props = { - val logProps = Props(new TestEventLog(logId)).withDispatcher("eventuate.log.dispatchers.write-dispatcher") - if (batching) Props(new BatchingLayer(logProps)) else logProps - } - } - - class TestEventLog(id: String) extends LeveldbEventLog(id, "log-test") with EventLogLifecycle.TestEventLog { - override def unhandled(message: Any): Unit = message match { - case "boom" => - throw boom - case "dir" => - sender() ! logDir - case _ => - super.unhandled(message) - } - } -} - -trait EventLogCleanupCassandra extends Suite with BeforeAndAfterAll { - def config: Config - - def storageLocations: List[File] = - List("eventuate.snapshot.filesystem.dir").map(s => new File(config.getString(s))) - - override def beforeAll(): Unit = { - storageLocations.foreach(FileUtils.deleteDirectory) - storageLocations.foreach(_.mkdirs()) - } - - override def afterAll(): Unit = { - EmbeddedCassandraServerHelper.cleanEmbeddedCassandra() - storageLocations.foreach(FileUtils.deleteDirectory) - } -} - -trait EventLogLifecycleCassandra extends EventLogCleanupCassandra with BeforeAndAfterEach { - import EventLogLifecycleCassandra._ - - private var _logCtr: Int = 0 - private var _log: ActorRef = _ - - var indexProbe: TestProbe = _ - - override def beforeEach(): Unit = { - super.beforeEach() - - indexProbe = new TestProbe(system) - - _logCtr += 1 - _log = createLog(TestFailureSpec(), indexProbe.ref) - } - - override def afterEach(): Unit = { - system.stop(log) - super.afterEach() - } - - override def beforeAll(): Unit = { - super.beforeAll() - EmbeddedCassandraServerHelper.startEmbeddedCassandra(60000) - } - - override def afterAll(): Unit = { - TestKit.shutdownActorSystem(system) - super.afterAll() - } - - def createLog(failureSpec: TestFailureSpec, indexProbe: ActorRef): ActorRef = - system.actorOf(logProps(logId, failureSpec, indexProbe)) - - def system: ActorSystem - - def config: Config = - system.settings.config - - def batching: Boolean = - true - - def log: ActorRef = - _log - - def logId: String = - _logCtr.toString - - def logProps(logId: String): Props = - logProps(logId, TestFailureSpec(), system.deadLetters) - - def logProps(logId: String, failureSpec: TestFailureSpec, indexProbe: ActorRef): Props = - TestEventLog.props(logId, failureSpec, indexProbe, batching) -} - -object EventLogLifecycleCassandra { - case class TestFailureSpec( - failOnClockRead: Boolean = false, - failBeforeIndexIncrementWrite: Boolean = false, - failAfterIndexIncrementWrite: Boolean = false) - - object TestEventLog { - def props(logId: String, batching: Boolean): Props = - props(logId, TestFailureSpec(), None, batching) - - def props(logId: String, failureSpec: TestFailureSpec, indexProbe: ActorRef, batching: Boolean): Props = - props(logId, failureSpec, Some(indexProbe), batching) - - def props(logId: String, failureSpec: TestFailureSpec, indexProbe: Option[ActorRef], batching: Boolean): Props = { - val logProps = Props(new TestEventLog(logId, failureSpec, indexProbe)).withDispatcher("eventuate.log.dispatchers.write-dispatcher") - Props(new CircuitBreaker(logProps, batching)) - } - } - - class TestEventLog(id: String, failureSpec: TestFailureSpec, indexProbe: Option[ActorRef]) - extends CassandraEventLog(id) with EventLogLifecycle.TestEventLog { - - override def currentSystemTime: Long = 0L - - override def unhandled(message: Any): Unit = message match { - case "boom" => - throw boom - case _ => - super.unhandled(message) - } - - private[eventuate] override def createIndexStore(cassandra: Cassandra, logId: String) = - new TestIndexStore(cassandra, logId, failureSpec) - - private[eventuate] override def onIndexEvent(event: Any): Unit = - indexProbe.foreach(_ ! event) - } - - class TestIndexStore(cassandra: Cassandra, logId: String, failureSpec: TestFailureSpec) extends CassandraIndexStore(cassandra, logId) { - private var writeIncrementFailed = false - private var readClockFailed = false - - override def writeAsync(aggregateEvents: AggregateEvents, clock: EventLogClock)(implicit executor: ExecutionContext): Future[EventLogClock] = - if (failureSpec.failBeforeIndexIncrementWrite && !writeIncrementFailed) { - writeIncrementFailed = true - Future.failed(boom) - } else if (failureSpec.failAfterIndexIncrementWrite && !writeIncrementFailed) { - writeIncrementFailed = true - for { - _ <- super.writeAsync(aggregateEvents, clock) - r <- Future.failed(boom) - } yield r - } else super.writeAsync(aggregateEvents, clock) - - override def readEventLogClockAsync(implicit executor: ExecutionContext): Future[EventLogClock] = - if (failureSpec.failOnClockRead && !readClockFailed) { - readClockFailed = true - Future.failed(boom) - } else super.readEventLogClockAsync - } -} - -object EventLogLifecycle { - val ErrorSequenceNr = -1L - val IgnoreDeletedSequenceNr = -2L - - trait TestEventLog extends EventLog { - override def currentSystemTime: Long = 0L - - abstract override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int): Future[BatchReadResult] = - if (fromSequenceNr == ErrorSequenceNr) Future.failed(boom) else super.read(fromSequenceNr, toSequenceNr, max) - - abstract override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int, aggregateId: String): Future[BatchReadResult] = - if (fromSequenceNr == ErrorSequenceNr) Future.failed(boom) else super.read(fromSequenceNr, toSequenceNr, max, aggregateId) - - abstract override def replicationRead(fromSequenceNr: Long, toSequenceNr: Long, max: Int, filter: (DurableEvent) => Boolean): Future[BatchReadResult] = - if (fromSequenceNr == ErrorSequenceNr) Future.failed(boom) else super.replicationRead(fromSequenceNr, toSequenceNr, max, filter) - - abstract override def write(events: Seq[DurableEvent], partition: Long, clock: EventLogClock): Unit = - if (events.map(_.payload).contains("boom")) throw boom else super.write(events, partition, clock) - - override private[eventuate] def adjustFromSequenceNr(seqNr: Long) = seqNr match { - case ErrorSequenceNr => seqNr - case IgnoreDeletedSequenceNr => 0 - case s => super.adjustFromSequenceNr(s) - } - } -} \ No newline at end of file diff --git a/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogPartitioningSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogPartitioningSpec.scala index 23af43f7..0d9cd488 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogPartitioningSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogPartitioningSpec.scala @@ -20,6 +20,7 @@ import akka.actor.ActorSystem import akka.testkit.{TestProbe, TestKit} import com.rbmhtechnology.eventuate.EventsourcingProtocol._ +import com.rbmhtechnology.eventuate.SingleLocationSpecCassandra import com.typesafe.config._ @@ -39,7 +40,7 @@ object EventLogPartitioningSpecCassandra { """.stripMargin) } -class EventLogPartitioningSpecCassandra extends TestKit(ActorSystem("test", EventLogPartitioningSpecCassandra.config)) with EventLogSpecSupport with EventLogLifecycleCassandra { +class EventLogPartitioningSpecCassandra extends TestKit(ActorSystem("test", EventLogPartitioningSpecCassandra.config)) with EventLogSpecSupport with SingleLocationSpecCassandra { import EventLogSpec._ def replay(fromSequenceNr: Long): Seq[(Any, Long)] = { diff --git a/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogSpec.scala index 1655f658..64ff0d9b 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/log/EventLogSpec.scala @@ -18,7 +18,7 @@ package com.rbmhtechnology.eventuate.log import akka.actor._ import akka.pattern.ask -import akka.testkit.{TestProbe, TestKit} +import akka.testkit._ import akka.util.Timeout import com.rbmhtechnology.eventuate._ @@ -26,14 +26,14 @@ import com.rbmhtechnology.eventuate.DurableEvent._ import com.rbmhtechnology.eventuate.EventsourcingProtocol._ import com.rbmhtechnology.eventuate.ReplicationFilter.NoFilter import com.rbmhtechnology.eventuate.ReplicationProtocol._ -import com.rbmhtechnology.eventuate.log.EventLogLifecycle._ -import com.rbmhtechnology.eventuate.log.EventLogLifecycleCassandra.TestFailureSpec +import com.rbmhtechnology.eventuate.SingleLocationSpec._ +import com.rbmhtechnology.eventuate.SingleLocationSpecCassandra._ import com.rbmhtechnology.eventuate.log.EventLogSpecLeveldb.immediateEventLogClockSnapshotConfig import com.rbmhtechnology.eventuate.log.cassandra._ import com.rbmhtechnology.eventuate.utilities.RestarterActor.restartActor import com.rbmhtechnology.eventuate.utilities._ -import com.typesafe.config.{ConfigFactory, Config} +import com.typesafe.config._ import org.scalatest._ import org.scalatest.time._ @@ -83,7 +83,7 @@ object EventLogSpec { } } -trait EventLogSpecSupport extends WordSpecLike with Matchers with BeforeAndAfterEach { +trait EventLogSpecSupport extends WordSpecLike with Matchers with SingleLocationSpec { import EventLogSpec._ implicit val system: ActorSystem @@ -102,10 +102,8 @@ trait EventLogSpecSupport extends WordSpecLike with Matchers with BeforeAndAfter def generatedEmittedEvents: Vector[DurableEvent] = _generatedEmittedEvents def generatedReplicatedEvents: Vector[DurableEvent] = _generatedReplicatedEvents - def log: ActorRef - def logId: String - override def beforeEach(): Unit = { + super.beforeEach() _replyToProbe = TestProbe() _replicatorProbe = TestProbe() _notificationProbe = TestProbe() @@ -114,6 +112,7 @@ trait EventLogSpecSupport extends WordSpecLike with Matchers with BeforeAndAfter override def afterEach(): Unit = { _generatedEmittedEvents = Vector.empty _generatedReplicatedEvents = Vector.empty + super.afterEach() } def timestamp(a: Long = 0L, b: Long= 0L) = (a, b) match { @@ -605,7 +604,7 @@ object EventLogSpecLeveldb { ConfigFactory.parseString("eventuate.log.leveldb.state-snapshot-limit = 1") } -class EventLogSpecLeveldb extends EventLogSpec with EventLogLifecycleLeveldb { +class EventLogSpecLeveldb extends EventLogSpec with SingleLocationSpecLeveldb { "A LeveldbEventLog" must { "not delete events required for restoring the EventLogClock" in { @@ -621,7 +620,7 @@ class EventLogSpecLeveldb extends EventLogSpec with EventLogLifecycleLeveldb { class EventLogWithImmediateEventLogClockSnapshotSpecLeveldb extends TestKit(ActorSystem("test", immediateEventLogClockSnapshotConfig.withFallback(EventLogSpec.config))) - with EventLogSpecSupport with EventLogLifecycleLeveldb with Eventually { + with EventLogSpecSupport with SingleLocationSpecLeveldb with Eventually { import EventLogSpec._ @@ -703,7 +702,7 @@ class EventLogWithImmediateEventLogClockSnapshotSpecLeveldb } } -class EventLogSpecCassandra extends EventLogSpec with EventLogLifecycleCassandra { +class EventLogSpecCassandra extends EventLogSpec with SingleLocationSpecCassandra { import EventLogSpec._ import CassandraIndex._ diff --git a/src/it/scala/com/rbmhtechnology/eventuate/serializer/CRDTSerializerSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/serializer/CRDTSerializerSpec.scala index 854dbab6..7a5075c8 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/serializer/CRDTSerializerSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/serializer/CRDTSerializerSpec.scala @@ -41,15 +41,15 @@ class CRDTSerializerSpec extends WordSpec with Matchers with BeforeAndAfterAll { import DurableEventSerializerSpec.ExamplePayload import CRDTSerializerSpec._ - val support = new SerializerSpecSupport( - ReplicationConfig.create(2552), - ReplicationConfig.create(2553, serializerConfig), - ReplicationConfig.create(2554, serializerWithStringManifestConfig)) + val context = new SerializationContext( + LocationConfig.create(), + LocationConfig.create(customConfig = serializerConfig), + LocationConfig.create(customConfig = serializerWithStringManifestConfig)) override def afterAll(): Unit = - support.shutdown() + context.shutdown() - import support._ + import context._ "A CRDTSerializer" must { "support ORSet serialization with default payload serialization" in { diff --git a/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala index d3f06e8f..d85cdf6d 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala @@ -21,13 +21,14 @@ import akka.serialization.SerializerWithStringManifest import akka.serialization.Serializer import com.rbmhtechnology.eventuate._ +import com.typesafe.config.ConfigFactory import org.scalatest._ object DurableEventSerializerSpec { case class ExamplePayload(foo: String, bar: String) - val serializerConfig = + val serializerConfig = ConfigFactory.parseString( """ |akka.actor.serializers { | eventuate-test = "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayloadSerializer" @@ -35,9 +36,9 @@ object DurableEventSerializerSpec { |akka.actor.serialization-bindings { | "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayload" = eventuate-test |} - """.stripMargin + """.stripMargin) - val serializerWithStringManifestConfig = + val serializerWithStringManifestConfig = ConfigFactory.parseString( """ |akka.actor.serializers { | eventuate-test = "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayloadSerializerWithStringManifest" @@ -45,7 +46,7 @@ object DurableEventSerializerSpec { |akka.actor.serialization-bindings { | "com.rbmhtechnology.eventuate.serializer.DurableEventSerializerSpec$ExamplePayload" = eventuate-test |} - """.stripMargin + """.stripMargin) /** * Swaps `foo` and `bar` of `ExamplePayload`. */ @@ -99,15 +100,15 @@ object DurableEventSerializerSpec { class DurableEventSerializerSpec extends WordSpec with Matchers with BeforeAndAfterAll { import DurableEventSerializerSpec._ - val support = new SerializerSpecSupport( - ReplicationConfig.create(2552), - ReplicationConfig.create(2553, serializerConfig), - ReplicationConfig.create(2554, serializerWithStringManifestConfig)) + val context = new SerializationContext( + LocationConfig.create(), + LocationConfig.create(customConfig = serializerConfig), + LocationConfig.create(customConfig = serializerWithStringManifestConfig)) override def afterAll(): Unit = - support.shutdown() + context.shutdown() - import support._ + import context._ "A DurableEventSerializer" must { "support default payload serialization" in { diff --git a/src/it/scala/com/rbmhtechnology/eventuate/serializer/ReplicationFilterSerializerSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/serializer/ReplicationFilterSerializerSpec.scala index 722b5a7b..df895531 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/serializer/ReplicationFilterSerializerSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/serializer/ReplicationFilterSerializerSpec.scala @@ -16,19 +16,17 @@ package com.rbmhtechnology.eventuate.serializer -import akka.actor.Props import akka.actor._ import akka.serialization.Serializer import akka.serialization.SerializerWithStringManifest import akka.testkit.TestProbe -import com.rbmhtechnology.eventuate.ReplicationFilter.AndFilter -import com.rbmhtechnology.eventuate.ReplicationFilter.NoFilter -import com.rbmhtechnology.eventuate.ReplicationFilter.OrFilter import com.rbmhtechnology.eventuate._ -import com.rbmhtechnology.eventuate.serializer.SerializerSpecSupport.ReceiverActor -import com.rbmhtechnology.eventuate.serializer.SerializerSpecSupport.SenderActor +import com.rbmhtechnology.eventuate.ReplicationFilter._ +import com.rbmhtechnology.eventuate.serializer.SerializationContext.ReceiverActor +import com.rbmhtechnology.eventuate.serializer.SerializationContext.SenderActor +import com.typesafe.config.ConfigFactory import org.scalatest._ @@ -41,7 +39,7 @@ object ReplicationFilterSerializerSpec { def apply(event: DurableEvent): Boolean = num == 1 } - val serializerConfig = + val serializerConfig = ConfigFactory.parseString( """ |akka.actor.serializers { | eventuate-test = "com.rbmhtechnology.eventuate.serializer.ReplicationFilterSerializerSpec$ExampleFilterSerializer" @@ -49,9 +47,9 @@ object ReplicationFilterSerializerSpec { |akka.actor.serialization-bindings { | "com.rbmhtechnology.eventuate.serializer.ReplicationFilterSerializerSpec$ExampleFilter" = eventuate-test |} - """.stripMargin + """.stripMargin) - val serializerWithStringManifestConfig = + val serializerWithStringManifestConfig = ConfigFactory.parseString( """ |akka.actor.serializers { | eventuate-test = "com.rbmhtechnology.eventuate.serializer.ReplicationFilterSerializerSpec$ExampleFilterSerializerWithStringManifest" @@ -59,7 +57,7 @@ object ReplicationFilterSerializerSpec { |akka.actor.serialization-bindings { | "com.rbmhtechnology.eventuate.serializer.ReplicationFilterSerializerSpec$ExampleFilter" = eventuate-test |} - """.stripMargin + """.stripMargin) /** * Increments `ExampleFilter.num` by 1 during deserialization. @@ -123,19 +121,19 @@ object ReplicationFilterSerializerSpec { class ReplicationFilterSerializerSpec extends WordSpec with Matchers with BeforeAndAfterAll { import ReplicationFilterSerializerSpec._ - val support = new SerializerSpecSupport( - ReplicationConfig.create(2552), - ReplicationConfig.create(2553, serializerConfig), - ReplicationConfig.create(2554, serializerWithStringManifestConfig)) + val context = new SerializationContext( + LocationConfig.create(), + LocationConfig.create(customConfig = serializerConfig), + LocationConfig.create(customConfig = serializerWithStringManifestConfig)) override def afterAll(): Unit = - support.shutdown() + context.shutdown() - import support._ + import context._ val receiverProbe = new TestProbe(systems(1)) val receiverActor = systems(1).actorOf(Props(new ReceiverActor(receiverProbe.ref)), "receiver") - val senderActor = systems(0).actorOf(Props(new SenderActor(systems(0).actorSelection("akka.tcp://test-system-2@127.0.0.1:2553/user/receiver")))) + val senderActor = systems(0).actorOf(Props(new SenderActor(systems(0).actorSelection(s"akka.tcp://test-system-2@127.0.0.1:${ports(1)}/user/receiver")))) "A ReplicationFilterSerializer" must { "serialize replication filter trees with an and-filter root" in { diff --git a/src/it/scala/com/rbmhtechnology/eventuate/serializer/ReplicationProtocolSerializerSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/serializer/ReplicationProtocolSerializerSpec.scala index 5c4448e8..80575ad9 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/serializer/ReplicationProtocolSerializerSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/serializer/ReplicationProtocolSerializerSpec.scala @@ -22,8 +22,8 @@ import akka.testkit.TestProbe import com.rbmhtechnology.eventuate._ import com.rbmhtechnology.eventuate.ReplicationProtocol._ -import com.rbmhtechnology.eventuate.serializer.SerializerSpecSupport.ReceiverActor -import com.rbmhtechnology.eventuate.serializer.SerializerSpecSupport.SenderActor +import com.rbmhtechnology.eventuate.serializer.SerializationContext.ReceiverActor +import com.rbmhtechnology.eventuate.serializer.SerializationContext.SenderActor import org.scalatest._ @@ -57,18 +57,18 @@ object ReplicationProtocolSerializerSpec { class ReplicationProtocolSerializerSpec extends WordSpec with Matchers with BeforeAndAfterAll { import ReplicationProtocolSerializerSpec._ - val support = new SerializerSpecSupport( - ReplicationConfig.create(2552), - ReplicationConfig.create(2553)) + val context = new SerializationContext( + LocationConfig.create(), + LocationConfig.create()) override def afterAll(): Unit = - support.shutdown() + context.shutdown() - import support._ + import context._ val receiverProbe = new TestProbe(systems(1)) val receiverActor = systems(1).actorOf(Props(new ReceiverActor(receiverProbe.ref)), "receiver") - val senderActor = systems(0).actorOf(Props(new SenderActor(systems(0).actorSelection("akka.tcp://test-system-2@127.0.0.1:2553/user/receiver")))) + val senderActor = systems(0).actorOf(Props(new SenderActor(systems(0).actorSelection(s"akka.tcp://test-system-2@127.0.0.1:${ports(1)}/user/receiver")))) val dl1 = systems(0).deadLetters val dl2 = systems(1).deadLetters diff --git a/src/it/scala/com/rbmhtechnology/eventuate/serializer/SerializerSpecSupport.scala b/src/it/scala/com/rbmhtechnology/eventuate/serializer/SerializationContext.scala similarity index 70% rename from src/it/scala/com/rbmhtechnology/eventuate/serializer/SerializerSpecSupport.scala rename to src/it/scala/com/rbmhtechnology/eventuate/serializer/SerializationContext.scala index 98c775f4..6892f518 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/serializer/SerializerSpecSupport.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/serializer/SerializationContext.scala @@ -19,14 +19,14 @@ package com.rbmhtechnology.eventuate.serializer import akka.actor._ import akka.serialization.Serialization import akka.serialization.SerializationExtension -import akka.testkit.TestProbe import com.typesafe.config.Config +import scala.collection.immutable.Seq import scala.concurrent.Await import scala.concurrent.duration._ -object SerializerSpecSupport { +object SerializationContext { class SenderActor(receiver: ActorSelection) extends Actor { def receive = { case msg => receiver ! msg @@ -40,13 +40,20 @@ object SerializerSpecSupport { } } -class SerializerSpecSupport(configs: Config*) { +class SerializationContext(configs: Config*) { + val systems: Seq[ActorSystem] = configs.toList.zipWithIndex.map { + case (config, idx) => ActorSystem(s"test-system-${idx + 1}", config) + } + + val serializations: Seq[Serialization] = + systems.map(SerializationExtension(_)) - val systems = configs.zipWithIndex.map { case (config, idx) => ActorSystem(s"test-system-${idx+1}", config) } + val ports: Seq[Int] = + systems.map(port) - val serializations: Seq[Serialization] = systems.map(SerializationExtension(_)) + def port(system: ActorSystem): Int = + system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get - def shutdown(): Unit = { + def shutdown(): Unit = systems.foreach(system => Await.result(system.terminate(), 10.seconds)) - } } diff --git a/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala b/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala index 2ee2665e..776ce9e1 100644 --- a/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala +++ b/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala @@ -62,15 +62,15 @@ class SnapshotSerializerSpec extends WordSpec with Matchers with BeforeAndAfterA import DurableEventSerializerSpec.ExamplePayload import SnapshotSerializerSpec._ - val support = new SerializerSpecSupport( - ReplicationConfig.create(2552), - ReplicationConfig.create(2553, serializerConfig), - ReplicationConfig.create(2554, serializerWithStringManifestConfig)) + val context = new SerializationContext( + LocationConfig.create(), + LocationConfig.create(customConfig = serializerConfig), + LocationConfig.create(customConfig = serializerWithStringManifestConfig)) override def afterAll(): Unit = - support.shutdown() + context.shutdown() - import support._ + import context._ "A SnapshotSerializer" must { "support snapshot serialization with default payload serialization" in {