Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #229 from RBMHTechnology/wip-226-dynamic-ports
Browse files Browse the repository at this point in the history
Dynamic and custom ports for integration tests
  • Loading branch information
krasserm committed Mar 8, 2016
2 parents dfa49f0 + 1644e71 commit c95ed9c
Show file tree
Hide file tree
Showing 27 changed files with 892 additions and 940 deletions.
107 changes: 46 additions & 61 deletions src/it/scala/com/rbmhtechnology/eventuate/DeleteEventsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ package com.rbmhtechnology.eventuate

import akka.actor._

import com.rbmhtechnology.eventuate.ReplicationIntegrationSpec.replicationConnection
import com.rbmhtechnology.eventuate.log.EventLogCleanupLeveldb
import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb.TestEventLog
import com.rbmhtechnology.eventuate.utilities.AwaitHelper
import com.typesafe.config.ConfigFactory

import org.scalatest.Matchers
import org.scalatest.WordSpec
Expand All @@ -30,19 +28,7 @@ import scala.util.Failure
import scala.util.Success

object DeleteEventsSpecLeveldb {
val L1 = "L1"

val portA = 2552
val connectedToA = replicationConnection(portA)

val portB = 2553
val connectedToB = replicationConnection(portB)

val portC = 2554
val connectedToC = replicationConnection(portC)

class Emitter(locationId: String, val eventLog: ActorRef) extends EventsourcedActor with ActorLogging {

override def id = s"${locationId}_Emitter"

override def onEvent = Actor.emptyBehavior
Expand All @@ -55,77 +41,76 @@ object DeleteEventsSpecLeveldb {
}
}

def emitter(node: ReplicationNode, logName: String) =
node.system.actorOf(Props(new Emitter(node.id, node.logs(logName))))
}

class DeleteEventsSpecLeveldb extends WordSpec with Matchers with ReplicationNodeRegistry with EventLogCleanupLeveldb {
import DeleteEventsSpecLeveldb._

val logFactory: String => Props = id => TestEventLog.props(id, batching = true)

var ctr: Int = 0

override def beforeEach(): Unit =
ctr += 1
def emitter(endpoint: ReplicationEndpoint, logName: String) =
endpoint.system.actorOf(Props(new Emitter(endpoint.id, endpoint.logs(logName))))

def config =
ReplicationConfig.create()
val config = ConfigFactory.parseString(
"""
|eventuate.log.replication.retry-delay = 1s
|eventuate.log.replication.remote-read-timeout = 2s
|eventuate.disaster-recovery.remote-operation-retry-max = 10
|eventuate.disaster-recovery.remote-operation-retry-delay = 1s
|eventuate.disaster-recovery.remote-operation-timeout = 1s
""".stripMargin)

def nodeId(node: String): String =
s"${node}_${ctr}"
val L1 = "L1"
}

