Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scala 3 build #68

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
language: scala
dist: trusty
scala:
- 2.13.0
- 2.12.8
- 2.13.7
- 2.12.15
- 3.1.1-RC2
jdk:
- oraclejdk8
sudo: false
Expand Down
28 changes: 24 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,38 @@ licenses := Seq(

homepage := Some(new URL("https://github.com/NewMotion/akka-rabbitmq"))

scalaVersion := "2.13.6"
scalaVersion := "2.13.7"

crossScalaVersions := Seq("2.13.6", "2.12.12", "3.1.0")
crossScalaVersions := Seq("2.13.7", "2.12.15", "3.1.1-RC2")

def akka(name: String): ModuleID = "com.typesafe.akka" %% s"akka-$name" % "2.6.+"

libraryDependencies ++= Seq(
"com.rabbitmq" % "amqp-client" % "5.13.1",
"com.rabbitmq" % "amqp-client" % "5.14.0",
akka("actor") % "provided",
akka("testkit") % "test",
"com.typesafe" % "config" % "1.4.1" % Test,
("org.specs2" %% "specs2-mock" % "4.13.0" % Test).cross(CrossVersion.for3Use2_13)
("org.specs2" %% "specs2-mock" % "4.13.1" % Test).cross(CrossVersion.for3Use2_13)
)

val scalaReleaseVersion = SettingKey[Int]("scalaReleaseVersion")
scalaReleaseVersion := {
val v = scalaVersion.value
CrossVersion.partialVersion(v).map(_._1.toInt).getOrElse {
throw new RuntimeException(s"could not get Scala release version from $v")
}
}

Test / unmanagedSourceDirectories ++= {
if (scalaReleaseVersion.value > 2) {
Seq(
(LocalRootProject / baseDirectory).value / "src" / "test" / "scala-3"
)
} else {
Seq(
(LocalRootProject / baseDirectory).value / "src" / "test" / s"scala-2"
)
}
}

Format.settings
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.5.5
sbt.version=1.6.1
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.testkit.TestFSMRef
import akka.actor.ActorRef
import ChannelActor._
import com.rabbitmq.client.ShutdownSignalException

import collection.immutable.Queue
import java.io.IOException
import ConnectionActor.ProvideChannel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import ConnectionActor._
import com.rabbitmq.client.{ ShutdownListener, ShutdownSignalException }
import org.mockito.InOrder

import java.io.IOException
import scala.concurrent.duration._
import scala.collection.immutable.Iterable
import java.io.IOException

/**
* @author Yaroslav Klymko
Expand Down
151 changes: 151 additions & 0 deletions src/test/scala-3/com/newmotion/akka/rabbitmq/ChannelActorSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.newmotion.akka.rabbitmq

import org.specs2.mock.Mockito
import akka.testkit.TestFSMRef
import akka.actor.ActorRef
import ChannelActor._
import com.rabbitmq.client.ShutdownSignalException

import collection.immutable.Queue
import java.io.IOException
import ConnectionActor.ProvideChannel

import scala.reflect.ClassTag

/**
* @author Yaroslav Klymko
*/
class ChannelActorSpec extends ActorSpec with Mockito {
"ChannelActor" should {
"setup channel when channel received" in new TestScope {
actorRef ! channel
state mustEqual connected()
there was one(setupChannel).apply(channel, actorRef)
there was one(channel).addShutdownListener(actor)
}
"close channel if received unexpectedly" in new TestScope {
actorRef.setState(Connected, Connected(channel))
val newChannel: Channel = mock[Channel]
actorRef ! newChannel
there was one(newChannel).close()
state mustEqual connected(channel)
}
"process message if has channel" in new TestScope {
actorRef.setState(Connected, Connected(channel))
actorRef ! ChannelMessage(onChannel)
there was one(onChannel).apply(channel)
state mustEqual connected()
}
"process message if has channel, and when fails but channel is still open, drops the message and reconnects" in new TestScope {
actorRef.setState(Connected, Connected(channel))
actorRef ! ChannelMessage(onChannelFailure, dropIfNoChannel = false)
state mustEqual disconnected()
expectMsg(ProvideChannel)
}
"process message if has channel, and when fails and channel is not open, retains the message for retry and reconnects" in new TestScope {
actorRef.setState(Connected, Connected(closedChannel))
actorRef ! ChannelMessage(onChannelFailure, dropIfNoChannel = false)
state mustEqual disconnected(Retrying(3, onChannelFailure))
expectMsg(ProvideChannel)
}
"process message if has channel, and when fails and channel is not open, retains the message with retry count decremented and reconnects" in new TestScope {
actorRef.setState(Connected, Connected(closedChannel))
actorRef ! ChannelMessage(Retrying(2, onChannelFailure), dropIfNoChannel = false)
state mustEqual disconnected(Retrying(1, onChannelFailure))
expectMsg(ProvideChannel)
}
"process message if has channel, and when fails and channel is not open and retry count is 0, drops the message and reconnects" in new TestScope {
actorRef.setState(Connected, Connected(closedChannel))
actorRef ! ChannelMessage(Retrying(0, onChannelFailure), dropIfNoChannel = false)
state mustEqual disconnected()
expectMsg(ProvideChannel)
}
"process message if has channel, and when fails and channel is not open and dropIfNoChannel is true, drops the message and reconnects" in new TestScope {
actorRef.setState(Connected, Connected(closedChannel))
actorRef ! ChannelMessage(onChannelFailure)
state mustEqual disconnected()
expectMsg(ProvideChannel)
}
"collect channel message if no channel" in new TestScope {
actorRef ! ChannelMessage(onChannel, dropIfNoChannel = false)
state mustEqual disconnected(onChannel)
}
"drop channel message if no channel and allowed to drop" in new TestScope {
actorRef ! ChannelMessage(onChannel)
state mustEqual disconnected()
}
"leave channel if told by parent" in new TestScope {
actorRef.setState(Connected, Connected(channel))
actorRef ! ParentShutdownSignal
state mustEqual disconnected()
expectNoMessage()
}
"leave channel on ShutdownSignal" in new TestScope {
actorRef.setState(Connected, Connected(channel))
actor.shutdownCompleted(channelShutdownSignal)
state mustEqual disconnected()
expectMsg(ProvideChannel)
}
"stay connected on connection-level ShutdownSignal (waiting for ParentShutdown from ConnectionActor)" in new TestScope {
actorRef.setState(Connected, Connected(channel))
actor.shutdownCompleted(connectionShutdownSignal)
state mustEqual connected()
}
"process queued channel messages when channel received" in new TestScope {
actorRef.setState(Disconnected, InMemory(Queue(onChannel, onChannel)))
actorRef ! channel
there was two(onChannel).apply(channel)
state mustEqual connected()
}
"process queued channel messages when channel received and failed" in new TestScope {
val last: OnChannel = mock[OnChannel]
actorRef.setState(Disconnected, InMemory(Queue(onChannel, onChannelFailure, last)))
actorRef ! channel
there was one(onChannel).apply(channel)
state mustEqual disconnected(last)
}
"respond to GetState message" in new TestScope {
actorRef ! GetState
expectMsg(Disconnected)
actorRef.setState(Connected, Connected(channel))
actorRef ! GetState
expectMsg(Connected)
}
"request channel on postRestart" in new TestScope {
actor.postRestart(new RuntimeException(""))
expectMsg(ProvideChannel)
}
}

private abstract class TestScope extends ActorScope {
val setupChannel: (Channel, ActorRef) => Unit = mock[(Channel, ActorRef) => Unit](ClassTag(classOf[(Channel, ActorRef) => Unit]))
val onChannel: OnChannel = mock[OnChannel]
val channel: Channel = {
val channel = mock[Channel]
channel.isOpen returns true
channel
}
val closedChannel: Channel = {
val channel = mock[Channel]
channel.isOpen returns false
channel
}
val channelShutdownSignal: ShutdownSignalException = mock[ShutdownSignalException]
channelShutdownSignal.getReference returns channel

val connectionShutdownSignal: ShutdownSignalException = mock[ShutdownSignalException]
connectionShutdownSignal.getReference returns mock[Connection]

val actorRef: TestFSMRef[State, Data, TestChannelActor] = TestFSMRef(new TestChannelActor)

def actor: ChannelActor = actorRef.underlyingActor.asInstanceOf[ChannelActor]
def state: (State, Data) = actorRef.stateName -> actorRef.stateData
def disconnected(xs: OnChannel*): (ChannelActor.Disconnected.type, InMemory) = Disconnected -> InMemory(Queue(xs: _*))
def connected(x: Channel = channel): (ChannelActor.Connected.type, Connected) = Connected -> Connected(x)
val onChannelFailure: Channel => Any = { _ => throw new IOException() }

class TestChannelActor extends ChannelActor(setupChannel) {
override def connectionActor: ActorRef = testActor
}
}
}
Loading