Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule messages #2

Merged
merged 22 commits into from
Aug 9, 2017
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f397574
Add tests for scheduling messages (WIP)
robertotena Aug 2, 2017
b0d1a69
scheduling actor complete, just needs to be wired up.
Aug 2, 2017
b115607
e2e test.
Aug 3, 2017
b9a321b
Wired up application. TODO: implement publisher stream Flow and reade…
Aug 3, 2017
26c540e
Rename scheduler type alias.
Aug 3, 2017
fe9469d
Scheduling messages and deleting them from the input topic after they…
Aug 4, 2017
5f6bbcd
Introduced error handling in ScheduleReader. Just need to provide an …
Aug 6, 2017
b18e3ec
Fix splitToScheduleAndMetadata test.
Aug 7, 2017
b84b948
Logging in publisher stream.
Aug 7, 2017
3355089
Add some clarity around actor receive.
Aug 7, 2017
1f04e03
Fix random data generation error.
lacarvalho91 Aug 7, 2017
2958a7b
Fix memory leak from consecutive test runs.
robertotena Aug 7, 2017
a5e304c
WIP supervision metrics.
lacarvalho91 Aug 7, 2017
ab143f5
Transform schedules to scheduled messages before adding to the queue.
robertotena Aug 8, 2017
1063f0c
Address review comments.
lacarvalho91 Aug 8, 2017
a81702d
Replace EitherFanOut with PartitionWith.
lacarvalho91 Aug 8, 2017
33e3e35
Rename package.
lacarvalho91 Aug 8, 2017
229013a
Add custom metrics component
robertotena Aug 8, 2017
3b12464
Remove unused implicits from AppConfig, put some default config in re…
lacarvalho91 Aug 8, 2017
39063b9
Removed monitoring stuff as it has been agreed that it will be done s…
lacarvalho91 Aug 8, 2017
3a81593
wait for Init message in actor before handling other messages.
lacarvalho91 Aug 8, 2017
837e36c
Address review comments.
lacarvalho91 Aug 9, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion avro/src/main/scala/GenerateSchema.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import com.sksamuel.avro4s._
import com.sky.kafka.message.scheduler.domain.Schedule
import com.sky.kms.domain.Schedule

