Schedule messages #2
Conversation
…r stream error handling. Metrics?
… have been emitted. Tests passing. TODO: error handling in ScheduleReader stream.
…implicit Sink[ErrorType, _].
object SchedulerApp extends App with AkkaComponents with LazyLogging {
object SchedulerApp extends App with LazyLogging with AkkaComponents {
For my own understanding - is there any logical reason why the order of these is swapped? I had no awareness that this made any difference to the compiler.
I believe there are circumstances where it would make a difference, but in this case - no.
Spent some time trying to work out how to properly inject the implicits from AkkaComponents
so it would've moved during that experimentation.
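For reference, the trait order can matter when mixed-in traits override the same member: Scala's linearization means the right-most trait wins. A minimal sketch with hypothetical traits (not the ones in this PR, where the order happens not to matter):

```scala
trait Component { def label: String = "component" }
trait Logging extends Component { override def label: String = "logging" }
trait Metrics extends Component { override def label: String = "metrics" }

// Linearization is right-to-left: the right-most trait's override wins.
object A extends Logging with Metrics
object B extends Metrics with Logging

// A.label == "metrics"; B.label == "logging"
```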
implicit val system = ActorSystem("kafka-message-scheduler")
implicit val materializer = ActorMaterializer()

val decider: Supervision.Decider = { t =>
this stuff and the tests for it are a WIP
Partial review
sys.addShutdownHook {
  logger.info("Kafka Message Scheduler shutting down...")
  Await.ready(runningStream.shutdown(), conf.shutdownTimeout.stream)
  Await.ready(system.terminate(), conf.shutdownTimeout.system)
Rewriter.stop(app).value
All of this is a bit messy - Grafter stuff mixed with Akka, Kamon, ... all working in different ways. It would be good to create an abstraction so that we can close everything in the same way.
Seconded, it's pretty hard to follow everything at the moment.
Agreed - it will take some time to come up with that, I would have thought. For this PR or another?
  log.info(s"Updating schedule $scheduleId")
else
  log.info(s"Creating schedule $scheduleId")
val cancellable = scheduler.scheduleOnce(timeFromNow(schedule.time)){
I usually prefer to schedule a message to the actor itself instead of executing a function, as it makes it easier to handle concurrency properly.
good shout
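A sketch of the message-based variant (names here are illustrative, not the PR's actual code): the closure passed to `scheduleOnce` runs on a scheduler thread and can race with the actor's own message handling, whereas scheduling a message back to `self` keeps all state mutation inside `receive`.

```scala
class SchedulingActor extends Actor {
  private var schedules = Map.empty[ScheduleId, Schedule]

  def receive: Receive = {
    case CreateOrUpdate(id, schedule) =>
      // Schedule a Trigger message to ourselves instead of a function.
      context.system.scheduler.scheduleOnce(timeFromNow(schedule.time), self, Trigger(id, schedule))
    case Trigger(id, schedule) =>
      // Safe: this state is only ever touched from receive, never from a scheduler thread.
      schedules -= id
      // ... publish the scheduled message ...
  }
}
```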
@@ -22,4 +28,8 @@ object ApplicationError {
  case schemaError: InvalidSchemaError => invalidSchemaErrorShow.show(schemaError)
  case messageFormatError: AvroMessageFormatError => avroMessageFormatErrorShow.show(messageFormatError)
}

//TODO: is this enough for now?
Can we use a Reader? Happy to postpone it to later. It should really be created from the raw configuration (not application config). We should later create a project with all our common code (a la akka-stream-contrib).
case class Schedule(time: OffsetDateTime, topic: String, key: Array[Byte], value: Array[Byte]) extends ScheduleData

case class ScheduleMetadata(scheduleId: ScheduleId, topic: String) extends ScheduleData
Something like ScheduledMessage(topic: String, key: Array[Byte], value: Array[Byte]) and ScheduleDeletion(scheduleId: ScheduleId) might be more appropriate (note the topic missing from ScheduleDeletion, as the ...ToProducerRecord should take it from config).
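A sketch of that suggested ADT (the trait name is the PR's existing ScheduleData; ScheduleId is assumed to be a String alias, and describe is just an illustrative consumer):

```scala
sealed trait ScheduleData
case class ScheduledMessage(topic: String, key: Array[Byte], value: Array[Byte]) extends ScheduleData
case class ScheduleDeletion(scheduleId: String) extends ScheduleData

// Illustrative pattern match over the two cases.
def describe(data: ScheduleData): String = data match {
  case ScheduledMessage(topic, _, _) => s"publish to $topic"
  case ScheduleDeletion(id)          => s"delete schedule $id"
}
```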
package object scheduler extends LazyLogging {

  case class AppConfig(scheduler: SchedulerConfig)
  type DecodeResult = Either[ApplicationError, (ScheduleId, Option[Schedule])]
Not sure DecodeResult is the right abstraction. It does not communicate what it is. With that name I would have expected type DecodeResult[T] = Either[DecodingError, T] where DecodingError inherits from ApplicationError... but that might not deserve being a separate type.
will remove type alias
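For the record, the parameterised alias the review describes would look like this (error names sketched from the PR's hierarchy, not its actual definitions):

```scala
sealed trait ApplicationError
case class DecodingError(reason: String) extends ApplicationError

// The alias now says what it is: the result of a decode, generic in the payload.
type DecodeResult[T] = Either[DecodingError, T]

val ok: DecodeResult[Int] = Right(42)
val bad: DecodeResult[Int] = Left(DecodingError("not avro"))
```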
def reader(implicit system: ActorSystem, materializer: ActorMaterializer): Reader[AppConfig, ScheduleReader] =
  for {
    config <- SchedulerConfig.reader
    schedulingActorRef <- SchedulingActor.reader
Do I take it this is all about using Reader for dependency injection, as per http://eed3si9n.com/herding-cats/Reader.html?
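The gist of the pattern, with a minimal hand-rolled Reader (grafter/cats provide the real one; all names below are illustrative, not the PR's):

```scala
// A Reader wraps a function from some environment A to a value B.
final case class Reader[A, B](run: A => B) {
  def map[C](f: B => C): Reader[A, C] = Reader(a => f(run(a)))
  def flatMap[C](f: B => Reader[A, C]): Reader[A, C] = Reader(a => f(run(a)).run(a))
}

case class AppConfig(topic: String, groupId: String)

// Each component declares only what it needs from the config...
val topicReader: Reader[AppConfig, String] = Reader(_.topic)
val groupReader: Reader[AppConfig, String] = Reader(_.groupId)

// ...and components compose without any wiring until run is called.
val description: Reader[AppConfig, String] = for {
  t <- topicReader
  g <- groupReader
} yield s"consuming $t as $g"
```

Nothing is constructed until `description.run(someConfig)` is invoked, which is what makes it usable as lightweight dependency injection.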
import akka.stream.stage.{GraphStage, InHandler, OutHandler}

//Taken from: https://stackoverflow.com/a/38445121/8424807
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
Isn't PartitionWith basically what this is?
I looked at that but maybe I misunderstood it - will take another look.
it works! 🎉
Suggest that to the StackOverflow question 😄
import com.sky.kafka.message.scheduler.kafka._
import com.typesafe.scalalogging.LazyLogging

case class ScheduleReader(config: SchedulerConfig, scheduleSource: Source[DecodeResult, Control], schedulingSink: Sink[Any, NotUsed])
What about adding Scaladoc about the purpose of these classes? In this case this would be something like...
/**
 * Provides a stream from the schedule source to the scheduling actor.
 */
Reading through the previous class PartitionedSink I was thinking exactly the same thing! Possibly worth pairing on the docs with someone?
Also, I'm not sure about the name ScheduleReader. Maybe I'm being dense, but I got confused between this and the Reader concept that's introduced in this PR. How about ScheduleInputHandler or something else?
ScheduleReaderStream? I'm not a fan of changing it, because ScheduleReader describes it very well IMO. Does the above distinguish it enough?
That ruins it actually, as it would be SchedulerReaderStream.stream. I would rather it stayed ScheduleReader - perhaps we should discuss as a team.
class AkkaComponentsSpec extends AkkaBaseSpec {

  "decider" should {
    "return a 'Restart' supervision strategy" in {
Hopefully there is a higher level test that shows the effect of this error handling logic. Also not sure of the value that this test is giving. It looks to me like testing accessors (testing something that is declarative).
This will evolve into a test that checks we report metrics in the supervisor strategy; it's a WIP.
verify(mockSourceQueue).offer((scheduleId, updatedSchedule))
}

"send an Ack to the sender when receiving an Init message" in new SchedulingActorTest {
Shouldn't we send the Ack back on every scheduling message as well as on Init? Yes, just seen that createSchedule is masking what is happening. I think in this case we want to see both the message and be told we expect the Ack.
At this point we can also rename this test case as "does nothing on init" or something on those lines, given that we don't talk about messages in the other test names.
We already see both the message and that we expect the Ack - unless I'm misunderstanding? Have renamed the test.
object EmbeddedKafka {

  val kafkaServer = new KafkaServer

  val bootstrapServer = s"localhost:${kafkaServer.kafkaPort}"

  implicit class KafkaServerOps(val kafkaServer: KafkaServer) extends AnyVal {
We definitely need to add a comment about why this is here, where it comes from, and why we haven't fixed code quality issues.
High-level: I'm not convinced by the root package being … How about the root package being …?
object KafkaStream {

  def source[T](config: SchedulerConfig)(implicit system: ActorSystem, crDecoder: ConsumerRecordDecoder[T]): Source[T, Control] =
    Consumer.plainSource(
Do we need to add a GitHub issue to track moving to a committableSource to provide at-least-once semantics?
Also, in the meantime, for any commits to be made you'll need to change enable.auto.commit to true in application.conf.
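For reference, a sketch of the committable variant (akka-stream-kafka API of roughly this era; the settings and `handle` function are assumed, not from this PR). The offset is committed only after the record has been handled, which is what gives at-least-once semantics:

```scala
val control = Consumer
  .committableSource(consumerSettings, Subscriptions.topics(config.scheduleTopic))
  .mapAsync(1) { msg =>
    // handle(...) is a hypothetical Future-returning processor of the record.
    handle(msg.record).map(_ => msg.committableOffset)
  }
  .mapAsync(1)(_.commitScaladsl()) // commit only after the record is handled
  .to(Sink.ignore)
  .run()
```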
From
implicit val offsetDateTimeToValue = new ToValue[OffsetDateTime] {
  override def apply(value: OffsetDateTime): String = value.toString
}
My understanding was that it's not great practice to be using the …
package object config {

  case class AppConfig(scheduler: SchedulerConfig)(implicit system: ActorSystem, materialzer: ActorMaterializer)
Typo: materialzer is missing an i.
}

(receiveCreateOrUpdateMessage orElse receiveCancelMessage andThen updateStateAndAck) orElse {
  case Init => sender ! Ack
The logic priority of the above line isn't 100% clear to me. Is it:
((receiveCreateOrUpdateMessage orElse receiveCancelMessage) andThen updateStateAndAck) orElse
or
(receiveCreateOrUpdateMessage orElse (receiveCancelMessage andThen updateStateAndAck)) orElse
Maybe this is obvious to masters of partial functions, but I don't think that being explicit would hurt?
Also there's an argument for putting the Init case in a named partial function for clarity. I'm less fussed about this though.
this should be clearer now
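For the record: `orElse` and `andThen` are both plain alphanumeric methods, so infix application is left-associative with equal precedence and the line parses as the first reading, `(a orElse b) andThen c`. A small check with hypothetical partial functions:

```scala
val evens: PartialFunction[Int, Int] = { case n if n % 2 == 0 => n }
val odds:  PartialFunction[Int, Int] = { case n if n % 2 != 0 => -n }
val double: Int => Int = _ * 2

// Left-associative: parsed as (evens orElse odds) andThen double,
// so double applies to whichever branch matched.
val combined = evens orElse odds andThen double
val explicit = (evens orElse odds) andThen double

// combined(4) == 8; combined(3) == -6
```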
import com.sky.kafkamessage.scheduler.kafka.ProducerRecordEncoder
import org.apache.kafka.clients.producer.ProducerRecord

sealed trait PublishableMessage
help me name this pls 😆
SchedulerControlMessage? With extensions ScheduleDefinition and ScheduleDeletion?
I think ScheduledMessage and ScheduleDeletion are good for the sub-classes, just not sure on the name of the trait.
case class Cancel(scheduleId: ScheduleId) extends SchedulingMessage

private case class Trigger(scheduleId: ScheduleId, schedule: Schedule)
I'm not 100% on this name either; maybe it's ok.
I think it's probably fine
.settings
.supervisionDecider(exception) shouldBe Restart

verify(metrics).counter(key)
@paoloambrosio-skyuk wasn't sure how we wanted to test these metrics - whether this is ok (using mocks), or starting Kamon and an MBeanServer and querying that.
README.md (Outdated)
The `schedule-topic` must be configured to use [log compaction](https://kafka.apache.org/documentation/#compaction).
This is to allow the scheduler to delete the schedule after it has been written to its destination topic. This is very
important because the scheduler uses the `schedule-topic` to reconstruct its state.
It's not only to allow deletion, it's also to ensure that (longer-term) schedules don't get lost after a restart.
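For reference, log compaction is a per-topic setting in Kafka; the relevant topic-level configuration (applied when creating or altering the schedule topic) is:

```
cleanup.policy=compact
```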
build.sbt (Outdated)
"io.kamon" %% "kamon-jmx" % kamonVersion,
"io.kamon" %% "kamon-akka-2.5" % kamonVersion,
"io.kamon" %% "kamon-core" % kamonVersion,

"org.scalatest" %% "scalatest" % "3.0.1" % Test,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
"net.cakesolutions" %% "scala-kafka-client-testkit" % kafkaVersion % Test,
"org.slf4j" % "log4j-over-slf4j" % "1.7.21" % Test,
As we're a new module I see no reason not to use SLF4J 1.7.25
sys.addShutdownHook {
  logger.info("Kafka Message Scheduler shutting down...")
  Rewriter.stop(app).value
Restoring @paoloambrosio-skyuk's comment:
All of this is a bit messy Grafter stuff mixed with Akka, Kamon, ... all working in a different way. It would be good to create an abstraction so that we can close everything in the same way.
If we're not addressing that in this PR, perhaps create an issue or at least an issue comment to track, and link to it from here?
Created #8
/**
 * Provides a stream that consumes from the queue of triggered messages,
 * writes the messages to Kafka and then deletes the schedules from Kafka
How about: "writes the scheduled messages to the specified Kafka topics, and then deletes the schedules from the scheduling Kafka topic to mark completion"?
Where were you when I was writing reports at uni!
@lacarvalho91 I was busy giving grammatical advice to developers elsewhere 😉
implicit val scheduleDeletionProducerRecordEnc: ProducerRecordEncoder[ScheduleDeletion] =
  ProducerRecordEncoder.instance(deletion => new ProducerRecord(deletion.scheduleTopic, deletion.scheduleId.getBytes, null))

implicit def scheduleDataToProducerRecord(msg: PublishableMessage): ProducerRecord[Array[Byte], Array[Byte]] =
This function naming is a legacy of the old name ScheduleData. I think it probably needs to be named publishableMessageToProducerRecord.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration

class SchedulingActor(queue: SourceQueue[(String, ScheduledMessage)], scheduler: Scheduler) extends Actor with ActorLogging {
I actually think that the scheduler parameter should be renamed akkaScheduler. There's so much "schedule" text dotted around this class that I believe it would increase readability if we clearly identify the Akka scheduling component.
Good call
I think I'm happy enough with this now 👍
I won't merge yet - I'll give @paoloambrosio-skyuk a chance for a final review first.
if (cancel(scheduleId, schedules))
  log.info(s"Cancelled schedule $scheduleId")
else
  log.warning(s"Couldn't cancel $scheduleId")
This log message appears every time a schedule is triggered, because we delete from the schedule topic afterwards. What happens is: we send the null to the schedule-topic, then we process that as a Cancel (to support explicit deletes, and to remove the schedule from the actor's state), but the call to cancel fails since the event has already been triggered.
Not sure how we want to handle this; I guess we would want some kind of warning if we fail to cancel something when it should have actually been cancellable.
I think this will be non-trivial, so I've raised an issue to cover this: Erroneous WARNING message after every schedule firing
@@ -1,4 +1,4 @@
package common
package com.sky.kms.common
It might be worth discussing why we put tests in the same package. It has always fascinated me.
Is this not so you can use "package-private" visibility and still be able to test it, at least in Java?
Indeed. You should NOT test the internals, and separating packages does that ;-)
Will also be looking into adding metrics but is ready for an initial review.
I didn't realise git pair would be amending the commit author outside of using git pair-commit.