def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection], customConfig: String = "", activate: Boolean = false): ReplicationNode =
register(new ReplicationNode(nodeId(nodeName), logNames, logFactory, connections, port = port, customConfig = RecoverySpecLeveldb.config + customConfig, activate = activate))
class DeleteEventsSpecLeveldb extends WordSpec with Matchers with MultiLocationSpecLeveldb {
import ReplicationIntegrationSpec.replicationConnection
import DeleteEventsSpecLeveldb._

"Deleting events" must {
"not replay deleted events on restart" in {
val nodeA = newNodeA(Set(L1))
val emitterA = emitter(nodeA, L1)
val listenerA = nodeA.eventListener(L1)
def newLocationA = location("A", customConfig = DeleteEventsSpecLeveldb.config)
def newEndpointA(l: Location) = l.endpoint(Set(L1), Set(), activate = false)

val locationA1 = newLocationA
val endpointA1 = newEndpointA(locationA1)

val listenerA = locationA1.listener(endpointA1.logs(L1))
val emitterA = emitter(endpointA1, L1)

(0 to 5).foreach(emitterA ! _)
listenerA.waitForMessage(5)

nodeA.endpoint.delete(L1, 3, Set.empty).await shouldBe 3
endpointA1.delete(L1, 3, Set.empty).await shouldBe 3
locationA1.terminate().await

nodeA.terminate().await
val locationA2 = newLocationA
def endpointA2 = newEndpointA(locationA2)

val restartedA = newNodeA(Set(L1))
restartedA.eventListener(L1).expectMsgAllOf(3 to 5: _*)
locationA2.listener(endpointA2.logs(L1)).expectMsgAllOf(3 to 5: _*)
}
}

"Conditionally deleting events" must {
"keep event available for corresponding remote log" in {
val nodeA = newNodeA(Set(L1), Set(connectedToB, connectedToC))
val nodeB = newNodeB(Set(L1), Set(connectedToA))
val nodeC = newNodeC(Set(L1), Set(connectedToA))
val emitterA = emitter(nodeA, L1)
val listenerA = nodeA.eventListener(L1)
val listenerB = nodeB.eventListener(L1)
val listenerC = nodeC.eventListener(L1)
val locationA = location("A", customConfig = DeleteEventsSpecLeveldb.config)
val locationB = location("B", customConfig = DeleteEventsSpecLeveldb.config)
val locationC = location("C", customConfig = DeleteEventsSpecLeveldb.config)

val endpointA = locationA.endpoint(Set(L1), Set(replicationConnection(locationB.port), replicationConnection(locationC.port)), activate = false)
val endpointB = locationB.endpoint(Set(L1), Set(replicationConnection(locationA.port)), activate = false)
val endpointC = locationC.endpoint(Set(L1), Set(replicationConnection(locationA.port)), activate = false)

val emitterA = emitter(endpointA, L1)

val listenerA = locationA.listener(endpointA.logs(L1))
val listenerB = locationB.listener(endpointB.logs(L1))
val listenerC = locationC.listener(endpointC.logs(L1))

(0 to 5).foreach(emitterA ! _)
listenerA.waitForMessage(5)

nodeA.endpoint.delete(L1, 3, Set(nodeB.endpoint.id, nodeC.endpoint.id)).await shouldBe 3
endpointA.delete(L1, 3, Set(endpointB.id, endpointC.id)).await shouldBe 3

nodeA.endpoint.activate()
nodeB.endpoint.activate()
endpointA.activate()
endpointB.activate()
listenerB.expectMsgAllOf(0 to 5: _*)

nodeC.endpoint.activate()
endpointC.activate()
listenerC.expectMsgAllOf(0 to 5: _*)
}
}

def newNodeA(logNames: Set[String], connections: Set[ReplicationConnection] = Set.empty) =
node("A", logNames, portA, connections)

def newNodeB(logNames: Set[String], connections: Set[ReplicationConnection] = Set.empty) =
node("B", logNames, portB, connections)

def newNodeC(logNames: Set[String], connections: Set[ReplicationConnection] = Set.empty) =
node("C", logNames, portC, connections)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ package com.rbmhtechnology.eventuate
import akka.actor._
import akka.testkit.TestProbe

import com.rbmhtechnology.eventuate.log._
import com.rbmhtechnology.eventuate.log.cassandra.Cassandra

import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import org.scalatest._

import scala.collection.immutable.Seq
Expand All @@ -44,26 +40,10 @@ object EventsourcedActorCausalitySpec {
}
}

abstract class EventsourcedActorCausalitySpec extends WordSpec with Matchers with ReplicationNodeRegistry {
abstract class EventsourcedActorCausalitySpec extends WordSpec with Matchers with MultiLocationSpec {
import ReplicationIntegrationSpec.replicationConnection
import EventsourcedActorCausalitySpec._

def logFactory: String => Props

var ctr: Int = 0

override def beforeEach(): Unit =
ctr += 1

def config =
ReplicationConfig.create()

def nodeId(node: String): String =
s"${node}_${ctr}"

def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection]): ReplicationNode =
register(new ReplicationNode(nodeId(nodeName), logNames, logFactory, connections, port = port))

