Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cron context fixes #560

Merged
merged 3 commits into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ node_modules
dev.config.json
.DS_Store

# price.log generated by HelloCronScheduling
# files generated by example workflows
price.log
letter.log
number1.log
number2.log
24 changes: 10 additions & 14 deletions cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@ package com.criteo.cuttle.cron
import java.time.Instant
import java.util.concurrent.TimeUnit

import scala.util.{Success, Try}

import cats.data.EitherT
import cats.effect.IO
import com.criteo.cuttle.Auth._
import com.criteo.cuttle.Metrics.{Gauge, Prometheus}
import com.criteo.cuttle._
import com.criteo.cuttle.utils.getJVMUptime
import io.circe._
import io.circe.syntax._
import lol.http._
import lol.json._

import com.criteo.cuttle.Auth._
import com.criteo.cuttle.Metrics.{Gauge, Prometheus}
import com.criteo.cuttle._
import com.criteo.cuttle.utils.getJVMUptime
import scala.util.{Success, Try}

private[cron] case class CronApp(project: CronProject, executor: Executor[CronScheduling])(
implicit val transactor: XA
Expand Down Expand Up @@ -71,19 +70,16 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc
Ok(bodyFromStream)
}

case GET at "/api/dashboard" =>
Ok(scheduler.getStats(allJobIds))

case GET at "/api/jobs/paused" =>
Ok(scheduler.getPausedJobs.asJson)
case GET at "/api/dags/paused" =>
Ok(scheduler.getPausedDags.asJson)

