Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
lacarvalho91 committed Aug 9, 2017
1 parent 3a81593 commit 837e36c
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 12 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,6 @@ JMX metrics are exposed using Kamon. Port 9186 has to be exposed to obtain them.

The `schedule-topic` must be configured to use [log compaction](https://kafka.apache.org/documentation/#compaction).
This is to allow the scheduler to delete the schedule after it has been written to its destination topic. This is very
important because the scheduler uses the `schedule-topic` to reconstruct its state.

important because the scheduler uses the `schedule-topic` to reconstruct its state. This application will also support
longer-term schedules so log compaction is required to ensure they are not prematurely removed from Kafka allowing the
application to recover them after a restart.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ val dependencies = Seq(
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
"net.cakesolutions" %% "scala-kafka-client-testkit" % kafkaVersion % Test,
"org.slf4j" % "log4j-over-slf4j" % "1.7.21" % Test,
"org.slf4j" % "log4j-over-slf4j" % "1.7.25" % Test,
"com.danielasfregola" %% "random-data-generator" % "2.1" % Test,
"com.47deg" %% "scalacheck-toolbox-datetime"% "0.2.2" % Test,
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.1" % Test,
Expand Down
4 changes: 2 additions & 2 deletions scheduler/src/main/scala/com/sky/kms/SchedulingActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.sky.kms.streams.ScheduledMessagePublisher
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration

class SchedulingActor(queue: SourceQueue[(String, ScheduledMessage)], scheduler: Scheduler) extends Actor with ActorLogging {
class SchedulingActor(queue: SourceQueue[(String, ScheduledMessage)], akkaScheduler: Scheduler) extends Actor with ActorLogging {

override def receive: Receive = waitForInit

Expand All @@ -35,7 +35,7 @@ class SchedulingActor(queue: SourceQueue[(String, ScheduledMessage)], scheduler:
log.info(s"Updating schedule $scheduleId")
else
log.info(s"Creating schedule $scheduleId")
val cancellable = scheduler.scheduleOnce(timeFromNow(schedule.time))(self ! Trigger(scheduleId, schedule))
val cancellable = akkaScheduler.scheduleOnce(timeFromNow(schedule.time))(self ! Trigger(scheduleId, schedule))
schedules + (scheduleId -> cancellable)

case Cancel(scheduleId: String) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object PublishableMessage {
implicit val scheduleDeletionProducerRecordEnc: ProducerRecordEncoder[ScheduleDeletion] =
ProducerRecordEncoder.instance(deletion => new ProducerRecord(deletion.scheduleTopic, deletion.scheduleId.getBytes, null))

implicit def scheduleDataToProducerRecord(msg: PublishableMessage): ProducerRecord[Array[Byte], Array[Byte]] =
implicit def publishableMessageToProducerRecord(msg: PublishableMessage): ProducerRecord[Array[Byte], Array[Byte]] =
msg match {
case scheduledMsg: ScheduledMessage => scheduledMessageProducerRecordEnc(scheduledMsg)
case deletion: ScheduleDeletion => scheduleDeletionProducerRecordEnc(deletion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import scala.concurrent.Future

/**
* Provides a stream that consumes from the queue of triggered messages,
* writes the messages to Kafka and then deletes the schedules from Kafka
* writes the scheduled messages to the specified Kafka topics and then deletes the schedules
* from the scheduling Kafka topic to mark completion
*/
case class ScheduledMessagePublisher(config: SchedulerConfig, publisherSink: Sink[In, Mat])
(implicit system: ActorSystem, materializer: ActorMaterializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ object EmbeddedKafka {

val bootstrapServer = s"localhost:${kafkaServer.kafkaPort}"

/** The consume method provided by [[cakesolutions.kafka.testkit.KafkaServer]] doesn't provide a way of
* extracting a consumer record, we have added this so we can access the timestamp from a consumer record.
*
* This should be contributed to the library.
*
/**
* The consume method provided by [[cakesolutions.kafka.testkit.KafkaServer]] doesn't provide a way of
* extracting a consumer record, we have added this so we can access the timestamp from a consumer record. In the
* future we should contribute this to the library as it provides better flexibility.
*/
implicit class KafkaServerOps(val kafkaServer: KafkaServer) extends AnyVal {

Expand Down

0 comments on commit 837e36c

Please sign in to comment.