From 8443658887f0dd55c9800093c389ec73acb56d10 Mon Sep 17 00:00:00 2001 From: Masuzu Date: Tue, 9 Oct 2018 18:29:37 +0200 Subject: [PATCH] [Draft] Move pause execution management inside scheduler --- build.sbt | 3 + .../scala/com/criteo/cuttle/Executor.scala | 190 ++---------------- .../com/criteo/cuttle/ExecutorSpec.scala | 13 +- .../src/main/javascript/app/menu/Menu.js | 6 - .../cuttle/timeseries/CuttleProject.scala | 2 +- .../cuttle/timeseries/TimeSeriesApp.scala | 18 +- .../timeseries/TimeSeriesScheduler.scala | 172 +++++++++++----- .../timeseries/TimeSeriesSchedulerSpec.scala | 121 +++++++++++ 8 files changed, 271 insertions(+), 254 deletions(-) create mode 100644 timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerSpec.scala diff --git a/build.sbt b/build.sbt index f3a086c6d..a55264f21 100644 --- a/build.sbt +++ b/build.sbt @@ -209,6 +209,9 @@ lazy val cuttle = lazy val timeseries = (project in file("timeseries")) .settings(commonSettings: _*) + .settings(libraryDependencies ++= Seq( + "com.wix" % "wix-embedded-mysql" % "2.1.4" % "test" + )) .settings( // Webpack resourceGenerators in Compile += Def.task { diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index 00b2fac97..6bd369fe6 100755 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -403,12 +403,9 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla val appLogger: Logger = logger } - private val pausedState: TMap[String, PausedJobWithExecutions[S]] = { - val pausedJobs = queries.getPausedJobs.transact(xa).unsafeRunSync - TMap(pausedJobs.map(pausedJob => pausedJob.id -> pausedJob.toPausedJobWithExecutions[S]()): _*) - } + // TODO: move to the scheduler private[cuttle] val runningState = TMap.empty[Execution[S], Future[Completed]] - private val throttledState = TMap.empty[Execution[S], (Promise[Completed], FailingJob)] + private[cuttle] val throttledState = TMap.empty[Execution[S], (Promise[Completed], FailingJob)] private val recentFailures = TMap.empty[(Job[S], S#Context), (Option[Execution[S]], FailingJob)] // signals whether the instance is shutting down @@ -521,42 +518,6 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla execution.toExecutionLog(status) }) - private[cuttle] def pausedExecutionsSize(filteredJobs: Set[String]): Int = - pausedState.single.values.foldLeft(0) { - case (acc, PausedJobWithExecutions(id, _, _, executions)) => - if (filteredJobs.contains(id)) - acc + executions.size - else - acc - } - - private[cuttle] def pausedExecutions(filteredJobs: Set[String], - sort: String, - asc: Boolean, - offset: Int, - limit: Int): Seq[ExecutionLog] = { - val filteredExecutions = pausedState.single.values - .collect { - case PausedJobWithExecutions(id, _, _, executions) if filteredJobs.contains(id) => executions.keys - } - .flatten - .toSeq - - val ordering = sort match { - case "job" => Ordering.by((e: Execution[S]) => (e.job.id, e)) - case "startTime" => Ordering.by((e: Execution[S]) => (e.startTime.toString, e)) - case _ => Ordering[Execution[S]] - } - - val finalOrdering = if (asc) ordering else ordering.reverse - - val sortedExecutions = filteredExecutions.sorted(finalOrdering) - - sortedExecutions - .slice(offset, offset + limit) - .map(_.toExecutionLog(ExecutionPaused)) - } - private[cuttle] def allFailingExecutions: Seq[Execution[S]] = throttledState.single.keys.toSeq @@ -615,11 +576,9 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla xa: XA): IO[Seq[ExecutionLog]] = queries.getExecutionLog(queryContexts, jobs, sort, asc, offset, limit).transact(xa) - private[cuttle] def pausedJobs: Seq[PausedJob] = pausedState.single.values.map(_.toPausedJob()).toSeq - private[cuttle] def cancelExecution(executionId: String)(implicit user: User): Unit = { val toCancel = atomic { implicit tx => - (runningState.keys ++ pausedState.values.flatMap(_.executions.keys) ++ throttledState.keys) + (runningState.keys ++ throttledState.keys) .find(_.id == executionId) } toCancel.foreach(_.cancel()) @@ -628,13 +587,9 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla private[cuttle] def getExecution(queryContexts: Fragment, executionId: String): IO[Option[ExecutionLog]] = atomic { implicit tx => val predicate = (e: Execution[S]) => e.id == executionId - pausedState.values - .flatMap(_.executions.keys) + throttledState.keys .find(predicate) - .map(_.toExecutionLog(ExecutionPaused)) - .orElse(throttledState.keys - .find(predicate) - .map(e => e.toExecutionLog(ExecutionThrottled).copy(failing = throttledState.get(e).map(_._2)))) + .map(e => e.toExecutionLog(ExecutionThrottled).copy(failing = throttledState.get(e).map(_._2))) .orElse(runningState.keys.find(predicate).map { execution => execution.toExecutionLog(flagWaitingExecutions(execution :: Nil).head._2) }) @@ -646,69 +601,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla private[cuttle] def openStreams(executionId: String): fs2.Stream[IO, Byte] = ExecutionStreams.getStreams(executionId, queries, xa) - private[cuttle] def pauseJobs(jobs: Set[Job[S]])(implicit user: User): Unit = { - val executionsToCancel = atomic { implicit tx => - val pauseDate = Instant.now() - val pausedJobIds = pausedJobs.map(_.id) - val jobsToPause = jobs - .filter(job => !pausedJobIds.contains(job.id)) - .map(job => PausedJob(job.id, user, pauseDate)) - if (jobsToPause.isEmpty) return - - val pauseQuery = jobsToPause.map(queries.pauseJob).reduceLeft(_ *> _) - Txn.setExternalDecider(new ExternalDecider { - def shouldCommit(implicit txn: InTxnEnd): Boolean = { - pauseQuery.transact(xa).unsafeRunSync - true - } - }) - - jobsToPause.flatMap { pausedJob => - pausedState.update(pausedJob.id, pausedJob.toPausedJobWithExecutions()) - runningState.filterKeys(_.job.id == pausedJob.id).keys ++ throttledState - .filterKeys(_.job.id == pausedJob.id) - .keys - } - } - logger.debug(s"we will cancel ${executionsToCancel.size} executions") - executionsToCancel.toList.sortBy(_.context).reverse.foreach { execution => - execution.streams.debug(s"Job has been paused by user ${user.userId}") - execution.cancel() - } - } - - private[cuttle] def resumeJobs(jobs: Set[Job[S]])(implicit user: User): Unit = { - val resumeQuery = jobs.map(job => queries.resumeJob(job.id)).reduceLeft(_ *> _) - val jobIdsToResume = jobs.map(_.id) - - val executionsToResume = atomic { implicit tx => - Txn.setExternalDecider(new ExternalDecider { - def shouldCommit(implicit tx: InTxnEnd): Boolean = { - resumeQuery.transact(xa).unsafeRunSync - true - } - }) - val executionsToResume = pausedState.collect { - case (id, PausedJobWithExecutions(_, _, _, executions)) if jobIdsToResume.contains(id) => executions - }.flatten - - jobs.foreach(job => pausedState -= job.id) - - executionsToResume.map { - case (execution, promise) => - addExecution2RunningState(execution, promise) - execution -> promise - } - } - - executionsToResume.toList.sortBy(_._1.context).foreach { - case (execution, promise) => - execution.streams.debug(s"Job has been resumed by user ${user.userId}.") - unsafeDoRun(execution, promise) - } - } - - private[cuttle] def relaunch(jobs: Set[String])(implicit user: User): Unit = { + private[cuttle] def relaunch(jobs: Set[String])(implicit user: User): Unit = { val execution2Promise = atomic { implicit txn => throttledState.collect { case (execution, (promise, _)) if jobs.contains(execution.job.id) => @@ -727,7 +620,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla private[cuttle] def forceSuccess(executionId: String)(implicit user: User): Unit = { val toForce = atomic { implicit tx => - (runningState.keys ++ pausedState.values.flatMap(_.executions.keys) ++ throttledState.keys) + (runningState.keys ++ throttledState.keys) .find(execution => execution.id == executionId) } toForce.foreach(_.forceSuccess()) @@ -740,7 +633,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla // cancel all executions val toCancel = atomic { implicit txn => isShuttingDown() = true - runningState.keys ++ pausedState.values.flatMap(_.executions.keys) ++ throttledState.keys + runningState.keys ++ throttledState.keys } toCancel.foreach(_.cancel()) @@ -834,7 +727,6 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla private def run0(all: Seq[(Job[S], S#Context)]): Seq[(Execution[S], Future[Completed])] = { sealed trait NewExecution case object ToRunNow extends NewExecution - case object Paused extends NewExecution case class Throttled(launchDate: Instant) extends NewExecution val index: Map[(Job[S], S#Context),(Execution[S], Future[Completed])] = runningState.single.map { @@ -852,20 +744,12 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla if(i > 1000 && i % 1000 == 0) logger.info(s"Submitted ${i}/${all.size} jobs") val maybeAlreadyRunning: Option[(Execution[S], Future[Completed])] = index.get((job, context)) - lazy val maybePaused: Option[(Execution[S], Future[Completed])] = pausedState - .get(job.id) - .flatMap(_.executions.collectFirst { - case (execution, promise) if execution.context == context => - execution -> promise.future - }) - lazy val maybeThrottled: Option[(Execution[S], Future[Completed])] = throttledState.find { case (e, _) => e.job == job && e.context == context }.map { case (e, (p, _)) => (e, p.future) } maybeAlreadyRunning - .orElse(maybePaused) .orElse(maybeThrottled) .toLeft { val nextExecutionId = utils.randomUUID @@ -899,12 +783,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla )(sideEffectExecutionContext) val promise = Promise[Completed] - if (pausedState.contains(job.id)) { - val pausedJobWithExecutions = pausedState(job.id) - pausedState += job.id -> pausedJobWithExecutions.copy( - executions = pausedJobWithExecutions.executions + (execution -> promise)) - (job, execution, promise, Paused) - } else if (recentFailures.contains(job -> context)) { + if (recentFailures.contains(job -> context)) { val (_, failingJob) = recentFailures(job -> context) recentFailures += ((job -> context) -> (Some(execution) -> failingJob)) val throttleFor = @@ -931,25 +810,6 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla whatToDo match { case ToRunNow => unsafeDoRun(execution, promise) - case Paused => - execution.streams.debug(s"Delayed because job ${execution.job.id} is paused") - // we attach this callback to freshly created "Paused" execution - execution.onCancel { - () => - val cancelNow = atomic { implicit tx => - // we take the first pair of jobId, executions and filter executions by execution - val maybeJobId2Executions = pausedState.get(job.id).collectFirst { - case pwe @ PausedJobWithExecutions(_, _, _, executions) if executions.contains(execution) => - job.id -> pwe.copy(executions = executions.filterKeys(_ == execution)) - } - - maybeJobId2Executions.fold(false) { x => - pausedState += x - true - } - } - if (cancelNow) promise.tryFailure(ExecutionCancelled) - } case Throttled(launchDate) => execution.streams.debug(s"Delayed until $launchDate because previous execution failed") val timerTask = new TimerTask { @@ -1013,18 +873,19 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla * @return how much ((running, waiting), paused, failing) jobs are in concrete states * */ private def getStateAtomic(jobs: Set[String]) = atomic { implicit txn => - (runningExecutionsSizes(jobs), pausedExecutionsSize(jobs), failingExecutionsSize(jobs)) + (runningExecutionsSizes(jobs), failingExecutionsSize(jobs)) } private[cuttle] def getStats(jobs: Set[String]): IO[Json] = { - val ((running, waiting), paused, failing) = getStateAtomic(jobs) + val ((running, waiting), failing) = getStateAtomic(jobs) // DB state call archivedExecutionsSize(jobs).map( finished => Map( "running" -> running, "waiting" -> waiting, - "paused" -> paused, + // TODO: to check in UI + "paused" -> 0, "failing" -> failing, "finished" -> finished ).asJson) @@ -1043,19 +904,17 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla * @param pausedExecutions */ private[cuttle] def getMetrics(jobs: Set[Job[S]])( - getStateAtomic: Set[String] => ((Int, Int), Int, Int), + getStateAtomic: Set[String] => ((Int, Int), Int), runningExecutions: Seq[(Execution[S], ExecutionStatus)], - pausedExecutions: Seq[Execution[S]], failingExecutions: Seq[Execution[S]] ): Seq[Metric] = { val jobIds = jobs.map(_.id) - val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobIds) + val ((runningCount, waitingCount), failingCount) = getStateAtomic(jobIds) val statMetrics = Seq( Gauge("cuttle_scheduler_stat_count", "The number of jobs that we have in concrete states") .labeled("type" -> "running", runningCount) .labeled("type" -> "waiting", waitingCount) - .labeled("type" -> "paused", pausedCount) .labeled("type" -> "failing", failingCount) ) @@ -1068,17 +927,13 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla execution.status == ExecutionStatus.ExecutionRunning } - val paused: Seq[ExecutionInfo] = pausedExecutions.map { exec => - ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), ExecutionStatus.ExecutionPaused) - } - val failing: Seq[ExecutionInfo] = failingExecutions.map { exec => ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), ExecutionStatus.ExecutionThrottled) } statMetrics ++ - Seq(getMetricsByTag(running, waiting, paused, failing)) ++ - Seq(getMetricsByJob(running, waiting, paused, failing)) ++ + Seq(getMetricsByTag(running, waiting, failing)) ++ + Seq(getMetricsByJob(running, waiting, failing)) ++ Seq( executionsCounters .single() @@ -1101,14 +956,12 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla getMetrics(jobs.all.filter(j => jobIds.contains(j.id)))( getStateAtomic, runningExecutions, - pausedState.values.flatMap(_.executions.keys).toSeq, allFailingExecutions ) } private def getMetricsByTag(running: Seq[ExecutionInfo], waiting: Seq[ExecutionInfo], - paused: Seq[ExecutionInfo], failing: Seq[ExecutionInfo]): Metrics.Metric = (// Explode by tag running @@ -1125,13 +978,6 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla .groupBy(identity) .mapValues("waiting" -> _.size) .toList ++ - paused - .flatMap { info => - info.tags - } - .groupBy(identity) - .mapValues("paused" -> _.size) - .toList ++ failing .flatMap { info => info.tags @@ -1149,12 +995,10 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla private def getMetricsByJob(running: Seq[ExecutionInfo], waiting: Seq[ExecutionInfo], - paused: Seq[ExecutionInfo], failing: Seq[ExecutionInfo]): Metrics.Metric = ( running.groupBy(_.jobId).mapValues("running" -> _.size).toList ++ waiting.groupBy(_.jobId).mapValues("waiting" -> _.size).toList ++ - paused.groupBy(_.jobId).mapValues("paused" -> _.size).toList ++ failing.groupBy(_.jobId).mapValues("failing" -> _.size).toList ).foldLeft( Gauge("cuttle_scheduler_stat_count_by_job", "The number of executions that we have in concrete states by job") diff --git a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala index 5d7c4f590..4e1f4ca8a 100644 --- a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala +++ b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala @@ -43,7 +43,7 @@ class ExecutorSpec extends FunSuite with TestScheduling { val metrics = Prometheus.serialize( testExecutor.getMetrics(Set(fooJob))( getStateAtomic = _ => { - ((5, 1), 3, 2) + ((5, 1), 2) }, runningExecutions = Seq( buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionRunning, @@ -53,11 +53,6 @@ class ExecutorSpec extends FunSuite with TestScheduling { buildExecutionForJob(untaggedJob) -> ExecutionStatus.ExecutionRunning, buildExecutionForJob(untaggedJob) -> ExecutionStatus.ExecutionWaiting ), - pausedExecutions = Seq( - buildExecutionForJob(fooJob), - buildExecutionForJob(fooBarJob), - buildExecutionForJob(untaggedJob) - ), failingExecutions = Seq( buildExecutionForJob(fooBarJob), buildExecutionForJob(fooBarJob), @@ -73,14 +68,11 @@ class ExecutorSpec extends FunSuite with TestScheduling { |# TYPE cuttle_scheduler_stat_count gauge |cuttle_scheduler_stat_count {type="running"} 5 |cuttle_scheduler_stat_count {type="waiting"} 1 - |cuttle_scheduler_stat_count {type="paused"} 3 |cuttle_scheduler_stat_count {type="failing"} 2 |# HELP cuttle_scheduler_stat_count_by_tag The number of executions that we have in concrete states by tag |# TYPE cuttle_scheduler_stat_count_by_tag gauge - |cuttle_scheduler_stat_count_by_tag {tag="bar", type="paused"} 1 |cuttle_scheduler_stat_count_by_tag {tag="foo", type="waiting"} 1 |cuttle_scheduler_stat_count_by_tag {tag="foo", type="running"} 3 - |cuttle_scheduler_stat_count_by_tag {tag="foo", type="paused"} 2 |cuttle_scheduler_stat_count_by_tag {tag="bar", type="failing"} 2 |cuttle_scheduler_stat_count_by_tag {tag="foo", type="failing"} 2 |cuttle_scheduler_stat_count_by_tag {tag="bar", type="running"} 1 @@ -90,12 +82,9 @@ class ExecutorSpec extends FunSuite with TestScheduling { |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="running"} 1 |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="running"} 1 |cuttle_scheduler_stat_count_by_job {job="foo_job", type="waiting"} 1 - |cuttle_scheduler_stat_count_by_job {job="foo_job", type="paused"} 1 |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="failing"} 1 |cuttle_scheduler_stat_count_by_job {job="foo_job", type="running"} 2 - |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="paused"} 1 |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="failing"} 2 - |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="paused"} 1 |# HELP cuttle_executions_total The number of finished executions that we have in concrete states by job and by tag |# TYPE cuttle_executions_total counter |cuttle_executions_total {job_id="foo_job", tags="foo", type="failure"} 0 diff --git a/timeseries/src/main/javascript/app/menu/Menu.js b/timeseries/src/main/javascript/app/menu/Menu.js index f6c80b97f..b0872296e 100644 --- a/timeseries/src/main/javascript/app/menu/Menu.js +++ b/timeseries/src/main/javascript/app/menu/Menu.js @@ -70,12 +70,6 @@ const Menu = ({ classes, className, active, statistics }: Props) => ( active={active.id === "executions/finished"} label="Finished" link="/executions/finished" - />, - ]} /> diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala index 56867adf3..7fc21601f 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala @@ -41,7 +41,7 @@ class CuttleProject private[cuttle] ( if (paused) { logger.info("Pausing workflow") - executor.pauseJobs(jobs.all)(Auth.User("Startup")) + scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup")) } logger.info("Start workflow") diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala index 705cb3c1c..18650fe21 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala @@ -207,12 +207,6 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E executor.failingExecutionsSize(ids) -> executor .failingExecutions(ids, sort, asc, offset, limit) .toList)) - case "paused" => - IO( - Some( - executor.pausedExecutionsSize(ids) -> executor - .pausedExecutions(ids, sort, asc, offset, limit) - .toList)) case "finished" => executor .archivedExecutionsSize(ids) @@ -284,7 +278,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E } case GET at "/api/jobs/paused" => - Ok(executor.pausedJobs.asJson) + Ok(scheduler.pausedJobs().asJson) case GET at "/api/project_definition" => Ok(project.asJson) @@ -301,24 +295,24 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E case POST at url"/api/jobs/pause?jobs=$jobs" => { implicit user => getJobsOrNotFound(jobs).fold(IO.pure, jobs => { - executor.pauseJobs(jobs) + scheduler.pauseJobs(jobs, executor, xa) Ok }) } case POST at url"/api/jobs/resume?jobs=$jobs" => { implicit user => getJobsOrNotFound(jobs).fold(IO.pure, jobs => { - executor.resumeJobs(jobs) + scheduler.resumeJobs(jobs, xa) Ok }) } case POST at url"/api/jobs/all/unpause" => { implicit user => - executor.resumeJobs(jobs.all) + scheduler.resumeJobs(jobs.all, xa) Ok } case POST at url"/api/jobs/$id/unpause" => { implicit user => jobs.all.find(_.id == id).fold(NotFound) { job => - executor.resumeJobs(Set(job)) + scheduler.resumeJobs(Set(job), xa) Ok } } @@ -533,7 +527,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E acc } - val pausedJobs = executor.pausedJobs + val pausedJobs = scheduler.pausedJobs().map(_.id) val allFailing = executor.allFailingExecutions val allWaitingIds = executor.allRunning .filter(_.status == ExecutionWaiting) diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala index 2e050ade7..291e89ba4 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala @@ -4,11 +4,12 @@ import java.time.ZoneOffset.UTC import java.time._ import java.time.temporal.ChronoUnit._ import java.time.temporal.{ChronoUnit, TemporalAdjusters} -import java.util.UUID +import java.util.{Comparator, UUID} import scala.collection.mutable import scala.concurrent._ import scala.concurrent.duration.{Duration => ScalaDuration} +import scala.concurrent.stm.Txn.ExternalDecider import scala.concurrent.stm._ import scala.math.Ordering.Implicits._ @@ -247,6 +248,12 @@ private[timeseries] object TimeSeriesContext { implicit val encoder: Encoder[TimeSeriesContext] = deriveEncoder implicit def decoder(implicit jobs: Set[Job[TimeSeries]]): Decoder[TimeSeriesContext] = deriveDecoder + + /** Provide an implicit `Ordering` for [[TimeSeriesContext]] based on the `compareTo` function. */ + implicit val ordering: Ordering[TimeSeriesContext] = + Ordering.comparatorToOrdering(new Comparator[TimeSeriesContext] { + def compare(o1: TimeSeriesContext, o2: TimeSeriesContext) = o1.compareTo(o2) + }) } /** A [[TimeSeriesDependency]] qualify the dependency between 2 [[com.criteo.cuttle.Job Jobs]] in a @@ -323,6 +330,14 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] { private val _backfills = Ref(Set.empty[Backfill]) + private val _pausedJobs = Ref(Set.empty[PausedJob]) + + def pausedJobs(): Set[PausedJob] = atomic { implicit txn => _pausedJobs() } + + private val queries = new Queries { + val appLogger: Logger = logger + } + private[timeseries] def state: (State, Set[Backfill]) = atomic { implicit txn => (_state(), _backfills()) } @@ -526,7 +541,103 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] { backfills } - def start(workflow0: Workload[TimeSeries], executor: Executor[TimeSeries], xa: XA, logger: Logger): Unit = { + private[timeseries] def pauseJobs(jobs: Set[Job[TimeSeries]], executor: Executor[TimeSeries], xa: XA)(implicit user: Auth.User): Unit = { + val executionsToCancel = atomic { implicit tx => + val pauseDate = Instant.now() + val pausedJobIds = _pausedJobs().map(_.id) + val jobsToPause: Set[PausedJob] = jobs + .filter(job => !pausedJobIds.contains(job.id)) + .map(job => PausedJob(job.id, user, pauseDate)) + + if (jobsToPause.isEmpty) return + + _pausedJobs() = _pausedJobs() ++ jobsToPause + + val pauseQuery = jobsToPause.map(queries.pauseJob).reduceLeft(_ *> _) + Txn.setExternalDecider(new ExternalDecider { + def shouldCommit(implicit txn: InTxnEnd): Boolean = { + pauseQuery.transact(xa).unsafeRunSync + true + } + }) + + jobsToPause.flatMap { pausedJob => + executor.runningState.filterKeys(_.job.id == pausedJob.id).keys ++ executor.throttledState + .filterKeys(_.job.id == pausedJob.id) + .keys + } + } + logger.debug(s"we will cancel ${executionsToCancel.size} executions") + executionsToCancel.toList.sortBy(_.context).reverse.foreach { execution => + execution.streams.debug(s"Job has been paused by user ${user.userId}") + execution.cancel() + } + } + + private[timeseries] def resumeJobs(jobs: Set[Job[TimeSeries]], xa: XA)(implicit user: Auth.User): Unit = { + val jobIdsToResume = jobs.map(_.id) + val resumeQuery = jobIdsToResume.map(queries.resumeJob).reduceLeft(_ *> _) + + atomic { implicit tx => + Txn.setExternalDecider(new ExternalDecider { + def shouldCommit(implicit tx: InTxnEnd): Boolean = { + resumeQuery.transact(xa).unsafeRunSync + true + } + }) + + _pausedJobs() = _pausedJobs() -- _pausedJobs().filter(pausedJob => jobIdsToResume.contains(pausedJob.id)) + } + } + + private[timeseries] def executeMainLoopSingleIteration(running: Set[Run], workflow: Workflow, executor: Executor[TimeSeries], xa: XA): Set[Run] = { + val (completed, stillRunning) = running.partition { + case (_, _, effect) => effect.isCompleted + } + + val (stateSnapshot, completedBackfills, toRun) = atomic { implicit txn => + val (stateSnapshot, newBackfills, completedBackfills) = + collectCompletedJobs(_state(), _backfills(), completed) + + val toRun = jobsToRun(workflow, stateSnapshot, Instant.now, executor.projectVersion) + + _state() = stateSnapshot + _backfills() = newBackfills + + (stateSnapshot, completedBackfills, toRun) + } + + val newExecutions = executor.runAll(toRun) + + atomic { implicit txn => + _state() = newExecutions.foldLeft(_state()) { + case (st, (execution, _)) => + st + (execution.job -> + st(execution.job).update(execution.context.toInterval, Running(execution.id))) + } + } + + if (completed.nonEmpty || toRun.nonEmpty) + runOrLogAndDie(Database.serializeState(stateSnapshot).transact(xa).unsafeRunSync, + "TimeseriesScheduler, cannot serialize state, shutting down") + + if (completedBackfills.nonEmpty) + runOrLogAndDie( + Database + .setBackfillStatus(completedBackfills.map(_.id), "COMPLETE") + .transact(xa) + .unsafeRunSync, + "TimeseriesScheduler, cannot serialize state, shutting down" + ) + + val newRunning = stillRunning ++ newExecutions.map { + case (execution, result) => + (execution.job, execution.context, result) + } + newRunning + } + + private[timeseries] def initialize(workflow0: Workload[TimeSeries], xa: XA, logger: Logger) = { val workflow = workflow0.asInstanceOf[Workflow] logger.info("Validate workflow before start") TimeSeriesUtils.validate(workflow) match { @@ -585,55 +696,15 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] { _state() = _state() + (job -> jobState) } } + workflow + } - def mainLoop(running: Set[Run]): Unit = { - val (completed, stillRunning) = running.partition { - case (_, _, effect) => effect.isCompleted - } - - val (stateSnapshot, completedBackfills, toRun) = atomic { implicit txn => - val (stateSnapshot, newBackfills, completedBackfills) = - collectCompletedJobs(_state(), _backfills(), completed) - - val toRun = jobsToRun(workflow, stateSnapshot, Instant.now, executor.projectVersion) - - _state() = stateSnapshot - _backfills() = newBackfills - - (stateSnapshot, completedBackfills, toRun) - } - - val newExecutions = executor.runAll(toRun) - - atomic { implicit txn => - _state() = newExecutions.foldLeft(_state()) { - case (st, (execution, _)) => - st + (execution.job -> - st(execution.job).update(execution.context.toInterval, Running(execution.id))) - } - } - - if (completed.nonEmpty || toRun.nonEmpty) - runOrLogAndDie(Database.serializeState(stateSnapshot).transact(xa).unsafeRunSync, - "TimeseriesScheduler, cannot serialize state, shutting down") - - if (completedBackfills.nonEmpty) - runOrLogAndDie( - Database - .setBackfillStatus(completedBackfills.map(_.id), "COMPLETE") - .transact(xa) - .unsafeRunSync, - "TimeseriesScheduler, cannot serialize state, shutting down" - ) - - val newRunning = stillRunning ++ newExecutions.map { - case (execution, result) => - (execution.job, execution.context, result) - } + def start(workflow0: Workload[TimeSeries], executor: Executor[TimeSeries], xa: XA, logger: Logger): Unit = { + val workflow = initialize(workflow0, xa, logger) - utils.Timeout(ScalaDuration.create(1, "s")).andThen { - case _ => mainLoop(newRunning) - } + def mainLoop(running: Set[Run]): Unit = { + val newRunning = executeMainLoopSingleIteration(running, workflow, executor, xa) + utils.Timeout(ScalaDuration.create(1, "s")).andThen { case _ => mainLoop(newRunning) } } mainLoop(Set.empty) @@ -683,6 +754,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] { val parentsMap = workflow.edges.groupBy { case (child, _, _) => child } val childrenMap = workflow.edges.groupBy { case (_, parent, _) => parent } + val pausedJobIds = pausedJobs().map(_.id) def reverseDescr(dep: TimeSeriesDependency) = TimeSeriesDependency(dep.offsetLow.negated, dep.offsetHigh.negated) @@ -693,7 +765,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] { if (low >= high) None else Some(Interval(low, high)) } - workflow.vertices.toList.flatMap { job => + workflow.vertices.filter(job => !pausedJobIds.contains(job.id)).toList.flatMap { job => val full = IntervalMap[Instant, Unit](Interval[Instant](Bottom, Top) -> (())) val dependenciesSatisfied = parentsMap .getOrElse(job, Set.empty) diff --git a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerSpec.scala b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerSpec.scala new file mode 100644 index 000000000..25f295ccc --- /dev/null +++ b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerSpec.scala @@ -0,0 +1,121 @@ +package com.criteo.cuttle.timeseries + +import java.time.ZoneOffset.UTC +import java.time.{Duration, Instant, LocalDate} +import java.util.concurrent.TimeUnit + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration.Inf +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success} + +import com.wix.mysql._ +import com.wix.mysql.config.Charset._ +import com.wix.mysql.config._ +import com.wix.mysql.distribution.Version._ + +import com.criteo.cuttle.platforms.local._ +import com.criteo.cuttle.timeseries.TimeSeriesUtils.{Run, TimeSeriesJob} +import com.criteo.cuttle.{Auth, Database => CuttleDatabase, _} + + +object TimeSeriesSchedulerSpec { + // TODO: turn this into a unit test. This is not done for now as the thread pool responsible for checking the lock on + // the state database creates non-daemon threads, which would result in the unit test not ending unless it is interrupted + // from the outside. + def main(args: Array[String]): Unit = { + val config = { + MysqldConfig.aMysqldConfig(v5_7_latest).withCharset(UTF8).withTimeout(3600, TimeUnit.SECONDS).withPort(3388).build() + } + val mysqld = EmbeddedMysql.anEmbeddedMysql(config).addSchema("cuttle_dev").start() + + println("started!") + println("if needed you can connect to this running db using:") + println("> mysql -u root -h 127.0.0.1 -P 3388 cuttle_dev") + + val project = CuttleProject("Hello World", version = "123", env = ("dev", false)) { + Jobs.childJob dependsOn Jobs.rootJob + } + + val retryImmediatelyStrategy = new RetryStrategy { + def apply[S <: Scheduling](job: Job[S], context: S#Context, previouslyFailing: List[String]) = Duration.ZERO + def retryWindow = Duration.ZERO + } + + val xa = CuttleDatabase.connect(DatabaseConfig(Seq(DBLocation("127.0.0.1", 3388)), "cuttle_dev", "root", "")) + val executor = new Executor[TimeSeries](Seq(LocalPlatform(maxForkedProcesses = 10)), xa, logger, project.version)(retryImmediatelyStrategy) + val scheduler = project.scheduler + + scheduler.initialize(project.jobs, xa, logger) + + var runningExecutions = Set.empty[Run] + + logger.info("Starting 'root-job' and 'child-job'") + runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa) + assert(runningExecutionsToJobIdAndResult(runningExecutions).equals(Set(("root-job", true)))) + + logger.info("'root-job' completed, the child job 'child-job' is triggered but fails") + runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa) + assert(runningExecutionsToJobIdAndResult(runningExecutions).equals(Set(("child-job", false)))) + + logger.info("'child-job' is paused") + val guestUser = Auth.User("Guest") + scheduler.pauseJobs(Set(Jobs.childJob), executor, xa)(guestUser) + assert((scheduler.pausedJobs().map { case PausedJob(jobId, user, date) => (jobId, user) }).equals( + Set(("child-job", guestUser)))) + runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa) + assert(runningExecutions.isEmpty) + + logger.info("'child-job' is resumed") + scheduler.resumeJobs(Set(Jobs.childJob), xa)(guestUser) + assert(scheduler.pausedJobs().isEmpty) + runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa) + assert(runningExecutionsToJobIdAndResult(runningExecutions).equals(Set(("child-job", true)))) + + logger.info("No more jobs to schedule for now") + runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa) + assert(runningExecutions.isEmpty) + + mysqld.stop() + } + + private def doSynchronousExecutionStep(scheduler: TimeSeriesScheduler, + runningExecutions: Set[Run], + workflow: Workflow, + executor: Executor[TimeSeries], + xa: XA): Set[(TimeSeriesJob, TimeSeriesContext, Future[Completed])] = { + val newRunningExecutions = scheduler.executeMainLoopSingleIteration(runningExecutions, workflow, executor, xa) + val executionsResult = Future.sequence(newRunningExecutions.map { case (job, executionContext, futureResult) => futureResult }) + Await.ready(executionsResult, Inf) + newRunningExecutions + } + + private def runningExecutionsToJobIdAndResult(runningExecutions: Set[Run]): Set[(String, Boolean)] = { + def executionResultToBoolean(result: Future[Completed]) = result.value match { + case Some(Success(_)) => true + case Some(Failure(t)) => false + case None => throw new Exception("The execution should have completed.") + } + runningExecutions.map { case (job, executionContext, futureResult) => (job.id, executionResultToBoolean(futureResult)) } + } + + object Jobs { + private val start: Instant = LocalDate.now.minusDays(1).atStartOfDay.toInstant(UTC) + + val rootJob = Job("root-job", daily(UTC, start)) { implicit e => + Future(Completed) + } + + private var failedOnce = false + // A job which fails the first time it runs + val childJob = Job("child-job", daily(UTC, start)) { implicit e => + if (!failedOnce) { + failedOnce = true + throw new Exception("Always fails at the first execution") + } + else { + Future(Completed) + } + } + } +}