Skip to content

Commit

Permalink
wait for Init message in actor before handling other messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
lacarvalho91 committed Aug 8, 2017
1 parent 39063b9 commit 572c072
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 23 deletions.
30 changes: 17 additions & 13 deletions scheduler/src/main/scala/com/sky/kms/SchedulingActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ import com.sky.kms.streams.ScheduledMessagePublisher
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration

class SchedulingActor(sourceQueue: SourceQueue[(String, ScheduledMessage)], scheduler: Scheduler) extends Actor with ActorLogging {
class SchedulingActor(queue: SourceQueue[(String, ScheduledMessage)], scheduler: Scheduler) extends Actor with ActorLogging {

override def receive: Receive = receiveScheduleMessages(Map.empty)
override def receive: Receive = waitForInit

def receiveScheduleMessages(schedules: Map[ScheduleId, Cancellable]): Receive = {
private val waitForInit: Receive = {
case Init =>
context.become(receiveWithSchedules(Map.empty))
sender ! Ack
}

def receiveWithSchedules(schedules: Map[ScheduleId, Cancellable]): Receive = {

val receiveSchedulingMessage: PartialFunction[Any, Map[ScheduleId, Cancellable]] = {
val handleSchedulingMessage: PartialFunction[Any, Map[ScheduleId, Cancellable]] = {
case CreateOrUpdate(scheduleId: ScheduleId, schedule: Schedule) =>
if (cancel(scheduleId, schedules))
log.info(s"Updating schedule $scheduleId")
Expand All @@ -40,19 +46,17 @@ class SchedulingActor(sourceQueue: SourceQueue[(String, ScheduledMessage)], sche
schedules - scheduleId
}

val receiveTriggerMessage: PartialFunction[Any, Unit] = {
case Trigger(scheduleId, schedule) =>
log.info(s"$scheduleId is due. Adding schedule to queue. Scheduled time was ${schedule.time}")
sourceQueue.offer((scheduleId, messageFrom(schedule)))
}
(handleSchedulingMessage andThen updateStateAndAck) orElse handleTrigger
}

(receiveSchedulingMessage andThen updateStateAndAck) orElse receiveTriggerMessage orElse {
case Init => sender ! Ack
}
private val handleTrigger: Receive = {
case Trigger(scheduleId, schedule) =>
log.info(s"$scheduleId is due. Adding schedule to queue. Scheduled time was ${schedule.time}")
queue.offer((scheduleId, messageFrom(schedule)))
}

private def updateStateAndAck(schedules: Map[ScheduleId, Cancellable]): Unit = {
context.become(receiveScheduleMessages(schedules))
context.become(receiveWithSchedules(schedules))
sender ! Ack
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ trait ConsumerRecordDecoder[T] {

object ConsumerRecordDecoder {
def instance[T](f: ConsumerRecord[String, Array[Byte]] => T) = new ConsumerRecordDecoder[T] {
final def apply(cr: ConsumerRecord[String, Array[Byte]]): T =
f(cr)
final def apply(cr: ConsumerRecord[String, Array[Byte]]): T = f(cr)
}
}
34 changes: 26 additions & 8 deletions scheduler/src/test/scala/com/sky/kms/SchedulingActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@ package com.sky.kms

import java.util.UUID

import akka.actor.ActorRef
import akka.event.LoggingAdapter
import akka.stream.scaladsl.SourceQueue
import akka.testkit.{ImplicitSender, TestActorRef}
import com.miguno.akka.testing.VirtualTime
import com.sky.kms.SchedulingActor._
import com.sky.kms.common.AkkaBaseSpec
import com.sky.kms.common.TestDataUtils._
import com.sky.kms.domain.PublishableMessage.ScheduledMessage
import com.sky.kms.domain._
import common.AkkaBaseSpec
import common.TestDataUtils._
import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar

class SchedulingActorSpec extends AkkaBaseSpec with ImplicitSender with MockitoSugar {

val mockLogger = mock[LoggingAdapter]
val mockSourceQueue = mock[SourceQueue[(ScheduleId, ScheduledMessage)]]

"A scheduling actor" must {
"schedule new messages at the given time" in new SchedulingActorTest {
val (scheduleId, schedule) = generateSchedule()
Expand Down Expand Up @@ -60,8 +64,17 @@ class SchedulingActorSpec extends AkkaBaseSpec with ImplicitSender with MockitoS
verify(mockSourceQueue).offer((scheduleId, updatedSchedule.toScheduledMessage))
}

"does nothing when an Init message is received" in new SchedulingActorTest {
actorRef ! Init
"accept scheduling messages only after it has received an Init" in {

val actorRef = TestActorRef(new SchedulingActor(mockSourceQueue, system.scheduler))
val (scheduleId, schedule) = generateSchedule()

actorRef ! CreateOrUpdate(scheduleId, schedule)
expectNoMsg()

init(actorRef)

actorRef ! CreateOrUpdate(scheduleId, schedule)
expectMsg(Ack)
}

Expand All @@ -70,16 +83,13 @@ class SchedulingActorSpec extends AkkaBaseSpec with ImplicitSender with MockitoS
private class SchedulingActorTest {
val now = System.currentTimeMillis()

val mockLogger = mock[LoggingAdapter]
val mockSourceQueue = mock[SourceQueue[(ScheduleId, ScheduledMessage)]]
val time = new VirtualTime

val actorRef = TestActorRef(new SchedulingActor(mockSourceQueue, time.scheduler) {
override def log: LoggingAdapter = mockLogger
})

def generateSchedule(): (ScheduleId, Schedule) =
(UUID.randomUUID().toString, random[Schedule])
init(actorRef)

def advanceToTimeFrom(schedule: Schedule, startTime: Long = now): Unit =
time.advance(schedule.timeInMillis - startTime)
Expand All @@ -95,4 +105,12 @@ class SchedulingActorSpec extends AkkaBaseSpec with ImplicitSender with MockitoS
}
}

private def generateSchedule(): (ScheduleId, Schedule) =
(UUID.randomUUID().toString, random[Schedule])

private def init(actorRef: ActorRef) = {
actorRef ! Init
expectMsg(Ack)
}

}

0 comments on commit 572c072

Please sign in to comment.