object GenerateSchema extends App {

Expand Down
11 changes: 9 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,28 @@ val dependencies = Seq(
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-stream-kafka" % "0.16",
"com.typesafe.akka" %% "akka-stream-contrib" % "0.8",

"com.typesafe.scala-logging" %% "scala-logging" % "3.5.0",
"com.sksamuel.avro4s" %% "avro4s-core" % "1.7.0",
"org.typelevel" %% "cats" % "0.9.0",
"ch.qos.logback" % "logback-classic" % "1.2.3" % Runtime,
"com.github.pureconfig" %% "pureconfig" % "0.7.2",
"org.zalando" %% "grafter" % "2.0.1",

"io.kamon" %% "kamon-jmx" % kamonVersion,
"io.kamon" %% "kamon-akka-2.5" % kamonVersion,
"io.kamon" %% "kamon-core" % kamonVersion,

"org.scalatest" %% "scalatest" % "3.0.1" % Test,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
"net.cakesolutions" %% "scala-kafka-client-testkit" % kafkaVersion % Test,
"org.slf4j" % "log4j-over-slf4j" % "1.7.21" % Test,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we're a new module I see no reason not to use SLF4J 1.7.25

"com.danielasfregola" %% "random-data-generator" % "2.1" % Test
"com.danielasfregola" %% "random-data-generator" % "2.1" % Test,
"com.47deg" %% "scalacheck-toolbox-datetime"% "0.2.2" % Test,
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.1" % Test,
"org.mockito" % "mockito-all" % "1.10.19" % Test
)

val commonSettings = Seq(
Expand Down Expand Up @@ -73,7 +79,8 @@ lazy val scheduler = (project in file("scheduler"))
javaAgents += "org.aspectj" % "aspectjweaver" % "1.8.10",
javaOptions in Universal += jmxSettings,
buildInfoSettings,
dockerSettings
dockerSettings,
dependencyOverrides += "org.scalacheck" %% "scalacheck" % "1.13.5"
).enablePlugins(DockerPlugin)

val schema = inputKey[Unit]("Generate the Avro schema file for the Schedule schema.")
Expand Down
7 changes: 2 additions & 5 deletions scheduler/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ scheduler {
schedule-topic = ${?SCHEDULE_TOPIC}
kafka-brokers = "localhost:9092"
kafka-brokers = ${?KAFKA_BROKERS}
shutdown-timeout {
stream = 10 seconds
system = 10 seconds
}
}

akka {
Expand All @@ -25,7 +21,8 @@ akka {
use-dispatcher = "akka.kafka.default-dispatcher"

kafka-clients {
enable.auto.commit = false
// until we use a commitableSource
enable.auto.commit = true

group.id = "com.sky.kafka.scheduler"
bootstrap.servers = ${scheduler.kafka-brokers}
Expand Down
7 changes: 7 additions & 0 deletions scheduler/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
scheduler {
shutdown-timeout {
stream = 10 seconds
system = 10 seconds
}
queue-buffer-size = 100
}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

23 changes: 23 additions & 0 deletions scheduler/src/main/scala/com/sky/kms/AkkaComponents.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.sky.kms

import akka.actor.ActorSystem
import akka.stream.Supervision.Restart
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.typesafe.scalalogging.LazyLogging

trait AkkaComponents extends LazyLogging with Monitoring {

implicit val system = ActorSystem("kafka-message-scheduler")

val decider: Supervision.Decider = { t =>
recordException(t)
logger.error(s"Supervision failed.", t)
Restart
}

val settings = ActorMaterializerSettings(system)
.withSupervisionStrategy(decider)

implicit val materializer = ActorMaterializer(settings)

}
20 changes: 20 additions & 0 deletions scheduler/src/main/scala/com/sky/kms/Monitoring.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.sky.kms

import kamon.Kamon
import kamon.metric.MetricsModule

trait Monitoring {

val metrics: MetricsModule = Kamon.metrics

def increment(key: String) = metrics.counter(key).increment()

def recordException(throwable: Throwable) = {
val key = generateKeyFromException(throwable)
metrics.counter(key).increment()
}

private def generateKeyFromException(throwable: Throwable): String = {
return s"exception.${throwable.getClass.getName.replace(".", "_")}"
}
}
29 changes: 29 additions & 0 deletions scheduler/src/main/scala/com/sky/kms/SchedulerApp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.sky.kms

import com.sky.kms.config.AppConfig
import com.sky.kms.streams.ScheduleReader
import com.typesafe.scalalogging.LazyLogging
import kamon.Kamon
import org.zalando.grafter._
import pureconfig._

import scala.concurrent.Await

object SchedulerApp extends App with LazyLogging with AkkaComponents {

val conf = loadConfigOrThrow[AppConfig]
Kamon.start()

logger.info("Kafka Message Scheduler starting up...")
val app = ScheduleReader.reader.run(conf)

sys.addShutdownHook {
logger.info("Kafka Message Scheduler shutting down...")
Rewriter.stop(app).value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restoring @paoloambrosio-skyuk's comment:

All of this is a bit messy :trollface: Grafter stuff mixed with Akka, Kamon, ... all working in a different way. It would be good to create an abstraction so that we can close everything in the same way.

If we're not addressing that in this PR, perhaps create an issue or at least an issue comment to track, and link to it from here?

Copy link
Contributor Author

@lacarvalho91 lacarvalho91 Aug 9, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #8


materializer.shutdown()
Await.result(system.terminate(), conf.scheduler.shutdownTimeout.system)

Kamon.shutdown()
}
}
Loading