def assertPartialOrder[A](events: Seq[A], sample: A*): Unit = {
val indices = sample.map(events.indexOf)
assert(indices == indices.sorted)
Expand All @@ -74,24 +54,27 @@ abstract class EventsourcedActorCausalitySpec extends WordSpec with Matchers wit
"track causality by default (sharedClockEntry = true)" in {
val logName = "L1"

val nodeA = node("A", Set(logName), 2552, Set(replicationConnection(2553)))
val nodeB = node("B", Set(logName), 2553, Set(replicationConnection(2552)))
val locationA = location("A")
val locationB = location("B")

val logA = nodeA.logs(logName)
val logB = nodeB.logs(logName)
val endpointA = locationA.endpoint(Set(logName), Set(replicationConnection(locationB.port)))
val endpointB = locationB.endpoint(Set(logName), Set(replicationConnection(locationA.port)))

val logIdA = nodeA.endpoint.logId(logName)
val logIdB = nodeB.endpoint.logId(logName)
val logA = endpointA.logs(logName)
val logB = endpointB.logs(logName)

val probeA1 = new TestProbe(nodeA.system)
val probeA2 = new TestProbe(nodeA.system)
val probeA3 = new TestProbe(nodeA.system)
val probeB = new TestProbe(nodeB.system)
val logIdA = endpointA.logId(logName)
val logIdB = endpointB.logId(logName)

val actorA1 = nodeA.system.actorOf(Props(new Collaborator("pa1", logA, sharedClockEntry = true, Set("e1", "e2", "e5"), probeA1.ref)))
val actorA2 = nodeA.system.actorOf(Props(new Collaborator("pa2", logA, sharedClockEntry = true, Set("e3", "e5", "e6"), probeA2.ref)))
val actorA3 = nodeA.system.actorOf(Props(new Collaborator("pa3", logA, sharedClockEntry = true, Set("e4"), probeA3.ref)))
val actorB = nodeB.system.actorOf(Props(new Collaborator("pb", logB, sharedClockEntry = true, Set("e1", "e6"), probeB.ref)))
val probeA1 = new TestProbe(locationA.system)
val probeA2 = new TestProbe(locationA.system)
val probeA3 = new TestProbe(locationA.system)
val probeB = new TestProbe(locationB.system)

val actorA1 = locationA.system.actorOf(Props(new Collaborator("pa1", logA, sharedClockEntry = true, Set("e1", "e2", "e5"), probeA1.ref)))
val actorA2 = locationA.system.actorOf(Props(new Collaborator("pa2", logA, sharedClockEntry = true, Set("e3", "e5", "e6"), probeA2.ref)))
val actorA3 = locationA.system.actorOf(Props(new Collaborator("pa3", logA, sharedClockEntry = true, Set("e4"), probeA3.ref)))
val actorB = locationB.system.actorOf(Props(new Collaborator("pb", logB, sharedClockEntry = true, Set("e1", "e6"), probeB.ref)))

def vectorTime(a: Long, b: Long) = (a, b) match {
case (0L, 0L) => VectorTime()
Expand Down Expand Up @@ -131,54 +114,46 @@ abstract class EventsourcedActorCausalitySpec extends WordSpec with Matchers wit
"located at the same location" can {
"track causality if enabled (sharedClockEntry = false)" in {
val logName = "L1"
val logNode = this.node("A", Set(logName), 2552, Set())
val log = logNode.logs(logName)
val logId = logNode.endpoint.logId(logName)

val locationA = location("A")
val endpointA = locationA.endpoint(Set(logName), Set())

val logA = endpointA.logs(logName)
val logIdA = endpointA.logId(logName)

val actorIdA = "PA"
val actorIdB = "PB"
val actorIdC = "PC"

val probeA = new TestProbe(logNode.system)
val probeB = new TestProbe(logNode.system)
val probeC = new TestProbe(logNode.system)
val probeA = new TestProbe(locationA.system)
val probeB = new TestProbe(locationA.system)
val probeC = new TestProbe(locationA.system)

val actorA = logNode.system.actorOf(Props(new Collaborator(actorIdA, log, sharedClockEntry = false, Set("e1", "e3"), probeA.ref)))
val actorB = logNode.system.actorOf(Props(new Collaborator(actorIdB, log, sharedClockEntry = false, Set("e2", "e3"), probeB.ref)))
val actorC = logNode.system.actorOf(Props(new Collaborator(actorIdC, log, sharedClockEntry = true, Set("e1", "e2", "e3"), probeC.ref)))
val actorA = locationA.system.actorOf(Props(new Collaborator(actorIdA, logA, sharedClockEntry = false, Set("e1", "e3"), probeA.ref)))
val actorB = locationA.system.actorOf(Props(new Collaborator(actorIdB, logA, sharedClockEntry = false, Set("e2", "e3"), probeB.ref)))
val actorC = locationA.system.actorOf(Props(new Collaborator(actorIdC, logA, sharedClockEntry = true, Set("e1", "e2", "e3"), probeC.ref)))

actorA ! "e1"
probeA.expectMsg(("e1", VectorTime(actorIdA -> 1L), VectorTime(actorIdA -> 1L)))
probeC.expectMsg(("e1", VectorTime(actorIdA -> 1L), VectorTime(actorIdA -> 1L, logId -> 1L)))
probeC.expectMsg(("e1", VectorTime(actorIdA -> 1L), VectorTime(actorIdA -> 1L, logIdA -> 1L)))

actorB ! "e2"
probeB.expectMsg(("e2", VectorTime(actorIdB -> 1L), VectorTime(actorIdB -> 1L)))
probeC.expectMsg(("e2", VectorTime(actorIdB -> 1L), VectorTime(actorIdA -> 1L, actorIdB -> 1L, logId -> 2L)))
probeC.expectMsg(("e2", VectorTime(actorIdB -> 1L), VectorTime(actorIdA -> 1L, actorIdB -> 1L, logIdA -> 2L)))

actorC ! "e3"
probeA.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logId -> 3L), VectorTime(actorIdA -> 2L, actorIdB -> 1L, logId -> 3L)))
probeB.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logId -> 3L), VectorTime(actorIdA -> 1L, actorIdB -> 2L, logId -> 3L)))
probeC.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logId -> 3L), VectorTime(actorIdA -> 1L, actorIdB -> 1L, logId -> 3L)))
probeA.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logIdA -> 3L), VectorTime(actorIdA -> 2L, actorIdB -> 1L, logIdA -> 3L)))
probeB.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logIdA -> 3L), VectorTime(actorIdA -> 1L, actorIdB -> 2L, logIdA -> 3L)))
probeC.expectMsg(("e3", VectorTime(actorIdA -> 1L, actorIdB -> 1L, logIdA -> 3L), VectorTime(actorIdA -> 1L, actorIdB -> 1L, logIdA -> 3L)))
}
}
}
}