// we only show 20 recent executions by default but it could be modified via query parameter
case GET at url"/api/cron/executions?dag=$dag&start=$start&end=$end&limit=$limit" =>
val jsonOrError: EitherT[IO, Throwable, Json] = for {
dag <- EitherT.fromOption[IO](workload.dags.find(_.id == dag), throw new Exception(s"Unknown job dag $dag"))
dag <- EitherT.fromOption[IO](workload.dags.find(_.id == dag), throw new Exception(s"Unknown job DAG $dag"))
jobIds <- EitherT.rightT[IO, Throwable](dag.cronPipeline.vertices.map(_.id))
startDate <- EitherT.rightT[IO, Throwable](Try(Instant.parse(start)).getOrElse(minStartDateForExecutions))
endDate <- EitherT.rightT[IO, Throwable](Try(Instant.parse(end)).getOrElse(maxStartDateForExecutions))
startDate <- EitherT.rightT[IO, Throwable](Try(Some(Instant.parse(start))).getOrElse(None))
endDate <- EitherT.rightT[IO, Throwable](Try(Some(Instant.parse(end))).getOrElse(None))
limit <- EitherT.rightT[IO, Throwable](Try(limit.toInt).getOrElse(Int.MaxValue))
executions <- EitherT.right[Throwable](buildExecutionsList(executor, jobIds, startDate, endDate, limit))
executionListFlat <- EitherT.rightT[IO, Throwable](executions.values.toSet.flatten)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package com.criteo.cuttle.cron

import cron4s.Cron
import cron4s.lib.javatime._
import java.time.{Duration, Instant, ZoneId, ZoneOffset}
import java.time.temporal.ChronoUnit
import java.time.{Duration, Instant, ZoneId, ZoneOffset}

import io.circe.{Encoder, Json}
import cron4s.Cron
import cron4s.lib.javatime._
import io.circe.syntax._
import io.circe.{Encoder, Json}

import java.time.ZoneOffset
import scala.concurrent.duration._

/**
Expand Down
114 changes: 51 additions & 63 deletions cron/src/main/scala/com/criteo/cuttle/cron/CronModel.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package com.criteo.cuttle.cron

import java.time.Instant
import cats.effect.concurrent.Deferred
import cats.effect.IO

import scala.concurrent.duration._
import scala.concurrent.stm.{Ref, _}
import cats.effect.IO
import cats.effect.concurrent.Deferred
import com.criteo.cuttle.Auth.User
import com.criteo.cuttle.{Job, Logger, Scheduling, SchedulingContext, Tag, Workload}
import doobie.ConnectionIO
import io.circe._
import io.circe.syntax._
import io.circe.generic.semiauto._
import io.circe.java8.time._
import com.criteo.cuttle.Auth.User
import com.criteo.cuttle.{ExecutionStatus, Job, Logger, PausedJob, Scheduling, SchedulingContext, Tag, Workload}
import io.circe.syntax._

import scala.concurrent.duration._
import scala.concurrent.stm.{atomic, Ref}
import scala.reflect.ClassTag

private[cron] case class ScheduledAt(instant: Instant, delay: FiniteDuration)
Expand All @@ -22,25 +24,25 @@ private[cron] case class ScheduledAt(instant: Instant, delay: FiniteDuration)
private[cron] case class CronState(logger: Logger) {

private val executions = Ref(Map.empty[CronDag, Either[Instant, Set[CronExecution]]])
private val paused = Ref(Map.empty[CronDag, PausedJob])
private val paused = Ref(Map.empty[CronDag, PausedDag])
private val runNowHandlers: Ref[Map[CronDag, Deferred[IO, (ScheduledAt, User)]]] =
Ref(Map.empty[CronDag, Deferred[IO, (ScheduledAt, User)]])(
implicitly[ClassTag[Map[CronDag, Deferred[IO, (ScheduledAt, User)]]]]
)
private val jobDagState = Ref(Map.empty[CronDag, Map[CronJob, CronJobState]])

private[cron] def init(availableJobDags: Set[CronDag], pausedJobs: Seq[PausedJob]) = {
private[cron] def init(availableJobDags: Set[CronDag], pausedDags: Seq[PausedDag]) = {
logger.debug("Cron Scheduler States initialization")

val available = availableJobDags.map(dag => dag.id -> dag).toMap
val initialPausedJobs = pausedJobs.collect {
case pausedJob if available.contains(pausedJob.id) =>
logger.debug(s"Job ${pausedJob.id} is paused")
available(pausedJob.id) -> pausedJob
val initialPausedDags = pausedDags.collect {
case pausedDag if available.contains(pausedDag.id) =>
logger.debug(s"DAG ${pausedDag.id} is paused")
available(pausedDag.id) -> pausedDag
}.toMap

atomic { implicit txn =>
paused() = initialPausedJobs
paused() = initialPausedDags
}
}

Expand All @@ -50,7 +52,7 @@ private[cron] case class CronState(logger: Logger) {
val dependenciesSatisfied = dag.cronPipeline.parentsMap.filter {
case (_, deps) =>
deps.forall { p =>
successfulJobs.contains(p.parent)
successfulJobs.contains(p.from)
}
}.keySet
val candidates = dependenciesSatisfied ++ dag.cronPipeline.roots
Expand All @@ -73,7 +75,7 @@ private[cron] case class CronState(logger: Logger) {
jobDagState() = jobDagState() + (dag -> Map.empty)
}

private[cron] def getPausedJobs(): Set[PausedJob] = paused.single.get.values.toSet
private[cron] def getPausedDags(): Set[PausedDag] = paused.single.get.values.toSet
private[cron] def isPaused(dag: CronDag): Boolean = paused.single.get.contains(dag)

private[cron] def addNextEventToState(dag: CronDag, instant: Instant): Unit = atomic { implicit txn =>
Expand All @@ -100,18 +102,18 @@ private[cron] case class CronState(logger: Logger) {
executions() = executions() - dag
}

private[cron] def pauseDags(dags: Set[CronDag])(implicit user: User): Set[PausedJob] = {
private[cron] def pauseDags(dags: Set[CronDag])(implicit user: User): Set[PausedDag] = {
val pauseDate = Instant.now()
atomic { implicit txn =>
val dagsToPause = dags
.filterNot(dag => paused().contains(dag))
.toSeq

dagsToPause.foreach(removeDagFromState)
val justPausedJobs = dagsToPause.map(job => PausedJob(job.id, user, pauseDate))
paused() = paused() ++ dagsToPause.zip(justPausedJobs)
val justPausedDags = dagsToPause.map(dag => PausedDag(dag.id, user, pauseDate))
paused() = paused() ++ dagsToPause.zip(justPausedDags)

justPausedJobs.toSet
justPausedDags.toSet
}
}

Expand All @@ -129,57 +131,45 @@ private[cron] case class CronState(logger: Logger) {
runNowHandlers() = runNowHandlers() - dag
}

private[cron] def getRunNowHandlers(jobIds: Set[String]) = atomic { implicit txn =>
runNowHandlers().filter(cronJob => jobIds.contains(cronJob._1.id))
private[cron] def getRunNowHandlers(dagIds: Set[String]) = atomic { implicit txn =>
runNowHandlers().filter(cronJob => dagIds.contains(cronJob._1.id))
}

private[cron] def snapshotAsJson(jobIds: Set[String]) = atomic { implicit txn =>
val activeJobsSnapshot = executions().collect {
case (cronDag: CronDag, state) if jobIds.contains(cronDag.id) =>
cronDag.asJson
.deepMerge(
Json.obj(
state.fold(
"nextInstant" -> _.asJson,
(
(a: Set[CronExecution]) =>
("currentExecutions" -> Json
.arr(a.map(_.toExecutionLog(ExecutionStatus.ExecutionRunning).asJson).toArray: _*))
)
)
)
)
.deepMerge(
Json.obj(
"status" -> "active".asJson
)
)
}
val pausedJobsSnapshot = paused().collect {
case (cronJob, pausedJob) if jobIds.contains(cronJob.id) => pausedJob.asJson
private[cron] def snapshotAsJson(dagIds: Set[String]) = atomic { implicit txn =>
val activeDagsSnapshot = executions().collect {
case (cronDag: CronDag, Left(nextInstant)) if dagIds.contains(cronDag.id) =>
Json.obj(
"id" -> cronDag.id.asJson,
"status" -> "waiting".asJson,
"nextInstant" -> nextInstant.asJson
)
case (cronDag: CronDag, Right(_)) if dagIds.contains(cronDag.id) =>
Json.obj(
"id" -> cronDag.id.asJson,
"status" -> "running".asJson
)
}
val acc = (activeJobsSnapshot ++ pausedJobsSnapshot).toSeq
Json.arr(
acc: _*
activeDagsSnapshot.toSeq: _*
)
}

private[cron] def snapshot(dagIds: Set[String]) = atomic { implicit txn =>
val activeJobsSnapshot = executions().filterKeys(cronDag => dagIds.contains(cronDag.id))
val pausedJobsSnapshot = paused().filterKeys(cronDag => dagIds.contains(cronDag.id))
val pausedDagsSnapshot = paused().filterKeys(cronDag => dagIds.contains(cronDag.id))

activeJobsSnapshot -> pausedJobsSnapshot
activeJobsSnapshot -> pausedDagsSnapshot
}

override def toString(): String = {
override def toString: String = {
val builder = new StringBuilder()
val state = executions.single.get
builder.append("\n======State======\n")
state.foreach {
case (job, jobState) =>
case (dag, dagState) =>
val messages = Seq(
job.id,
jobState.fold(_ toString, _.map(_.id).mkString(","))
dag.id,
dagState.fold(_.toString, _.map(_.id).mkString(","))
)
builder.append(messages mkString " :: ")
builder.append("\n")
Expand All @@ -192,25 +182,23 @@ private[cron] case class CronState(logger: Logger) {
/** A [[CronContext]] is passed to [[com.criteo.cuttle.Execution executions]] initiated by
* the [[CronScheduler]].
*/
case class CronContext(instant: Instant, retry: Int, parentDag: String) extends SchedulingContext {
case class CronContext(instant: Instant, runNowUser: Option[String] = None) extends SchedulingContext {

def compareTo(other: SchedulingContext): Int = other match {
case CronContext(otherInstant, _, _) =>
case CronContext(otherInstant, _) =>
instant.compareTo(otherInstant)
}

override def logIntoDatabase: ConnectionIO[String] = Database.serializeContext(this)

override def asJson: Json = CronContext.encoder(this)

override def longRunningId(): String = toString
override def longRunningId(): String = s"$instant|${runNowUser.getOrElse("")}"
}

case object CronContext {
implicit val encoder: Encoder[CronContext] =
Encoder.forProduct3("interval", "retry", "parentJob")(cc => (cc.instant, cc.retry, cc.parentDag))
implicit def decoder: Decoder[CronContext] =
Decoder.forProduct3[CronContext, Instant, Int, String]("interval", "retry", "parentJob")(
(instant: Instant, retry: Int, parentJob: String) => CronContext(instant, retry, parentJob)
)
implicit val encoder: Encoder[CronContext] = deriveEncoder
implicit def decoder: Decoder[CronContext] = deriveDecoder
}

/** Configure a [[com.criteo.cuttle.Job job]] as a [[CronScheduling]] job.
Expand Down
44 changes: 22 additions & 22 deletions cron/src/main/scala/com/criteo/cuttle/cron/CronPipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,25 @@ import scala.language.implicitConversions
//A non-cyclic DAG, convention is that child depends on parents
case class CronPipeline(vertices: Set[CronJob], edges: Set[Dependency]) {

def children: Set[CronJob] = edges.map { case Dependency(child, _) => child }
def children: Set[CronJob] = edges.map(_.to)

def roots: Set[CronJob] = vertices.filter(!children.contains(_))

private[cron] def parents: Set[CronJob] = edges.map { case Dependency(_, parent) => parent }
private[cron] def parents: Set[CronJob] = edges.map(_.from)

private[cron] def leaves: Set[CronJob] = vertices.filter(!parents.contains(_))

private[cron] def parentsMap = edges.groupBy { case Dependency(child, _) => child }
private[cron] def childrenMap = edges.groupBy { case Dependency(_, parent) => parent }
private[cron] def parentsMap = edges.groupBy(_.to)
private[cron] def childrenMap = edges.groupBy(_.from)

def dependsOn(right: CronPipeline): CronPipeline = {
val left = this
val newEdges: Set[Dependency] = for {
v1 <- left.roots
v2 <- right.leaves
} yield Dependency(v1, v2)
} yield Dependency(from = v2, to = v1)
val duplicates = left.vertices.map(_.id).intersect(right.vertices.map(_.id))
if (duplicates.size != 0) {
if (duplicates.nonEmpty) {
throw new Exception("Duplicate job ids: " + duplicates.mkString(","))
}
new CronPipeline(
Expand All @@ -39,7 +39,7 @@ case class CronPipeline(vertices: Set[CronJob], edges: Set[Dependency]) {
def and(other: CronPipeline): CronPipeline = {
val leftWorkflow = this
val duplicates = leftWorkflow.vertices.map(_.id).intersect(other.vertices.map(_.id))
if (duplicates.size != 0) {
if (duplicates.nonEmpty) {
throw new Exception("Duplicate job ids: " + duplicates.mkString(","))
}
new CronPipeline(
Expand All @@ -60,27 +60,27 @@ case class CronPipeline(vertices: Set[CronJob], edges: Set[Dependency]) {
CronDag(id, this, CronExpression(cronExpression), name, description, tags)
}

/** An edge of a [[CronPipeline]]; by convention the `child` job depends on the `parent` job. */
case class Dependency(child: CronJob, parent: CronJob)

object Dependency {

  /** JSON encoder for a [[Dependency]] edge.
   *
   * Produces an object with two fields, `"child"` and `"parent"`, each holding the
   * JSON encoding of the corresponding job.
   */
  implicit val encodeUser: Encoder[Dependency] = new Encoder[Dependency] {
    override def apply(dependency: Dependency): Json =
      // Both fields must be passed as separate varargs. Wrapping them in a `{ ... }`
      // block makes the first tuple a discarded statement, so only the last one
      // ("parent") would end up in the resulting object.
      Json.obj(
        "child" -> dependency.child.asJson,
        "parent" -> dependency.parent.asJson
      )
  }
}

object CronPipeline {
implicit def fromCronJob(job: CronJob): CronPipeline = new CronPipeline(Set(job), Set.empty)

implicit val encodeUser: Encoder[CronPipeline] = new Encoder[CronPipeline] {
implicit val encodePipeline: Encoder[CronPipeline] = new Encoder[CronPipeline] {
override def apply(pipeline: CronPipeline) =
Json.obj(
"vertices" -> Json.arr(pipeline.vertices.map(_.asJson).toSeq: _*),
"vertices" -> Json.arr(pipeline.vertices.map(_.id.asJson).toSeq: _*),
"edges" -> Json.arr(pipeline.edges.map(_.asJson).toSeq: _*)
)
}
}

/** A directed edge of a [[CronPipeline]]: the `to` job depends on the `from` job ("from" -> "to"). */
case class Dependency(from: CronJob, to: CronJob)

object Dependency {

  /** JSON encoder for a [[Dependency]] edge.
   *
   * Produces an object with two fields, `"from"` and `"to"`, each holding the id of
   * the corresponding job.
   */
  implicit val encodeDependency: Encoder[Dependency] = new Encoder[Dependency] {
    override def apply(dependency: Dependency): Json =
      // Both fields must be passed as separate varargs. Wrapping them in a `{ ... }`
      // block makes the first tuple a discarded statement, so only the last one
      // ("to") would end up in the resulting object.
      Json.obj(
        "from" -> dependency.from.id.asJson,
        "to" -> dependency.to.id.asJson
      )
  }
}
Loading