diff --git a/.travis.yml b/.travis.yml index 80cdfe3..e648380 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,9 @@ language: scala dist: trusty scala: - - 2.13.0 - - 2.12.8 + - 2.13.7 + - 2.12.15 + - 3.1.1-RC2 jdk: - oraclejdk8 sudo: false diff --git a/build.sbt b/build.sbt index b5528a6..c99a412 100644 --- a/build.sbt +++ b/build.sbt @@ -7,18 +7,38 @@ licenses := Seq( homepage := Some(new URL("https://github.com/NewMotion/akka-rabbitmq")) -scalaVersion := "2.13.6" +scalaVersion := "2.13.7" -crossScalaVersions := Seq("2.13.6", "2.12.12", "3.1.0") +crossScalaVersions := Seq("2.13.7", "2.12.15", "3.1.1-RC2") def akka(name: String): ModuleID = "com.typesafe.akka" %% s"akka-$name" % "2.6.+" libraryDependencies ++= Seq( - "com.rabbitmq" % "amqp-client" % "5.13.1", + "com.rabbitmq" % "amqp-client" % "5.14.0", akka("actor") % "provided", akka("testkit") % "test", "com.typesafe" % "config" % "1.4.1" % Test, - ("org.specs2" %% "specs2-mock" % "4.13.0" % Test).cross(CrossVersion.for3Use2_13) + ("org.specs2" %% "specs2-mock" % "4.13.1" % Test).cross(CrossVersion.for3Use2_13) ) +val scalaReleaseVersion = SettingKey[Int]("scalaReleaseVersion") +scalaReleaseVersion := { + val v = scalaVersion.value + CrossVersion.partialVersion(v).map(_._1.toInt).getOrElse { + throw new RuntimeException(s"could not get Scala release version from $v") + } +} + +Test / unmanagedSourceDirectories ++= { + if (scalaReleaseVersion.value > 2) { + Seq( + (LocalRootProject / baseDirectory).value / "src" / "test" / "scala-3" + ) + } else { + Seq( + (LocalRootProject / baseDirectory).value / "src" / "test" / s"scala-2" + ) + } +} + Format.settings diff --git a/project/build.properties b/project/build.properties index 10fd9ee..3161d21 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.5.5 +sbt.version=1.6.1 diff --git a/src/test/scala/com/newmotion/akka/rabbitmq/ChannelActorSpec.scala b/src/test/scala-2/com/newmotion/akka/rabbitmq/ChannelActorSpec.scala similarity index 99% rename from src/test/scala/com/newmotion/akka/rabbitmq/ChannelActorSpec.scala rename to src/test/scala-2/com/newmotion/akka/rabbitmq/ChannelActorSpec.scala index 22f0055..07ead3a 100644 --- a/src/test/scala/com/newmotion/akka/rabbitmq/ChannelActorSpec.scala +++ b/src/test/scala-2/com/newmotion/akka/rabbitmq/ChannelActorSpec.scala @@ -5,6 +5,7 @@ import akka.testkit.TestFSMRef import akka.actor.ActorRef import ChannelActor._ import com.rabbitmq.client.ShutdownSignalException + import collection.immutable.Queue import java.io.IOException import ConnectionActor.ProvideChannel diff --git a/src/test/scala/com/newmotion/akka/rabbitmq/ConnectionActorSpec.scala b/src/test/scala-2/com/newmotion/akka/rabbitmq/ConnectionActorSpec.scala similarity index 100% rename from src/test/scala/com/newmotion/akka/rabbitmq/ConnectionActorSpec.scala rename to src/test/scala-2/com/newmotion/akka/rabbitmq/ConnectionActorSpec.scala index 74efd93..a53413b 100644 --- a/src/test/scala/com/newmotion/akka/rabbitmq/ConnectionActorSpec.scala +++ b/src/test/scala-2/com/newmotion/akka/rabbitmq/ConnectionActorSpec.scala @@ -7,9 +7,9 @@ import ConnectionActor._ import com.rabbitmq.client.{ ShutdownListener, ShutdownSignalException } import org.mockito.InOrder +import java.io.IOException import scala.concurrent.duration._ import scala.collection.immutable.Iterable -import java.io.IOException /** * @author Yaroslav Klymko diff --git a/src/test/scala-3/com/newmotion/akka/rabbitmq/ChannelActorSpec.scala b/src/test/scala-3/com/newmotion/akka/rabbitmq/ChannelActorSpec.scala new file mode 100644 index 0000000..0b68e58 --- /dev/null +++ b/src/test/scala-3/com/newmotion/akka/rabbitmq/ChannelActorSpec.scala @@ -0,0 +1,151 @@ +package com.newmotion.akka.rabbitmq + +import org.specs2.mock.Mockito +import akka.testkit.TestFSMRef +import akka.actor.ActorRef +import ChannelActor._ +import com.rabbitmq.client.ShutdownSignalException + +import collection.immutable.Queue +import java.io.IOException +import ConnectionActor.ProvideChannel + +import scala.reflect.ClassTag + +/** + * @author Yaroslav Klymko + */ +class ChannelActorSpec extends ActorSpec with Mockito { + "ChannelActor" should { + "setup channel when channel received" in new TestScope { + actorRef ! channel + state mustEqual connected() + there was one(setupChannel).apply(channel, actorRef) + there was one(channel).addShutdownListener(actor) + } + "close channel if received unexpectedly" in new TestScope { + actorRef.setState(Connected, Connected(channel)) + val newChannel: Channel = mock[Channel] + actorRef ! newChannel + there was one(newChannel).close() + state mustEqual connected(channel) + } + "process message if has channel" in new TestScope { + actorRef.setState(Connected, Connected(channel)) + actorRef ! ChannelMessage(onChannel) + there was one(onChannel).apply(channel) + state mustEqual connected() + } + "process message if has channel, and when fails but channel is still open, drops the message and reconnects" in new TestScope { + actorRef.setState(Connected, Connected(channel)) + actorRef ! ChannelMessage(onChannelFailure, dropIfNoChannel = false) + state mustEqual disconnected() + expectMsg(ProvideChannel) + } + "process message if has channel, and when fails and channel is not open, retains the message for retry and reconnects" in new TestScope { + actorRef.setState(Connected, Connected(closedChannel)) + actorRef ! ChannelMessage(onChannelFailure, dropIfNoChannel = false) + state mustEqual disconnected(Retrying(3, onChannelFailure)) + expectMsg(ProvideChannel) + } + "process message if has channel, and when fails and channel is not open, retains the message with retry count decremented and reconnects" in new TestScope { + actorRef.setState(Connected, Connected(closedChannel)) + actorRef ! ChannelMessage(Retrying(2, onChannelFailure), dropIfNoChannel = false) + state mustEqual disconnected(Retrying(1, onChannelFailure)) + expectMsg(ProvideChannel) + } + "process message if has channel, and when fails and channel is not open and retry count is 0, drops the message and reconnects" in new TestScope { + actorRef.setState(Connected, Connected(closedChannel)) + actorRef ! ChannelMessage(Retrying(0, onChannelFailure), dropIfNoChannel = false) + state mustEqual disconnected() + expectMsg(ProvideChannel) + } + "process message if has channel, and when fails and channel is not open and dropIfNoChannel is true, drops the message and reconnects" in new TestScope { + actorRef.setState(Connected, Connected(closedChannel)) + actorRef ! ChannelMessage(onChannelFailure) + state mustEqual disconnected() + expectMsg(ProvideChannel) + } + "collect channel message if no channel" in new TestScope { + actorRef ! ChannelMessage(onChannel, dropIfNoChannel = false) + state mustEqual disconnected(onChannel) + } + "drop channel message if no channel and allowed to drop" in new TestScope { + actorRef ! ChannelMessage(onChannel) + state mustEqual disconnected() + } + "leave channel if told by parent" in new TestScope { + actorRef.setState(Connected, Connected(channel)) + actorRef ! ParentShutdownSignal + state mustEqual disconnected() + expectNoMessage() + } + "leave channel on ShutdownSignal" in new TestScope { + actorRef.setState(Connected, Connected(channel)) + actor.shutdownCompleted(channelShutdownSignal) + state mustEqual disconnected() + expectMsg(ProvideChannel) + } + "stay connected on connection-level ShutdownSignal (waiting for ParentShutdown from ConnectionActor)" in new TestScope { + actorRef.setState(Connected, Connected(channel)) + actor.shutdownCompleted(connectionShutdownSignal) + state mustEqual connected() + } + "process queued channel messages when channel received" in new TestScope { + actorRef.setState(Disconnected, InMemory(Queue(onChannel, onChannel))) + actorRef ! channel + there was two(onChannel).apply(channel) + state mustEqual connected() + } + "process queued channel messages when channel received and failed" in new TestScope { + val last: OnChannel = mock[OnChannel] + actorRef.setState(Disconnected, InMemory(Queue(onChannel, onChannelFailure, last))) + actorRef ! channel + there was one(onChannel).apply(channel) + state mustEqual disconnected(last) + } + "respond to GetState message" in new TestScope { + actorRef ! GetState + expectMsg(Disconnected) + actorRef.setState(Connected, Connected(channel)) + actorRef ! GetState + expectMsg(Connected) + } + "request channel on postRestart" in new TestScope { + actor.postRestart(new RuntimeException("")) + expectMsg(ProvideChannel) + } + } + + private abstract class TestScope extends ActorScope { + val setupChannel: (Channel, ActorRef) => Unit = mock[(Channel, ActorRef) => Unit](ClassTag(classOf[(Channel, ActorRef) => Unit])) + val onChannel: OnChannel = mock[OnChannel] + val channel: Channel = { + val channel = mock[Channel] + channel.isOpen returns true + channel + } + val closedChannel: Channel = { + val channel = mock[Channel] + channel.isOpen returns false + channel + } + val channelShutdownSignal: ShutdownSignalException = mock[ShutdownSignalException] + channelShutdownSignal.getReference returns channel + + val connectionShutdownSignal: ShutdownSignalException = mock[ShutdownSignalException] + connectionShutdownSignal.getReference returns mock[Connection] + + val actorRef: TestFSMRef[State, Data, TestChannelActor] = TestFSMRef(new TestChannelActor) + + def actor: ChannelActor = actorRef.underlyingActor.asInstanceOf[ChannelActor] + def state: (State, Data) = actorRef.stateName -> actorRef.stateData + def disconnected(xs: OnChannel*): (ChannelActor.Disconnected.type, InMemory) = Disconnected -> InMemory(Queue(xs: _*)) + def connected(x: Channel = channel): (ChannelActor.Connected.type, Connected) = Connected -> Connected(x) + val onChannelFailure: Channel => Any = { _ => throw new IOException() } + + class TestChannelActor extends ChannelActor(setupChannel) { + override def connectionActor: ActorRef = testActor + } + } +} diff --git a/src/test/scala-3/com/newmotion/akka/rabbitmq/ConnectionActorSpec.scala b/src/test/scala-3/com/newmotion/akka/rabbitmq/ConnectionActorSpec.scala new file mode 100644 index 0000000..5a50203 --- /dev/null +++ b/src/test/scala-3/com/newmotion/akka/rabbitmq/ConnectionActorSpec.scala @@ -0,0 +1,221 @@ +package com.newmotion.akka.rabbitmq + +import org.specs2.mock.Mockito +import akka.testkit.TestFSMRef +import akka.actor.{ ActorRef, Props } +import ConnectionActor._ +import com.rabbitmq.client.{ ShutdownListener, ShutdownSignalException } +import org.mockito.InOrder + +import java.io.IOException +import scala.concurrent.duration._ +import scala.collection.immutable.Iterable +import scala.reflect.ClassTag + +/** + * @author Yaroslav Klymko + */ +class ConnectionActorSpec extends ActorSpec with Mockito { + + "ConnectionActor" should { + + "try to connect on startup" in new TestScope { + actorRef ! Connect + state mustEqual connectedAfterRecovery + val order: Option[InOrder] = inOrder(factory, recoveredConnection, setup) + there was one(factory).newConnection() + there was one(recoveredConnection).addShutdownListener(any[ShutdownListener]) + there was one(setup).apply(recoveredConnection, actorRef) + } + + "not reconnect if has connection" in new TestScope { + actorRef.setState(Connected, Connected(initialConnection)) + actorRef ! Connect + state mustEqual connectedInitially + } + + "try to reconnect if failed to connect" in new TestScope { + factory.newConnection throws new IOException + actorRef ! Connect + state mustEqual disconnected + } + + "try to reconnect if can't create new channel" in new TestScope { + initialConnection.createChannel() throws new IOException + recoveredConnection.createChannel() throws new IOException + actorRef.setState(Connected, Connected(initialConnection)) + actorRef ! create + state mustEqual disconnected + } + + "attempt to connect on Connect message" in new TestScope { + factory.newConnection throws new IOException thenReturns recoveredConnection + actorRef ! Connect + state mustEqual disconnected + actorRef ! Connect + state mustEqual connectedAfterRecovery + } + + "reconnect on ShutdownSignalException from server" in new TestScope { + actorRef.setState(Connected, Connected(initialConnection)) + actor.shutdownCompleted(shutdownSignal()) + eventually(state mustEqual connectedAfterRecovery) + } + + "keep trying to reconnect on ShutdownSignalException from server" in new TestScope { + actorRef.setState(Connected, Connected(initialConnection)) + factory.newConnection throws new IOException thenThrow new IOException thenReturns recoveredConnection + actor.shutdownCompleted(shutdownSignal()) + state mustEqual disconnected + } + + "not reconnect on ShutdownSignalException for different connection" in new TestScope { + actorRef.setState(Connected, Connected(initialConnection)) + actor.shutdownCompleted(shutdownSignal(reference = mock[Connection])) + there was no(factory).newConnection() + } + + "create children actor with channel" in new TestScope { + actorRef.setState(Connected, Connected(initialConnection)) + actorRef ! create + expectMsg(channel) + expectMsg(ChannelCreated(testActor)) + } + + "create children actor without channel if failed to create new channel" in new TestScope { + initialConnection.createChannel() throws new IOException + recoveredConnection.createChannel() throws new IOException + actorRef.setState(Connected, Connected(initialConnection)) + actorRef ! create + expectMsg(ChannelCreated(testActor)) + expectMsg(ParentShutdownSignal) + state mustEqual disconnected + } + + "create children actor without channel" in new TestScope { + actorRef ! create + expectMsg(ChannelCreated(testActor)) + } + + "notify children if connection lost" in new TestScope { + actorRef.setState(Connected, Connected(initialConnection)) + actor.shutdownCompleted(shutdownSignal()) + expectMsg(ParentShutdownSignal) + } + + "notify children when connection established" in new TestScope { + actorRef ! Connect + expectMsg(channel) + } + + "close connection on shutdown" in new TestScope { + actorRef.setState(Connected, Connected(initialConnection)) + actorRef.stop() + there was one(initialConnection).close() + } + + "not become Disconnected when getting an AmqpShutdownSignal because of its own reconnection procedure" in new TestScope { + // connection actor starts out connected + actorRef.setState(Connected, Connected(initialConnection)) + + // give it a channel actor + actorRef ! create + + // so the connection actor will send a channel to the newly created channel actor, and a ChannelCreated to the + // sender of CreateChannel. Both the recipients are testActor here. + expectMsg(channel) + expectMsg(ChannelCreated(testActor)) + + // tell the actor the connection went away + actor.shutdownCompleted(shutdownSignal()) + + // give connection actor the time to close and reconnect + expectMsg(ParentShutdownSignal) + there was one(initialConnection).close() + expectMsg(channel) + + // now because of this close, RabbitMQ may tell the actor that the previous connection was shut down by the app + actor.shutdownCompleted(shutdownSignal(initiatedByApplication = true, reference = initialConnection)) + + // now, let's see if the ConnectionActor stays responsive + actorRef ! ProvideChannel + expectMsg(channel) + } + + "respond to GetState message" in new TestScope { + actorRef ! GetState + expectMsg(Disconnected) + actorRef.setState(Connected, Connected(initialConnection)) + actorRef ! GetState + expectMsg(Connected) + } + + "create only one channel when reconnecting" in new TestScopeBase { + override val reconnectionDelay: FiniteDuration = FiniteDuration(0, SECONDS) + + val setupChannel: (Channel, ActorRef) => Unit = mock[(Channel, ActorRef) => Unit](ClassTag(classOf[(Channel, ActorRef) => Unit])) + val createChannel: CreateChannel = CreateChannel(ChannelActor.props(setupChannel)) + + class TestConnectionActor extends ConnectionActor(factory, reconnectionDelay, setup) { + override def preStart(): Unit = {} + } + + val connectionActorRef: TestFSMRef[State, Data, TestConnectionActor] = TestFSMRef(new TestConnectionActor) + + connectionActorRef.setState(Connected, Connected(initialConnection)) + connectionActorRef ! createChannel + expectMsgType[ChannelCreated] + there was one(initialConnection).createChannel + + connectionActorRef ! AmqpShutdownSignal(shutdownSignal()) + (1 to 10).map(_ => there was atMostOne(recoveredConnection).createChannel) + } + } + + private abstract class TestScope extends TestScopeBase { + class TestConnectionActor extends ConnectionActor(factory, reconnectionDelay, setup) { + override def children: Iterable[ActorRef] = Iterable(testActor) + override def newChild(props: Props, name: Option[String]): ActorRef = testActor + override def preStart(): Unit = {} + } + + val actorRef: TestFSMRef[State, Data, TestConnectionActor] = TestFSMRef(new TestConnectionActor) + + def actor: ConnectionActor = actorRef.underlyingActor.asInstanceOf[ConnectionActor] + def state: (State, Data) = actorRef.stateName -> actorRef.stateData + } + + private abstract class TestScopeBase extends ActorScope { + val channel: Channel = mock[Channel] + + def createMockConnection(): Connection = { + val connection = mock[Connection] + connection.isOpen returns true + connection.createChannel() returns channel + connection + } + + val initialConnection: Connection = createMockConnection() + val recoveredConnection: Connection = createMockConnection() + + val factory: ConnectionFactory = { + val factory = mock[ConnectionFactory] + factory.newConnection() returns recoveredConnection + factory + } + val create: CreateChannel = CreateChannel(null) + val reconnectionDelay: FiniteDuration = FiniteDuration(1, SECONDS) + val setup: (Connection, ActorRef) => Any = mock[(Connection, ActorRef) => Any](ClassTag(classOf[(Connection, ActorRef) => Any])) + + def disconnected: (ConnectionActor.Disconnected.type, ConnectionActor.NoConnection.type) = Disconnected -> NoConnection + def connectedInitially: (ConnectionActor.Connected.type, Connected) = Connected -> Connected(initialConnection) + def connectedAfterRecovery: (ConnectionActor.Connected.type, Connected) = Connected -> Connected(recoveredConnection) + + def shutdownSignal(initiatedByApplication: Boolean = false, reference: AnyRef = initialConnection): ShutdownSignalException = { + val shutdownSignal = mock[ShutdownSignalException] + shutdownSignal.isInitiatedByApplication returns initiatedByApplication + shutdownSignal.getReference returns reference + shutdownSignal + } + } +}