class EventsourcedActorCausalitySpecLeveldb extends EventsourcedActorCausalitySpec with EventLogCleanupLeveldb {
override val logFactory: String => Props = id => EventLogLifecycleLeveldb.TestEventLog.props(id, batching = true)
class EventsourcedActorCausalitySpecLeveldb extends EventsourcedActorCausalitySpec with MultiLocationSpecLeveldb {
override val logFactory: String => Props = id => SingleLocationSpecLeveldb.TestEventLog.props(id, batching = true)
}

class EventsourcedActorCausalitySpecCassandra extends EventsourcedActorCausalitySpec with EventLogCleanupCassandra {
override val logFactory: String => Props = id => EventLogLifecycleCassandra.TestEventLog.props(id, batching = true)

override def beforeAll(): Unit = {
super.beforeAll()
EmbeddedCassandraServerHelper.startEmbeddedCassandra(60000)
}

override def node(nodeName: String, logNames: Set[String], port: Int, connections: Set[ReplicationConnection]): ReplicationNode = {
val node = super.node(nodeName, logNames, port, connections)
Cassandra(node.system) // enforce keyspace/schema setup
node
}
class EventsourcedActorCausalitySpecCassandra extends EventsourcedActorCausalitySpec with MultiLocationSpecCassandra {
override val logFactory: String => Props = id => SingleLocationSpecCassandra.TestEventLog.props(id, batching = true)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import scala.util._
import akka.actor._
import akka.testkit._

import com.rbmhtechnology.eventuate.log.EventLogLifecycleCassandra
import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb

import org.scalatest._

object EventsourcedActorIntegrationSpec {
Expand Down Expand Up @@ -234,12 +231,9 @@ object EventsourcedActorIntegrationSpec {
}
}

abstract class EventsourcedActorIntegrationSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach {
abstract class EventsourcedActorIntegrationSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with SingleLocationSpec {
import EventsourcedActorIntegrationSpec._

def log: ActorRef
def logId: String

var probe: TestProbe = _

override def beforeEach(): Unit = {
Expand Down Expand Up @@ -430,10 +424,10 @@ abstract class EventsourcedActorIntegrationSpec extends TestKit(ActorSystem("tes
}
}

class EventsourcedActorIntegrationSpecLeveldb extends EventsourcedActorIntegrationSpec with EventLogLifecycleLeveldb {
class EventsourcedActorIntegrationSpecLeveldb extends EventsourcedActorIntegrationSpec with SingleLocationSpecLeveldb {
override def batching = false
}

class EventsourcedActorIntegrationSpecCassandra extends EventsourcedActorIntegrationSpec with EventLogLifecycleCassandra {
class EventsourcedActorIntegrationSpecCassandra extends EventsourcedActorIntegrationSpec with SingleLocationSpecCassandra {
override def batching = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ import scala.util._
import akka.actor._
import akka.testkit._

import com.rbmhtechnology.eventuate.log.EventLogLifecycleCassandra
import com.rbmhtechnology.eventuate.log.EventLogLifecycleLeveldb

import org.scalatest._

object EventsourcedActorThroughputSpec {
Expand Down Expand Up @@ -88,10 +85,9 @@ object EventsourcedActorThroughputSpec {
}
}

abstract class EventsourcedActorThroughputSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterEach {
abstract class EventsourcedActorThroughputSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with SingleLocationSpec {
import EventsourcedActorThroughputSpec._

def log: ActorRef
var probe: TestProbe = _

override def beforeEach(): Unit = {
Expand Down Expand Up @@ -154,5 +150,5 @@ abstract class EventsourcedActorThroughputSpec extends TestKit(ActorSystem("test
}
}

class EventsourcedActorThroughputSpecLeveldb extends EventsourcedActorThroughputSpec with EventLogLifecycleLeveldb
class EventsourcedActorThroughputSpecCassandra extends EventsourcedActorThroughputSpec with EventLogLifecycleCassandra
class EventsourcedActorThroughputSpecLeveldb extends EventsourcedActorThroughputSpec with SingleLocationSpecLeveldb
class EventsourcedActorThroughputSpecCassandra extends EventsourcedActorThroughputSpec with SingleLocationSpecCassandra
Loading

0 comments on commit c95ed9c

Please sign in to comment.