diff --git a/.gitignore b/.gitignore index ea1008171..8d0647553 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,8 @@ node_modules dev.config.json .DS_Store -# price.log gererated by HelloCronScheduling +# files generated by example workflows price.log +letter.log +number1.log +number2.log diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala index d195d95e0..37c6058c4 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala @@ -3,19 +3,18 @@ package com.criteo.cuttle.cron import java.time.Instant import java.util.concurrent.TimeUnit -import scala.util.{Success, Try} - import cats.data.EitherT import cats.effect.IO +import com.criteo.cuttle.Auth._ +import com.criteo.cuttle.Metrics.{Gauge, Prometheus} +import com.criteo.cuttle._ +import com.criteo.cuttle.utils.getJVMUptime import io.circe._ import io.circe.syntax._ import lol.http._ import lol.json._ -import com.criteo.cuttle.Auth._ -import com.criteo.cuttle.Metrics.{Gauge, Prometheus} -import com.criteo.cuttle._ -import com.criteo.cuttle.utils.getJVMUptime +import scala.util.{Success, Try} private[cron] case class CronApp(project: CronProject, executor: Executor[CronScheduling])( implicit val transactor: XA @@ -71,19 +70,16 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc Ok(bodyFromStream) } - case GET at "/api/dashboard" => - Ok(scheduler.getStats(allJobIds)) - - case GET at "/api/jobs/paused" => - Ok(scheduler.getPausedJobs.asJson) + case GET at "/api/dags/paused" => + Ok(scheduler.getPausedDags.asJson) // we only show 20 recent executions by default but it could be modified via query parameter case GET at url"/api/cron/executions?dag=$dag&start=$start&end=$end&limit=$limit" => val jsonOrError: EitherT[IO, Throwable, Json] = for { - dag <- EitherT.fromOption[IO](workload.dags.find(_.id == dag), throw new Exception(s"Unknown 
job dag $dag")) + dag <- EitherT.fromOption[IO](workload.dags.find(_.id == dag), new Exception(s"Unknown job DAG $dag")) jobIds <- EitherT.rightT[IO, Throwable](dag.cronPipeline.vertices.map(_.id)) - startDate <- EitherT.rightT[IO, Throwable](Try(Instant.parse(start)).getOrElse(minStartDateForExecutions)) - endDate <- EitherT.rightT[IO, Throwable](Try(Instant.parse(end)).getOrElse(maxStartDateForExecutions)) + startDate <- EitherT.rightT[IO, Throwable](Try(Some(Instant.parse(start))).getOrElse(None)) + endDate <- EitherT.rightT[IO, Throwable](Try(Some(Instant.parse(end))).getOrElse(None)) limit <- EitherT.rightT[IO, Throwable](Try(limit.toInt).getOrElse(Int.MaxValue)) executions <- EitherT.right[Throwable](buildExecutionsList(executor, jobIds, startDate, endDate, limit)) executionListFlat <- EitherT.rightT[IO, Throwable](executions.values.toSet.flatten) diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronExpression.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronExpression.scala index 6c0fa3a5a..e3783fcb8 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronExpression.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronExpression.scala @@ -1,14 +1,13 @@ package com.criteo.cuttle.cron -import cron4s.Cron -import cron4s.lib.javatime._ -import java.time.{Duration, Instant, ZoneId, ZoneOffset} import java.time.temporal.ChronoUnit +import java.time.{Duration, Instant, ZoneId, ZoneOffset} -import io.circe.{Encoder, Json} +import cron4s.Cron +import cron4s.lib.javatime._ import io.circe.syntax._ +import io.circe.{Encoder, Json} -import java.time.ZoneOffset import scala.concurrent.duration._ /** diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronModel.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronModel.scala index 80c20316a..fca212c5f 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronModel.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronModel.scala @@ -1,17 +1,19 @@ package com.criteo.cuttle.cron 
import java.time.Instant -import cats.effect.concurrent.Deferred -import cats.effect.IO -import scala.concurrent.duration._ -import scala.concurrent.stm.{Ref, _} +import cats.effect.IO +import cats.effect.concurrent.Deferred +import com.criteo.cuttle.Auth.User +import com.criteo.cuttle.{Job, Logger, Scheduling, SchedulingContext, Tag, Workload} +import doobie.ConnectionIO import io.circe._ -import io.circe.syntax._ +import io.circe.generic.semiauto._ import io.circe.java8.time._ -import com.criteo.cuttle.Auth.User -import com.criteo.cuttle.{ExecutionStatus, Job, Logger, PausedJob, Scheduling, SchedulingContext, Tag, Workload} +import io.circe.syntax._ +import scala.concurrent.duration._ +import scala.concurrent.stm.{atomic, Ref} import scala.reflect.ClassTag private[cron] case class ScheduledAt(instant: Instant, delay: FiniteDuration) @@ -22,25 +24,25 @@ private[cron] case class ScheduledAt(instant: Instant, delay: FiniteDuration) private[cron] case class CronState(logger: Logger) { private val executions = Ref(Map.empty[CronDag, Either[Instant, Set[CronExecution]]]) - private val paused = Ref(Map.empty[CronDag, PausedJob]) + private val paused = Ref(Map.empty[CronDag, PausedDag]) private val runNowHandlers: Ref[Map[CronDag, Deferred[IO, (ScheduledAt, User)]]] = Ref(Map.empty[CronDag, Deferred[IO, (ScheduledAt, User)]])( implicitly[ClassTag[Map[CronDag, Deferred[IO, (ScheduledAt, User)]]]] ) private val jobDagState = Ref(Map.empty[CronDag, Map[CronJob, CronJobState]]) - private[cron] def init(availableJobDags: Set[CronDag], pausedJobs: Seq[PausedJob]) = { + private[cron] def init(availableJobDags: Set[CronDag], pausedDags: Seq[PausedDag]) = { logger.debug("Cron Scheduler States initialization") val available = availableJobDags.map(dag => dag.id -> dag).toMap - val initialPausedJobs = pausedJobs.collect { - case pausedJob if available.contains(pausedJob.id) => - logger.debug(s"Job ${pausedJob.id} is paused") - available(pausedJob.id) -> pausedJob + val 
initialPausedDags = pausedDags.collect { + case pausedDag if available.contains(pausedDag.id) => + logger.debug(s"DAG ${pausedDag.id} is paused") + available(pausedDag.id) -> pausedDag }.toMap atomic { implicit txn => - paused() = initialPausedJobs + paused() = initialPausedDags } } @@ -50,7 +52,7 @@ private[cron] case class CronState(logger: Logger) { val dependenciesSatisfied = dag.cronPipeline.parentsMap.filter { case (_, deps) => deps.forall { p => - successfulJobs.contains(p.parent) + successfulJobs.contains(p.from) } }.keySet val candidates = dependenciesSatisfied ++ dag.cronPipeline.roots @@ -73,7 +75,7 @@ private[cron] case class CronState(logger: Logger) { jobDagState() = jobDagState() + (dag -> Map.empty) } - private[cron] def getPausedJobs(): Set[PausedJob] = paused.single.get.values.toSet + private[cron] def getPausedDags(): Set[PausedDag] = paused.single.get.values.toSet private[cron] def isPaused(dag: CronDag): Boolean = paused.single.get.contains(dag) private[cron] def addNextEventToState(dag: CronDag, instant: Instant): Unit = atomic { implicit txn => @@ -100,7 +102,7 @@ private[cron] case class CronState(logger: Logger) { executions() = executions() - dag } - private[cron] def pauseDags(dags: Set[CronDag])(implicit user: User): Set[PausedJob] = { + private[cron] def pauseDags(dags: Set[CronDag])(implicit user: User): Set[PausedDag] = { val pauseDate = Instant.now() atomic { implicit txn => val dagsToPause = dags @@ -108,10 +110,10 @@ private[cron] case class CronState(logger: Logger) { .toSeq dagsToPause.foreach(removeDagFromState) - val justPausedJobs = dagsToPause.map(job => PausedJob(job.id, user, pauseDate)) - paused() = paused() ++ dagsToPause.zip(justPausedJobs) + val justPausedDags = dagsToPause.map(dag => PausedDag(dag.id, user, pauseDate)) + paused() = paused() ++ dagsToPause.zip(justPausedDags) - justPausedJobs.toSet + justPausedDags.toSet } } @@ -129,57 +131,45 @@ private[cron] case class CronState(logger: Logger) { runNowHandlers() = 
runNowHandlers() - dag } - private[cron] def getRunNowHandlers(jobIds: Set[String]) = atomic { implicit txn => - runNowHandlers().filter(cronJob => jobIds.contains(cronJob._1.id)) + private[cron] def getRunNowHandlers(dagIds: Set[String]) = atomic { implicit txn => + runNowHandlers().filter(cronJob => dagIds.contains(cronJob._1.id)) } - private[cron] def snapshotAsJson(jobIds: Set[String]) = atomic { implicit txn => - val activeJobsSnapshot = executions().collect { - case (cronDag: CronDag, state) if jobIds.contains(cronDag.id) => - cronDag.asJson - .deepMerge( - Json.obj( - state.fold( - "nextInstant" -> _.asJson, - ( - (a: Set[CronExecution]) => - ("currentExecutions" -> Json - .arr(a.map(_.toExecutionLog(ExecutionStatus.ExecutionRunning).asJson).toArray: _*)) - ) - ) - ) - ) - .deepMerge( - Json.obj( - "status" -> "active".asJson - ) - ) - } - val pausedJobsSnapshot = paused().collect { - case (cronJob, pausedJob) if jobIds.contains(cronJob.id) => pausedJob.asJson + private[cron] def snapshotAsJson(dagIds: Set[String]) = atomic { implicit txn => + val activeDagsSnapshot = executions().collect { + case (cronDag: CronDag, Left(nextInstant)) if dagIds.contains(cronDag.id) => + Json.obj( + "id" -> cronDag.id.asJson, + "status" -> "waiting".asJson, + "nextInstant" -> nextInstant.asJson + ) + case (cronDag: CronDag, Right(_)) if dagIds.contains(cronDag.id) => + Json.obj( + "id" -> cronDag.id.asJson, + "status" -> "running".asJson + ) } - val acc = (activeJobsSnapshot ++ pausedJobsSnapshot).toSeq Json.arr( - acc: _* + activeDagsSnapshot.toSeq: _* ) } private[cron] def snapshot(dagIds: Set[String]) = atomic { implicit txn => val activeJobsSnapshot = executions().filterKeys(cronDag => dagIds.contains(cronDag.id)) - val pausedJobsSnapshot = paused().filterKeys(cronDag => dagIds.contains(cronDag.id)) + val pausedDagsSnapshot = paused().filterKeys(cronDag => dagIds.contains(cronDag.id)) - activeJobsSnapshot -> pausedJobsSnapshot + activeJobsSnapshot -> pausedDagsSnapshot } 
- override def toString(): String = { + override def toString: String = { val builder = new StringBuilder() val state = executions.single.get builder.append("\n======State======\n") state.foreach { - case (job, jobState) => + case (dag, dagState) => val messages = Seq( - job.id, - jobState.fold(_ toString, _.map(_.id).mkString(",")) + dag.id, + dagState.fold(_.toString, _.map(_.id).mkString(",")) ) builder.append(messages mkString " :: ") builder.append("\n") @@ -192,25 +182,23 @@ private[cron] case class CronState(logger: Logger) { /** A [[CronContext]] is passed to [[com.criteo.cuttle.Execution executions]] initiated by * the [[CronScheduler]]. */ -case class CronContext(instant: Instant, retry: Int, parentDag: String) extends SchedulingContext { +case class CronContext(instant: Instant, runNowUser: Option[String] = None) extends SchedulingContext { def compareTo(other: SchedulingContext): Int = other match { - case CronContext(otherInstant, _, _) => + case CronContext(otherInstant, _) => instant.compareTo(otherInstant) } + override def logIntoDatabase: ConnectionIO[String] = Database.serializeContext(this) + override def asJson: Json = CronContext.encoder(this) - override def longRunningId(): String = toString + override def longRunningId(): String = s"$instant|${runNowUser.getOrElse("")}" } case object CronContext { - implicit val encoder: Encoder[CronContext] = - Encoder.forProduct3("interval", "retry", "parentJob")(cc => (cc.instant, cc.retry, cc.parentDag)) - implicit def decoder: Decoder[CronContext] = - Decoder.forProduct3[CronContext, Instant, Int, String]("interval", "retry", "parentJob")( - (instant: Instant, retry: Int, parentJob: String) => CronContext(instant, retry, parentJob) - ) + implicit val encoder: Encoder[CronContext] = deriveEncoder + implicit def decoder: Decoder[CronContext] = deriveDecoder } /** Configure a [[com.criteo.cuttle.Job job]] as a [[CronScheduling]] job. 
diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronPipeline.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronPipeline.scala index 939c55054..af4b3a93d 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronPipeline.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronPipeline.scala @@ -9,25 +9,25 @@ import scala.language.implicitConversions //A non-cyclic DAG, convention is that child depends on parents case class CronPipeline(vertices: Set[CronJob], edges: Set[Dependency]) { - def children: Set[CronJob] = edges.map { case Dependency(child, _) => child } + def children: Set[CronJob] = edges.map(_.to) def roots: Set[CronJob] = vertices.filter(!children.contains(_)) - private[cron] def parents: Set[CronJob] = edges.map { case Dependency(_, parent) => parent } + private[cron] def parents: Set[CronJob] = edges.map(_.from) private[cron] def leaves: Set[CronJob] = vertices.filter(!parents.contains(_)) - private[cron] def parentsMap = edges.groupBy { case Dependency(child, _) => child } - private[cron] def childrenMap = edges.groupBy { case Dependency(_, parent) => parent } + private[cron] def parentsMap = edges.groupBy(_.to) + private[cron] def childrenMap = edges.groupBy(_.from) def dependsOn(right: CronPipeline): CronPipeline = { val left = this val newEdges: Set[Dependency] = for { v1 <- left.roots v2 <- right.leaves - } yield Dependency(v1, v2) + } yield Dependency(from = v2, to = v1) val duplicates = left.vertices.map(_.id).intersect(right.vertices.map(_.id)) - if (duplicates.size != 0) { + if (duplicates.nonEmpty) { throw new Exception("Duplicate job ids: " + duplicates.mkString(",")) } new CronPipeline( @@ -39,7 +39,7 @@ case class CronPipeline(vertices: Set[CronJob], edges: Set[Dependency]) { def and(other: CronPipeline): CronPipeline = { val leftWorkflow = this val duplicates = leftWorkflow.vertices.map(_.id).intersect(other.vertices.map(_.id)) - if (duplicates.size != 0) { + if (duplicates.nonEmpty) { throw new Exception("Duplicate job 
ids: " + duplicates.mkString(",")) } new CronPipeline( @@ -60,27 +60,27 @@ case class CronPipeline(vertices: Set[CronJob], edges: Set[Dependency]) { CronDag(id, this, CronExpression(cronExpression), name, description, tags) } -//Convention is that child depends on parents -case class Dependency(child: CronJob, parent: CronJob) - -object Dependency { - implicit val encodeUser: Encoder[Dependency] = new Encoder[Dependency] { - override def apply(dependency: Dependency) = - Json.obj { - "child" -> dependency.child.asJson - "parent" -> dependency.parent.asJson - } - } -} - object CronPipeline { implicit def fromCronJob(job: CronJob): CronPipeline = new CronPipeline(Set(job), Set.empty) - implicit val encodeUser: Encoder[CronPipeline] = new Encoder[CronPipeline] { + implicit val encodePipeline: Encoder[CronPipeline] = new Encoder[CronPipeline] { override def apply(pipeline: CronPipeline) = Json.obj( - "vertices" -> Json.arr(pipeline.vertices.map(_.asJson).toSeq: _*), + "vertices" -> Json.arr(pipeline.vertices.map(_.id.asJson).toSeq: _*), + "edges" -> Json.arr(pipeline.edges.map(_.asJson).toSeq: _*) ) } } + +// "to" depends on "from", or "from" -> "to" +case class Dependency(from: CronJob, to: CronJob) + +object Dependency { + implicit val encodeDependency: Encoder[Dependency] = new Encoder[Dependency] { + override def apply(dependency: Dependency) = + Json.obj( + "from" -> dependency.from.id.asJson, + "to" -> dependency.to.id.asJson + ) + } +} diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala index 0baa7358b..3307fa8ae 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala @@ -1,15 +1,15 @@ package com.criteo.cuttle.cron -import scala.concurrent.duration._ - -import lol.http._ -import io.circe.{Encoder, Json} +import com.criteo.cuttle.Auth.Authenticator +import 
com.criteo.cuttle.ThreadPools.Implicits.serverThreadPool +import com.criteo.cuttle.ThreadPools._ +import com.criteo.cuttle._ +import doobie.implicits._ import io.circe.syntax._ +import io.circe.{Encoder, Json} +import lol.http._ -import com.criteo.cuttle._ -import com.criteo.cuttle.ThreadPools._ -import com.criteo.cuttle.ThreadPools.Implicits.serverThreadPool -import com.criteo.cuttle.Auth.Authenticator +import scala.concurrent.duration._ /** * A cuttle project is a workflow to execute with the appropriate scheduler. @@ -44,6 +44,10 @@ class CronProject private[cuttle] (val name: String, logger.info("Connecting to database") implicit val transactor = com.criteo.cuttle.Database.connect(databaseConfig)(logger) + logger.info("Applying migrations to database") + Database.doSchemaUpdates.transact(transactor).unsafeRunSync + logger.info("Database up-to-date") + logger.info("Creating Executor") val executor = new Executor[CronScheduling](platforms, transactor, logger, name, version, logsRetention)(retryStrategy) diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronScheduler.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronScheduler.scala index e6f0fe990..3d4dd1428 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronScheduler.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronScheduler.scala @@ -3,18 +3,17 @@ package com.criteo.cuttle.cron import java.time.Instant import java.util.concurrent.TimeUnit -import scala.concurrent.stm.Txn.ExternalDecider -import scala.concurrent.stm._ import cats.effect.IO import cats.effect.concurrent.Deferred import cats.implicits._ -import doobie.implicits._ -import io.circe.Json import com.criteo.cuttle._ -import com.criteo.cuttle.utils._ import com.criteo.cuttle.cron.Implicits._ +import com.criteo.cuttle.utils._ +import doobie.implicits._ +import io.circe.Json import scala.concurrent.duration.FiniteDuration +import scala.concurrent.stm.{atomic, InTxnEnd, Txn} /** A [[CronScheduler]] executes the set of Jobs at 
the time instants defined by Cron expressions. * Each [[com.criteo.cuttle.Job Job]] has it's own expression and executed separately from others. @@ -50,20 +49,20 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { * @param txn current STM transaction context */ private def setExernalDecider[A](dbConnection: doobie.ConnectionIO[A])(implicit transactor: XA, txn: InTxnEnd): Unit = - Txn.setExternalDecider(new ExternalDecider { + Txn.setExternalDecider(new Txn.ExternalDecider { def shouldCommit(implicit txn: InTxnEnd): Boolean = { dbConnection.transact(transactor).unsafeRunSync true } }) - private[cron] def getPausedJobs = state.getPausedJobs() + private[cron] def getPausedDags = state.getPausedDags() private[cron] def snapshot(dagIds: Set[String]) = state.snapshot(dagIds) private[cron] def pauseDags(dags: Set[CronDag], executor: Executor[CronScheduling])(implicit transactor: XA, user: Auth.User): Unit = { - logger.debug(s"Pausing job dags $dags") + logger.debug(s"Pausing job DAGs $dags") val cancelableExecutions = atomic { implicit tx => val dagsToPause = state.pauseDags(dags) @@ -94,7 +93,7 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { private[cron] def resumeDags(dags: Set[CronDag], executor: Executor[CronScheduling])(implicit transactor: XA, user: Auth.User): Unit = { - logger.debug(s"Resuming job dags $dags") + logger.debug(s"Resuming job DAGs $dags") val dagIdsToExecute = dags.map(_.id) val resumeQuery = dagIdsToExecute.map(queries.resumeJob).reduceLeft(_ *> _) @@ -103,19 +102,19 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { state.resumeDags(dags) } val programs = dags.map { dag => - logger.debug(s"Activating job dags $dag") + logger.debug(s"Activating job DAGs $dag") run(dag, executor) } - logger.info(s"Relaunching job dags $dags") + logger.info(s"Relaunching job DAGs $dags") unsafeRunAsync(programs) } - private[cron] def runJobsNow(jobsToRun: Set[CronDag], executor: 
Executor[CronScheduling])(implicit transactor: XA, + private[cron] def runJobsNow(dagsToRun: Set[CronDag], executor: Executor[CronScheduling])(implicit transactor: XA, user: Auth.User): Unit = { - logger.info(s"Request by ${user.userId} to run on demand jobs ${jobsToRun.map(_.id).mkString}") - val runNowHandlers = state.getRunNowHandlers(jobsToRun.map(_.id)) - if (runNowHandlers.size == 0) { + logger.info(s"Request by ${user.userId} to run on demand DAGs ${dagsToRun.map(_.id).mkString}") + val runNowHandlers = state.getRunNowHandlers(dagsToRun.map(_.id)) + if (runNowHandlers.isEmpty) { logger.info("No job in waiting state.") } for ((job, d) <- runNowHandlers) { @@ -132,10 +131,10 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { } else { state .getNextJobsInDag(dag) - .map { node: CronJob => + .map { job: CronJob => for { - runResult <- runAndRetry(dag, node, scheduledAt, runNowUser) - _ <- IO(state.cronJobFinished(dag, node, success = runResult.isRight)) + runResult <- runAndRetry(dag, job, scheduledAt, runNowUser) + _ <- IO(state.cronJobFinished(dag, job, success = runResult.isRight)) result <- runNextPart(dag, scheduledAt, runNowUser) } yield result } @@ -152,7 +151,7 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { val runIO = for { runInfo <- IO { logger.debug(s"Sending job ${job.id} to executor") - val cronContext = CronContext(scheduledAt.instant, retryNum, job.id) + val cronContext = CronContext(scheduledAt.instant, runNowUser.map(_.userId)) executor.run(job, cronContext) } _ <- IO(runNowUser.fold(())(user => runInfo._1.streams.info(s"Run now request from ${user.userId}"))) @@ -177,7 +176,7 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { } } - // don't run anything when job is paused + // don't run anything when dag is paused if (state.isPaused(dag)) { IO.pure(Completed) } else { @@ -186,11 +185,11 @@ case class CronScheduler(logger: Logger) extends 
Scheduler[CronScheduling] { completed <- maybeScheduledAt match { // we couldn't get next event from cron4s and it didn't fail so it means we've finished for this job case None => - logger.info(s"Job dag ${dag.id} has finished. We will not submit executions anymore") + logger.info(s"Job DAG ${dag.id} has finished. We will not submit executions anymore") IO.pure(Completed) // we got next event: update state with it, wait for it and run it or retry until max retry case Some(scheduledAt) => - logger.debug(s"Run job dag ${dag.id} at ${scheduledAt.instant} with a delay of ${scheduledAt.delay}") + logger.debug(s"Run job DAG ${dag.id} at ${scheduledAt.instant} with a delay of ${scheduledAt.delay}") for { _ <- IO(state.addNextEventToState(dag, scheduledAt.instant)) _ <- logState @@ -212,7 +211,7 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { runIO.recover { case e => - val message = s"Fatal error Cron loop for job dag ${dag.id} will be stopped" + val message = s"Fatal error Cron loop for job DAG ${dag.id} will be stopped" logger.error(message) logger.error(e.getMessage) throw new Exception(message) @@ -225,20 +224,20 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { programs.toList.parSequence.unsafeRunAsyncAndForget() } - override def getStats(jobIds: Set[String]): Json = state.snapshotAsJson(jobIds) + override def getStats(dagIds: Set[String]): Json = state.snapshotAsJson(dagIds) override def start(workload: Workload[CronScheduling], executor: Executor[CronScheduling], xa: XA, logger: Logger = logger): Unit = { - logger.info("Getting paused jobs") - val pausedJob = queries.getPausedJobs.transact(xa).unsafeRunSync() + logger.info("Getting paused DAGs") + val pausedDag = queries.getPausedJobs.transact(xa).unsafeRunSync() val programs = workload match { case CronWorkload(dags) => logger.info("Init Cron State") - state.init(dags, pausedJob) - logger.info(s"Building IOs for Cron Workload with ${dags.size} job 
dag(s)") + state.init(dags, pausedDag) + logger.info(s"Building IOs for Cron Workload with ${dags.size} job DAG(s)") dags.map(dag => run(dag, executor)) } diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/Database.scala b/cron/src/main/scala/com/criteo/cuttle/cron/Database.scala index 94f6a4318..a8f124406 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/Database.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/Database.scala @@ -2,15 +2,91 @@ package com.criteo.cuttle.cron import java.time.Instant -import doobie.util.fragment.Fragment +import cats._ +import cats.implicits._ +import com.criteo.cuttle.utils +import com.criteo.cuttle.Database.JsonMeta +import doobie._ import doobie.implicits._ +import io.circe._ +import io.circe.parser._ +import io.circe.java8.time._ private[cron] object Database { - def sqlGetContextsBetween(start: Instant, end: Instant, job: CronJob): Fragment = + + private val migrateContexts: ConnectionIO[Unit] = { + val chunkSize = 1024 * 10 + val insertTmp = Update[(String, String)]("INSERT into tmp (old_context_id, new_context_id) VALUES (? , ?)") + val insertContext = + Update[(String, Json, Instant)]("REPLACE into cron_contexts (id, json, instant) VALUES (? 
, ?, ?)") + for { + _ <- sql""" + CREATE TABLE cron_contexts ( + id VARCHAR(1000) NOT NULL, + json JSON NOT NULL, + instant DATETIME(3) NOT NULL, + PRIMARY KEY(id) + ) ENGINE = INNODB; + + CREATE INDEX cron_contexts_by_instant ON cron_contexts (instant); + """.update.run + _ <- sql"""CREATE TEMPORARY TABLE tmp (old_context_id VARCHAR(1000), new_context_id VARCHAR(1000))""".update.run + _ <- sql"""SELECT DISTINCT context_id FROM executions""" + .query[String] + .streamWithChunkSize(chunkSize) + .chunkLimit(chunkSize) + .evalMap { oldContexts => + val execIdOldNewContext = oldContexts.map { oldContextId => + // Convert old context {"interval": xxx, ...} to new context with only the interval value named "instant" + parse(oldContextId).flatMap(_.hcursor.downField("interval").as[Instant]) match { + case Left(_) => (oldContextId, CronContext(Instant.EPOCH)) + case Right(instant) => (oldContextId, CronContext(instant)) + } + } + (insertTmp.updateMany(execIdOldNewContext.map { + case (oldContextId, newContext) => + (oldContextId, newContext.longRunningId()) + }), insertContext.updateMany(execIdOldNewContext.map { + case (_, newContext) => + (newContext.longRunningId(), newContext.asJson, newContext.instant) + })).mapN(_ + _) + } + .compile + .drain + _ <- sql"""CREATE INDEX tmp_id ON tmp (old_context_id)""".update.run + _ <- sql""" + UPDATE executions exec + JOIN tmp ON exec.context_id = tmp.old_context_id + SET exec.context_id = tmp.new_context_id + """.update.run + } yield () + } + + val schema = List( + migrateContexts + ) + + val doSchemaUpdates: ConnectionIO[Unit] = utils.updateSchema("cron_schema_evolutions", schema) + + def sqlGetContextsBetween(start: Option[Instant], end: Option[Instant]): Fragment = { + val where: Fragment = (start, end) match { + case (Some(s), Some(e)) => sql"WHERE instant BETWEEN $s AND $e" + case (Some(s), None) => sql"WHERE instant >= $s" + case (None, Some(e)) => sql"WHERE instant <= $e" + case (None, None) => Fragment.empty + } + 
sql"SELECT id, json FROM cron_contexts" ++ where + } + + def serializeContext(context: CronContext): ConnectionIO[String] = { + val id = context.longRunningId() sql""" - SELECT context_id as id, context_id as json FROM executions - WHERE - executions.job = ${job.id} - AND (start_time >= ${start} and start_time <= ${end}) or (end_time >= ${start} and end_time <= ${end}) - """ + REPLACE INTO cron_contexts (id, json, instant) + VALUES ( + $id, + ${context.asJson}, + ${context.instant} + ) + """.update.run *> Applicative[ConnectionIO].pure(id) + } } diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/UI.scala b/cron/src/main/scala/com/criteo/cuttle/cron/UI.scala index 14f5062bb..85b9b7bc1 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/UI.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/UI.scala @@ -1,18 +1,19 @@ package com.criteo.cuttle.cron -import java.time.{Instant, ZoneId} import java.time.format.DateTimeFormatter +import java.time.{Instant, ZoneId} -import scala.util.Try import cats.Foldable import cats.data.EitherT import cats.effect.IO import cats.implicits._ -import lol.http._ +import com.criteo.cuttle.{ExecutionLog, ExecutionStatus, Executor, XA} +import io.circe.Json import lol.html._ +import lol.http._ import lol.json._ -import io.circe.Json -import com.criteo.cuttle.{ExecutionLog, ExecutionStatus, Executor, PausedJob, XA} + +import scala.util.Try private[cron] case class UI(project: CronProject, executor: Executor[CronScheduling])(implicit val transactor: XA) { private val scheduler = project.scheduler @@ -63,7 +64,7 @@ private[cron] case class UI(project: CronProject, executor: Executor[CronSchedul private implicit val stateToHtml = ToHtml { state: Either[Instant, Set[CronExecution]] => state.fold( instant => tmpl"Scheduled at @timeFormat(instant)", - executions => foldHtml(executions.toList)(e => tmpl"

Running @e.job.id, retry: @e.context.retry

") + executions => foldHtml(executions.toList)(e => tmpl"

Running @e.job.id

") ) } @@ -98,15 +99,15 @@ private[cron] case class UI(project: CronProject, executor: Executor[CronSchedul } } - private implicit val pausedToHtml = ToHtml { pausedDags: Map[CronDag, PausedJob] => + private implicit val pausedToHtml = ToHtml { pausedDags: Map[CronDag, PausedDag] => foldHtml(pausedDags.toList.sortBy(_._1.id)) { - case (cronDag, pausedJob) => + case (cronDag, pausedDag) => tmpl""" @cronDag.id @cronDag.name @cronDag.cronExpression.tz.getId @cronDag.cronExpression.cronExpression - Paused by @pausedJob.user.userId at @timeFormat(pausedJob.date) + Paused by @pausedDag.user.userId at @timeFormat(pausedDag.date) Runs
@@ -148,8 +149,8 @@ private[cron] case class UI(project: CronProject, executor: Executor[CronSchedul } } - def home(activeAndPausedJobs: (Map[CronDag, Either[Instant, Set[CronExecution]]], Map[CronDag, PausedJob])) = { - val (activeDags, pausedJobs) = activeAndPausedJobs + def home(activeAndPausedDags: (Map[CronDag, Either[Instant, Set[CronExecution]]], Map[CronDag, PausedDag])) = { + val (activeDags, pausedDags) = activeAndPausedDags Layout( tmpl""" @@ -184,7 +185,7 @@ private[cron] case class UI(project: CronProject, executor: Executor[CronSchedul } - @pausedJobs + @pausedDags
""" ) @@ -225,7 +226,7 @@ private[cron] case class UI(project: CronProject, executor: Executor[CronSchedul jobIds <- EitherT.rightT[IO, Throwable](dag.cronPipeline.vertices.map(_.id)) limit <- EitherT.rightT[IO, Throwable](Try(limit.toInt).getOrElse(Int.MaxValue)) executionList <- EitherT.right[Throwable]( - buildExecutionsList(executor, jobIds, minStartDateForExecutions, maxStartDateForExecutions, limit) + buildExecutionsList(executor, jobIds, None, None, limit) ) } yield (dag, executionList) diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/package.scala b/cron/src/main/scala/com/criteo/cuttle/cron/package.scala index 97b6d0207..055bba3c9 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/package.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/package.scala @@ -1,12 +1,13 @@ package com.criteo.cuttle -import cats.effect.IO -import com.criteo.cuttle.ThreadPools._ -import com.criteo.cuttle.ThreadPools.ThreadPoolSystemProperties._ - import java.time.Instant import java.util.concurrent.atomic.AtomicInteger +import cats.effect.IO +import com.criteo.cuttle.Auth.User +import com.criteo.cuttle.ThreadPools.ThreadPoolSystemProperties._ +import com.criteo.cuttle.ThreadPools._ + import scala.concurrent.ExecutionContext import scala.language.implicitConversions @@ -14,6 +15,12 @@ package object cron { type CronJob = Job[CronScheduling] type CronExecution = Execution[CronScheduling] + // In the Cron scheduler, we do not pause jobs, we pause entire DAGs + type PausedDag = PausedJob + object PausedDag { + def apply(id: String, user: User, date: Instant): PausedDag = PausedJob(id, user, date) + } + object Implicits { //Backward compat for Job to CronDag @@ -44,24 +51,25 @@ package object cron { } - // Fair assumptions about start and end date within which we operate by default if user doesn't specify his interval. - // We choose these dates over Instant.MIN and Instant.MAX because MySQL works within this range. 
- private[cron] val minStartDateForExecutions = Instant.parse("1000-01-01T00:00:00Z") - private[cron] val maxStartDateForExecutions = Instant.parse("9999-12-31T23:59:59Z") - - // This function was implemented because executor.archivedExecutions returns duplicates when passing the same table - // into the context query. private[cron] def buildExecutionsList(executor: Executor[CronScheduling], - jobPartIds: Set[String], - startDate: Instant, - endDate: Instant, + jobIds: Set[String], + startDate: Option[Instant], + endDate: Option[Instant], limit: Int): IO[Map[Instant, Seq[ExecutionLog]]] = for { - archived <- executor.rawArchivedExecutions(jobPartIds, "", asc = false, 0, limit) + archived <- executor.archivedExecutions( + queryContexts = Database.sqlGetContextsBetween(startDate, endDate), + jobs = jobIds, + sort = "", + asc = false, + offset = 0, + limit = limit + ) running <- IO(executor.runningExecutions.collect { case (e, status) - if jobPartIds.contains(e.job.id) && e.context.instant.isAfter(startDate) && e.context.instant - .isBefore(endDate) => + if jobIds.contains(e.job.id) + && startDate.forall(e.context.instant.isAfter) + && endDate.forall(e.context.instant.isBefore) => e.toExecutionLog(status) }) } yield (running ++ archived).groupBy( diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala index 6424def08..d49a67b0c 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala @@ -45,7 +45,14 @@ class CuttleProject private[cuttle] (val name: String, jobsToBePausedOnStartup: Set[Job[TimeSeries]] = Set.empty ): Unit = { val (routes, startScheduler) = - build(platforms, databaseConfig, retryStrategy, paused, stateRetention, logsRetention, maxVersionsHistory, jobsToBePausedOnStartup) + build(platforms, + databaseConfig, + retryStrategy, + paused, + 
stateRetention, + logsRetention, + maxVersionsHistory, + jobsToBePausedOnStartup) startScheduler()