Skip to content

Commit

Permalink
Cron context fixes (#560)
Browse files Browse the repository at this point in the history
* Clean up job <-> dag confusion in Cron scheduler

* Refactor Cron Dependency as (from, to)

* Fix the Cron context to only contain instant and runNow user

- Add schema evolution to create a cron_contexts table containing the
contexts and convert the old values to new values
- This also fixes job retries ignoring the retry strategy due to retries
having a different context from the original execution (as they
contained the retry number)
  • Loading branch information
Lordshinjo authored Aug 27, 2020
1 parent 47f5c7b commit 3cb321d
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 179 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ node_modules
dev.config.json
.DS_Store

# price.log gererated by HelloCronScheduling
# files generated by example workflows
price.log
letter.log
number1.log
number2.log
24 changes: 10 additions & 14 deletions cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@ package com.criteo.cuttle.cron
import java.time.Instant
import java.util.concurrent.TimeUnit

import scala.util.{Success, Try}

import cats.data.EitherT
import cats.effect.IO
import com.criteo.cuttle.Auth._
import com.criteo.cuttle.Metrics.{Gauge, Prometheus}
import com.criteo.cuttle._
import com.criteo.cuttle.utils.getJVMUptime
import io.circe._
import io.circe.syntax._
import lol.http._
import lol.json._

import com.criteo.cuttle.Auth._
import com.criteo.cuttle.Metrics.{Gauge, Prometheus}
import com.criteo.cuttle._
import com.criteo.cuttle.utils.getJVMUptime
import scala.util.{Success, Try}

private[cron] case class CronApp(project: CronProject, executor: Executor[CronScheduling])(
implicit val transactor: XA
Expand Down Expand Up @@ -71,19 +70,16 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc
Ok(bodyFromStream)
}

case GET at "/api/dashboard" =>
Ok(scheduler.getStats(allJobIds))

case GET at "/api/jobs/paused" =>
Ok(scheduler.getPausedJobs.asJson)
case GET at "/api/dags/paused" =>
Ok(scheduler.getPausedDags.asJson)

// we only show 20 recent executions by default but it could be modified via query parameter
case GET at url"/api/cron/executions?dag=$dag&start=$start&end=$end&limit=$limit" =>
val jsonOrError: EitherT[IO, Throwable, Json] = for {
dag <- EitherT.fromOption[IO](workload.dags.find(_.id == dag), throw new Exception(s"Unknown job dag $dag"))
dag <- EitherT.fromOption[IO](workload.dags.find(_.id == dag), throw new Exception(s"Unknown job DAG $dag"))
jobIds <- EitherT.rightT[IO, Throwable](dag.cronPipeline.vertices.map(_.id))
startDate <- EitherT.rightT[IO, Throwable](Try(Instant.parse(start)).getOrElse(minStartDateForExecutions))
endDate <- EitherT.rightT[IO, Throwable](Try(Instant.parse(end)).getOrElse(maxStartDateForExecutions))
startDate <- EitherT.rightT[IO, Throwable](Try(Some(Instant.parse(start))).getOrElse(None))
endDate <- EitherT.rightT[IO, Throwable](Try(Some(Instant.parse(end))).getOrElse(None))
limit <- EitherT.rightT[IO, Throwable](Try(limit.toInt).getOrElse(Int.MaxValue))
executions <- EitherT.right[Throwable](buildExecutionsList(executor, jobIds, startDate, endDate, limit))
executionListFlat <- EitherT.rightT[IO, Throwable](executions.values.toSet.flatten)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package com.criteo.cuttle.cron

import cron4s.Cron
import cron4s.lib.javatime._
import java.time.{Duration, Instant, ZoneId, ZoneOffset}
import java.time.temporal.ChronoUnit
import java.time.{Duration, Instant, ZoneId, ZoneOffset}

import io.circe.{Encoder, Json}
import cron4s.Cron
import cron4s.lib.javatime._
import io.circe.syntax._
import io.circe.{Encoder, Json}

import java.time.ZoneOffset
import scala.concurrent.duration._

/**
Expand Down
114 changes: 51 additions & 63 deletions cron/src/main/scala/com/criteo/cuttle/cron/CronModel.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package com.criteo.cuttle.cron

import java.time.Instant
import cats.effect.concurrent.Deferred
import cats.effect.IO

import scala.concurrent.duration._
import scala.concurrent.stm.{Ref, _}
import cats.effect.IO
import cats.effect.concurrent.Deferred
import com.criteo.cuttle.Auth.User
import com.criteo.cuttle.{Job, Logger, Scheduling, SchedulingContext, Tag, Workload}
import doobie.ConnectionIO
import io.circe._
import io.circe.syntax._
import io.circe.generic.semiauto._
import io.circe.java8.time._
import com.criteo.cuttle.Auth.User
import com.criteo.cuttle.{ExecutionStatus, Job, Logger, PausedJob, Scheduling, SchedulingContext, Tag, Workload}
import io.circe.syntax._

import scala.concurrent.duration._
import scala.concurrent.stm.{atomic, Ref}
import scala.reflect.ClassTag

private[cron] case class ScheduledAt(instant: Instant, delay: FiniteDuration)
Expand All @@ -22,25 +24,25 @@ private[cron] case class ScheduledAt(instant: Instant, delay: FiniteDuration)
private[cron] case class CronState(logger: Logger) {

private val executions = Ref(Map.empty[CronDag, Either[Instant, Set[CronExecution]]])
private val paused = Ref(Map.empty[CronDag, PausedJob])
private val paused = Ref(Map.empty[CronDag, PausedDag])
private val runNowHandlers: Ref[Map[CronDag, Deferred[IO, (ScheduledAt, User)]]] =
Ref(Map.empty[CronDag, Deferred[IO, (ScheduledAt, User)]])(
implicitly[ClassTag[Map[CronDag, Deferred[IO, (ScheduledAt, User)]]]]
)
private val jobDagState = Ref(Map.empty[CronDag, Map[CronJob, CronJobState]])

private[cron] def init(availableJobDags: Set[CronDag], pausedJobs: Seq[PausedJob]) = {
private[cron] def init(availableJobDags: Set[CronDag], pausedDags: Seq[PausedDag]) = {
logger.debug("Cron Scheduler States initialization")

val available = availableJobDags.map(dag => dag.id -> dag).toMap
val initialPausedJobs = pausedJobs.collect {
case pausedJob if available.contains(pausedJob.id) =>
logger.debug(s"Job ${pausedJob.id} is paused")
available(pausedJob.id) -> pausedJob
val initialPausedDags = pausedDags.collect {
case pausedDag if available.contains(pausedDag.id) =>
logger.debug(s"DAG ${pausedDag.id} is paused")
available(pausedDag.id) -> pausedDag
}.toMap

atomic { implicit txn =>
paused() = initialPausedJobs
paused() = initialPausedDags
}
}

Expand All @@ -50,7 +52,7 @@ private[cron] case class CronState(logger: Logger) {
val dependenciesSatisfied = dag.cronPipeline.parentsMap.filter {
case (_, deps) =>
deps.forall { p =>
successfulJobs.contains(p.parent)
successfulJobs.contains(p.from)
}
}.keySet
val candidates = dependenciesSatisfied ++ dag.cronPipeline.roots
Expand All @@ -73,7 +75,7 @@ private[cron] case class CronState(logger: Logger) {
jobDagState() = jobDagState() + (dag -> Map.empty)
}

private[cron] def getPausedJobs(): Set[PausedJob] = paused.single.get.values.toSet
private[cron] def getPausedDags(): Set[PausedDag] = paused.single.get.values.toSet
private[cron] def isPaused(dag: CronDag): Boolean = paused.single.get.contains(dag)

private[cron] def addNextEventToState(dag: CronDag, instant: Instant): Unit = atomic { implicit txn =>
Expand All @@ -100,18 +102,18 @@ private[cron] case class CronState(logger: Logger) {
executions() = executions() - dag
}

private[cron] def pauseDags(dags: Set[CronDag])(implicit user: User): Set[PausedJob] = {
private[cron] def pauseDags(dags: Set[CronDag])(implicit user: User): Set[PausedDag] = {
val pauseDate = Instant.now()
atomic { implicit txn =>
val dagsToPause = dags
.filterNot(dag => paused().contains(dag))
.toSeq

dagsToPause.foreach(removeDagFromState)
val justPausedJobs = dagsToPause.map(job => PausedJob(job.id, user, pauseDate))
paused() = paused() ++ dagsToPause.zip(justPausedJobs)
val justPausedDags = dagsToPause.map(dag => PausedDag(dag.id, user, pauseDate))
paused() = paused() ++ dagsToPause.zip(justPausedDags)

justPausedJobs.toSet
justPausedDags.toSet
}
}

Expand All @@ -129,57 +131,45 @@ private[cron] case class CronState(logger: Logger) {
runNowHandlers() = runNowHandlers() - dag
}

private[cron] def getRunNowHandlers(jobIds: Set[String]) = atomic { implicit txn =>
runNowHandlers().filter(cronJob => jobIds.contains(cronJob._1.id))
private[cron] def getRunNowHandlers(dagIds: Set[String]) = atomic { implicit txn =>
runNowHandlers().filter(cronJob => dagIds.contains(cronJob._1.id))
}

private[cron] def snapshotAsJson(jobIds: Set[String]) = atomic { implicit txn =>
val activeJobsSnapshot = executions().collect {
case (cronDag: CronDag, state) if jobIds.contains(cronDag.id) =>
cronDag.asJson
.deepMerge(
Json.obj(
state.fold(
"nextInstant" -> _.asJson,
(
(a: Set[CronExecution]) =>
("currentExecutions" -> Json
.arr(a.map(_.toExecutionLog(ExecutionStatus.ExecutionRunning).asJson).toArray: _*))
)
)
)
)
.deepMerge(
Json.obj(
"status" -> "active".asJson
)
)
}
val pausedJobsSnapshot = paused().collect {
case (cronJob, pausedJob) if jobIds.contains(cronJob.id) => pausedJob.asJson
private[cron] def snapshotAsJson(dagIds: Set[String]) = atomic { implicit txn =>
val activeDagsSnapshot = executions().collect {
case (cronDag: CronDag, Left(nextInstant)) if dagIds.contains(cronDag.id) =>
Json.obj(
"id" -> cronDag.id.asJson,
"status" -> "waiting".asJson,
"nextInstant" -> nextInstant.asJson
)
case (cronDag: CronDag, Right(_)) if dagIds.contains(cronDag.id) =>
Json.obj(
"id" -> cronDag.id.asJson,
"status" -> "running".asJson
)
}
val acc = (activeJobsSnapshot ++ pausedJobsSnapshot).toSeq
Json.arr(
acc: _*
activeDagsSnapshot.toSeq: _*
)
}

private[cron] def snapshot(dagIds: Set[String]) = atomic { implicit txn =>
val activeJobsSnapshot = executions().filterKeys(cronDag => dagIds.contains(cronDag.id))
val pausedJobsSnapshot = paused().filterKeys(cronDag => dagIds.contains(cronDag.id))
val pausedDagsSnapshot = paused().filterKeys(cronDag => dagIds.contains(cronDag.id))

activeJobsSnapshot -> pausedJobsSnapshot
activeJobsSnapshot -> pausedDagsSnapshot
}

override def toString(): String = {
override def toString: String = {
val builder = new StringBuilder()
val state = executions.single.get
builder.append("\n======State======\n")
state.foreach {
case (job, jobState) =>
case (dag, dagState) =>
val messages = Seq(
job.id,
jobState.fold(_ toString, _.map(_.id).mkString(","))
dag.id,
dagState.fold(_.toString, _.map(_.id).mkString(","))
)
builder.append(messages mkString " :: ")
builder.append("\n")
Expand All @@ -192,25 +182,23 @@ private[cron] case class CronState(logger: Logger) {
/** A [[CronContext]] is passed to [[com.criteo.cuttle.Execution executions]] initiated by
* the [[CronScheduler]].
*/
case class CronContext(instant: Instant, retry: Int, parentDag: String) extends SchedulingContext {
case class CronContext(instant: Instant, runNowUser: Option[String] = None) extends SchedulingContext {

def compareTo(other: SchedulingContext): Int = other match {
case CronContext(otherInstant, _, _) =>
case CronContext(otherInstant, _) =>
instant.compareTo(otherInstant)
}

override def logIntoDatabase: ConnectionIO[String] = Database.serializeContext(this)

override def asJson: Json = CronContext.encoder(this)

override def longRunningId(): String = toString
override def longRunningId(): String = s"$instant|${runNowUser.getOrElse("")}"
}

case object CronContext {
implicit val encoder: Encoder[CronContext] =
Encoder.forProduct3("interval", "retry", "parentJob")(cc => (cc.instant, cc.retry, cc.parentDag))
implicit def decoder: Decoder[CronContext] =
Decoder.forProduct3[CronContext, Instant, Int, String]("interval", "retry", "parentJob")(
(instant: Instant, retry: Int, parentJob: String) => CronContext(instant, retry, parentJob)
)
implicit val encoder: Encoder[CronContext] = deriveEncoder
implicit def decoder: Decoder[CronContext] = deriveDecoder
}

/** Configure a [[com.criteo.cuttle.Job job]] as a [[CronScheduling]] job.
Expand Down
44 changes: 22 additions & 22 deletions cron/src/main/scala/com/criteo/cuttle/cron/CronPipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,25 @@ import scala.language.implicitConversions
//A non-cyclic DAG, convention is that child depends on parents
case class CronPipeline(vertices: Set[CronJob], edges: Set[Dependency]) {

def children: Set[CronJob] = edges.map { case Dependency(child, _) => child }
def children: Set[CronJob] = edges.map(_.to)

def roots: Set[CronJob] = vertices.filter(!children.contains(_))

private[cron] def parents: Set[CronJob] = edges.map { case Dependency(_, parent) => parent }
private[cron] def parents: Set[CronJob] = edges.map(_.from)

private[cron] def leaves: Set[CronJob] = vertices.filter(!parents.contains(_))

private[cron] def parentsMap = edges.groupBy { case Dependency(child, _) => child }
private[cron] def childrenMap = edges.groupBy { case Dependency(_, parent) => parent }
private[cron] def parentsMap = edges.groupBy(_.to)
private[cron] def childrenMap = edges.groupBy(_.from)

def dependsOn(right: CronPipeline): CronPipeline = {
val left = this
val newEdges: Set[Dependency] = for {
v1 <- left.roots
v2 <- right.leaves
} yield Dependency(v1, v2)
} yield Dependency(from = v2, to = v1)
val duplicates = left.vertices.map(_.id).intersect(right.vertices.map(_.id))
if (duplicates.size != 0) {
if (duplicates.nonEmpty) {
throw new Exception("Duplicate job ids: " + duplicates.mkString(","))
}
new CronPipeline(
Expand All @@ -39,7 +39,7 @@ case class CronPipeline(vertices: Set[CronJob], edges: Set[Dependency]) {
def and(other: CronPipeline): CronPipeline = {
val leftWorkflow = this
val duplicates = leftWorkflow.vertices.map(_.id).intersect(other.vertices.map(_.id))
if (duplicates.size != 0) {
if (duplicates.nonEmpty) {
throw new Exception("Duplicate job ids: " + duplicates.mkString(","))
}
new CronPipeline(
Expand All @@ -60,27 +60,27 @@ case class CronPipeline(vertices: Set[CronJob], edges: Set[Dependency]) {
CronDag(id, this, CronExpression(cronExpression), name, description, tags)
}

//Convention is that child depends on parents
case class Dependency(child: CronJob, parent: CronJob)

object Dependency {
implicit val encodeUser: Encoder[Dependency] = new Encoder[Dependency] {
override def apply(dependency: Dependency) =
Json.obj {
"child" -> dependency.child.asJson
"parent" -> dependency.parent.asJson
}
}
}

object CronPipeline {
implicit def fromCronJob(job: CronJob): CronPipeline = new CronPipeline(Set(job), Set.empty)

implicit val encodeUser: Encoder[CronPipeline] = new Encoder[CronPipeline] {
implicit val encodePipeline: Encoder[CronPipeline] = new Encoder[CronPipeline] {
override def apply(pipeline: CronPipeline) =
Json.obj(
"vertices" -> Json.arr(pipeline.vertices.map(_.asJson).toSeq: _*),
"vertices" -> Json.arr(pipeline.vertices.map(_.id.asJson).toSeq: _*),
"edges" -> Json.arr(pipeline.edges.map(_.asJson).toSeq: _*)
)
}
}

// "to" depends on "from", or "from" -> "to"
case class Dependency(from: CronJob, to: CronJob)

object Dependency {
implicit val encodeDependency: Encoder[Dependency] = new Encoder[Dependency] {
override def apply(dependency: Dependency) =
Json.obj {
"from" -> dependency.from.id.asJson
"to" -> dependency.to.id.asJson
}
}
}
Loading

0 comments on commit 3cb321d

Please sign in to comment.