Skip to content

Commit

Permalink
[Draft] Move pause execution management inside scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Masuzu committed Oct 10, 2018
1 parent a714826 commit 68037e2
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 254 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ lazy val cuttle =
lazy val timeseries =
(project in file("timeseries"))
.settings(commonSettings: _*)
.settings(libraryDependencies ++= Seq(
"com.wix" % "wix-embedded-mysql" % "2.1.4" % "test"
))
.settings(
// Webpack
resourceGenerators in Compile += Def.task {
Expand Down
190 changes: 17 additions & 173 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala

Large diffs are not rendered by default.

13 changes: 1 addition & 12 deletions core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ExecutorSpec extends FunSuite with TestScheduling {
val metrics = Prometheus.serialize(
testExecutor.getMetrics(Set(fooJob))(
getStateAtomic = _ => {
((5, 1), 3, 2)
((5, 1), 2)
},
runningExecutions = Seq(
buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionRunning,
Expand All @@ -53,11 +53,6 @@ class ExecutorSpec extends FunSuite with TestScheduling {
buildExecutionForJob(untaggedJob) -> ExecutionStatus.ExecutionRunning,
buildExecutionForJob(untaggedJob) -> ExecutionStatus.ExecutionWaiting
),
pausedExecutions = Seq(
buildExecutionForJob(fooJob),
buildExecutionForJob(fooBarJob),
buildExecutionForJob(untaggedJob)
),
failingExecutions = Seq(
buildExecutionForJob(fooBarJob),
buildExecutionForJob(fooBarJob),
Expand All @@ -73,14 +68,11 @@ class ExecutorSpec extends FunSuite with TestScheduling {
|# TYPE cuttle_scheduler_stat_count gauge
|cuttle_scheduler_stat_count {type="running"} 5
|cuttle_scheduler_stat_count {type="waiting"} 1
|cuttle_scheduler_stat_count {type="paused"} 3
|cuttle_scheduler_stat_count {type="failing"} 2
|# HELP cuttle_scheduler_stat_count_by_tag The number of executions that we have in concrete states by tag
|# TYPE cuttle_scheduler_stat_count_by_tag gauge
|cuttle_scheduler_stat_count_by_tag {tag="bar", type="paused"} 1
|cuttle_scheduler_stat_count_by_tag {tag="foo", type="waiting"} 1
|cuttle_scheduler_stat_count_by_tag {tag="foo", type="running"} 3
|cuttle_scheduler_stat_count_by_tag {tag="foo", type="paused"} 2
|cuttle_scheduler_stat_count_by_tag {tag="bar", type="failing"} 2
|cuttle_scheduler_stat_count_by_tag {tag="foo", type="failing"} 2
|cuttle_scheduler_stat_count_by_tag {tag="bar", type="running"} 1
Expand All @@ -90,12 +82,9 @@ class ExecutorSpec extends FunSuite with TestScheduling {
|cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="running"} 1
|cuttle_scheduler_stat_count_by_job {job="untagged_job", type="running"} 1
|cuttle_scheduler_stat_count_by_job {job="foo_job", type="waiting"} 1
|cuttle_scheduler_stat_count_by_job {job="foo_job", type="paused"} 1
|cuttle_scheduler_stat_count_by_job {job="untagged_job", type="failing"} 1
|cuttle_scheduler_stat_count_by_job {job="foo_job", type="running"} 2
|cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="paused"} 1
|cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="failing"} 2
|cuttle_scheduler_stat_count_by_job {job="untagged_job", type="paused"} 1
|# HELP cuttle_executions_total The number of finished executions that we have in concrete states by job and by tag
|# TYPE cuttle_executions_total counter
|cuttle_executions_total {job_id="foo_job", tags="foo", type="failure"} 0
Expand Down
6 changes: 0 additions & 6 deletions timeseries/src/main/javascript/app/menu/Menu.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,6 @@ const Menu = ({ classes, className, active, statistics }: Props) => (
active={active.id === "executions/finished"}
label="Finished"
link="/executions/finished"
/>,
<MenuSubEntry
active={active.id === "executions/paused"}
label="Paused"
link="/executions/paused"
badges={[statistics.paused && { label: statistics.paused }]}
/>
]}
/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CuttleProject private[cuttle] (

if (paused) {
logger.info("Pausing workflow")
executor.pauseJobs(jobs.all)(Auth.User("Startup"))
scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup"))
}

logger.info("Start workflow")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,6 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E
executor.failingExecutionsSize(ids) -> executor
.failingExecutions(ids, sort, asc, offset, limit)
.toList))
case "paused" =>
IO(
Some(
executor.pausedExecutionsSize(ids) -> executor
.pausedExecutions(ids, sort, asc, offset, limit)
.toList))
case "finished" =>
executor
.archivedExecutionsSize(ids)
Expand Down Expand Up @@ -284,7 +278,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E
}

case GET at "/api/jobs/paused" =>
Ok(executor.pausedJobs.asJson)
Ok(scheduler.pausedJobs().asJson)

case GET at "/api/project_definition" =>
Ok(project.asJson)
Expand All @@ -301,24 +295,24 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E

case POST at url"/api/jobs/pause?jobs=$jobs" => { implicit user =>
getJobsOrNotFound(jobs).fold(IO.pure, jobs => {
executor.pauseJobs(jobs)
scheduler.pauseJobs(jobs, executor, xa)
Ok
})
}

case POST at url"/api/jobs/resume?jobs=$jobs" => { implicit user =>
getJobsOrNotFound(jobs).fold(IO.pure, jobs => {
executor.resumeJobs(jobs)
scheduler.resumeJobs(jobs, xa)
Ok
})
}
case POST at url"/api/jobs/all/unpause" => { implicit user =>
executor.resumeJobs(jobs.all)
scheduler.resumeJobs(jobs.all, xa)
Ok
}
case POST at url"/api/jobs/$id/unpause" => { implicit user =>
jobs.all.find(_.id == id).fold(NotFound) { job =>
executor.resumeJobs(Set(job))
scheduler.resumeJobs(Set(job), xa)
Ok
}
}
Expand Down Expand Up @@ -533,7 +527,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E
acc
}

val pausedJobs = executor.pausedJobs
val pausedJobs = scheduler.pausedJobs().map(_.id)
val allFailing = executor.allFailingExecutions
val allWaitingIds = executor.allRunning
.filter(_.status == ExecutionWaiting)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import java.time.ZoneOffset.UTC
import java.time._
import java.time.temporal.ChronoUnit._
import java.time.temporal.{ChronoUnit, TemporalAdjusters}
import java.util.UUID
import java.util.{Comparator, UUID}

import scala.collection.mutable
import scala.concurrent._
import scala.concurrent.duration.{Duration => ScalaDuration}
import scala.concurrent.stm.Txn.ExternalDecider
import scala.concurrent.stm._
import scala.math.Ordering.Implicits._

Expand Down Expand Up @@ -247,6 +248,12 @@ private[timeseries] object TimeSeriesContext {
implicit val encoder: Encoder[TimeSeriesContext] = deriveEncoder
implicit def decoder(implicit jobs: Set[Job[TimeSeries]]): Decoder[TimeSeriesContext] =
deriveDecoder

/** Provide an implicit `Ordering` for [[TimeSeriesContext]] based on the `compareTo` function. */
implicit val ordering: Ordering[TimeSeriesContext] =
Ordering.comparatorToOrdering(new Comparator[TimeSeriesContext] {
def compare(o1: TimeSeriesContext, o2: TimeSeriesContext) = o1.compareTo(o2)
})
}

/** A [[TimeSeriesDependency]] qualify the dependency between 2 [[com.criteo.cuttle.Job Jobs]] in a
Expand Down Expand Up @@ -323,6 +330,14 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {

private val _backfills = Ref(Set.empty[Backfill])

private val _pausedJobs = Ref(Set.empty[PausedJob])

def pausedJobs(): Set[PausedJob] = atomic { implicit txn => _pausedJobs() }

private val queries = new Queries {
val appLogger: Logger = logger
}

private[timeseries] def state: (State, Set[Backfill]) = atomic { implicit txn =>
(_state(), _backfills())
}
Expand Down Expand Up @@ -526,7 +541,109 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {
backfills
}

def start(workflow0: Workload[TimeSeries], executor: Executor[TimeSeries], xa: XA, logger: Logger): Unit = {
private[timeseries] def pauseJobs(jobs: Set[Job[TimeSeries]], executor: Executor[TimeSeries], xa: XA)(implicit user: Auth.User): Unit = {
val executionsToCancel = atomic { implicit tx =>
val pauseDate = Instant.now()
val pausedJobIds = _pausedJobs().map(_.id)
val jobsToPause: Set[PausedJob] = jobs
.filter(job => !pausedJobIds.contains(job.id))
.map(job => PausedJob(job.id, user, pauseDate))

if (jobsToPause.isEmpty) return

_pausedJobs() = _pausedJobs() ++ jobsToPause

val pauseQuery = jobsToPause.map(queries.pauseJob).reduceLeft(_ *> _)
Txn.setExternalDecider(new ExternalDecider {
def shouldCommit(implicit txn: InTxnEnd): Boolean = {
pauseQuery.transact(xa).unsafeRunSync
true
}
})

jobsToPause.flatMap { pausedJob =>
executor.runningState.filterKeys(_.job.id == pausedJob.id).keys ++ executor.throttledState
.filterKeys(_.job.id == pausedJob.id)
.keys
}
}
logger.debug(s"we will cancel ${executionsToCancel.size} executions")
executionsToCancel.toList.sortBy(_.context).reverse.foreach { execution =>
execution.streams.debug(s"Job has been paused by user ${user.userId}")
execution.cancel()
}
}

private[timeseries] def resumeJobs(jobs: Set[Job[TimeSeries]], xa: XA)(implicit user: Auth.User): Unit = {
val jobIdsToResume = jobs.map(_.id)
val resumeQuery = jobIdsToResume.map(queries.resumeJob).reduceLeft(_ *> _)

atomic { implicit tx =>
Txn.setExternalDecider(new ExternalDecider {
def shouldCommit(implicit tx: InTxnEnd): Boolean = {
resumeQuery.transact(xa).unsafeRunSync
true
}
})

_pausedJobs() = _pausedJobs() -- _pausedJobs().filter(pausedJob => jobIdsToResume.contains(pausedJob.id))
}
}

/**
* Given a list of current executions, update their state and submit new executions depending on the current time and
* changes in execution states.
* @param running set of still running executions
* @return new set of running executions
**/
private[timeseries] def updateCurrentExecutionsAndSubmitNewExecutions(running: Set[Run], workflow: Workflow, executor: Executor[TimeSeries], xa: XA): Set[Run] = {
val (completed, stillRunning) = running.partition {
case (_, _, effect) => effect.isCompleted
}

val (stateSnapshot, completedBackfills, toRun) = atomic { implicit txn =>
val (stateSnapshot, newBackfills, completedBackfills) =
collectCompletedJobs(_state(), _backfills(), completed)

val toRun = jobsToRun(workflow, stateSnapshot, Instant.now, executor.projectVersion)

_state() = stateSnapshot
_backfills() = newBackfills

(stateSnapshot, completedBackfills, toRun)
}

val newExecutions = executor.runAll(toRun)

atomic { implicit txn =>
_state() = newExecutions.foldLeft(_state()) {
case (st, (execution, _)) =>
st + (execution.job ->
st(execution.job).update(execution.context.toInterval, Running(execution.id)))
}
}

if (completed.nonEmpty || toRun.nonEmpty)
runOrLogAndDie(Database.serializeState(stateSnapshot).transact(xa).unsafeRunSync,
"TimeseriesScheduler, cannot serialize state, shutting down")

if (completedBackfills.nonEmpty)
runOrLogAndDie(
Database
.setBackfillStatus(completedBackfills.map(_.id), "COMPLETE")
.transact(xa)
.unsafeRunSync,
"TimeseriesScheduler, cannot serialize state, shutting down"
)

val newRunning = stillRunning ++ newExecutions.map {
case (execution, result) =>
(execution.job, execution.context, result)
}
newRunning
}

private[timeseries] def initialize(workflow0: Workload[TimeSeries], xa: XA, logger: Logger) = {
val workflow = workflow0.asInstanceOf[Workflow]
logger.info("Validate workflow before start")
TimeSeriesUtils.validate(workflow) match {
Expand Down Expand Up @@ -585,55 +702,15 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {
_state() = _state() + (job -> jobState)
}
}
workflow
}

def mainLoop(running: Set[Run]): Unit = {
val (completed, stillRunning) = running.partition {
case (_, _, effect) => effect.isCompleted
}

val (stateSnapshot, completedBackfills, toRun) = atomic { implicit txn =>
val (stateSnapshot, newBackfills, completedBackfills) =
collectCompletedJobs(_state(), _backfills(), completed)

val toRun = jobsToRun(workflow, stateSnapshot, Instant.now, executor.projectVersion)

_state() = stateSnapshot
_backfills() = newBackfills

(stateSnapshot, completedBackfills, toRun)
}

val newExecutions = executor.runAll(toRun)

atomic { implicit txn =>
_state() = newExecutions.foldLeft(_state()) {
case (st, (execution, _)) =>
st + (execution.job ->
st(execution.job).update(execution.context.toInterval, Running(execution.id)))
}
}

if (completed.nonEmpty || toRun.nonEmpty)
runOrLogAndDie(Database.serializeState(stateSnapshot).transact(xa).unsafeRunSync,
"TimeseriesScheduler, cannot serialize state, shutting down")

if (completedBackfills.nonEmpty)
runOrLogAndDie(
Database
.setBackfillStatus(completedBackfills.map(_.id), "COMPLETE")
.transact(xa)
.unsafeRunSync,
"TimeseriesScheduler, cannot serialize state, shutting down"
)

val newRunning = stillRunning ++ newExecutions.map {
case (execution, result) =>
(execution.job, execution.context, result)
}
def start(workflow0: Workload[TimeSeries], executor: Executor[TimeSeries], xa: XA, logger: Logger): Unit = {
val workflow = initialize(workflow0, xa, logger)

utils.Timeout(ScalaDuration.create(1, "s")).andThen {
case _ => mainLoop(newRunning)
}
def mainLoop(running: Set[Run]): Unit = {
val newRunning = updateCurrentExecutionsAndSubmitNewExecutions(running, workflow, executor, xa)
utils.Timeout(ScalaDuration.create(1, "s")).andThen { case _ => mainLoop(newRunning) }
}

mainLoop(Set.empty)
Expand Down Expand Up @@ -683,6 +760,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {

val parentsMap = workflow.edges.groupBy { case (child, _, _) => child }
val childrenMap = workflow.edges.groupBy { case (_, parent, _) => parent }
val pausedJobIds = pausedJobs().map(_.id)

def reverseDescr(dep: TimeSeriesDependency) =
TimeSeriesDependency(dep.offsetLow.negated, dep.offsetHigh.negated)
Expand All @@ -693,7 +771,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {
if (low >= high) None else Some(Interval(low, high))
}

workflow.vertices.toList.flatMap { job =>
workflow.vertices.filter(job => !pausedJobIds.contains(job.id)).toList.flatMap { job =>
val full = IntervalMap[Instant, Unit](Interval[Instant](Bottom, Top) -> (()))
val dependenciesSatisfied = parentsMap
.getOrElse(job, Set.empty)
Expand Down
Loading

0 comments on commit 68037e2

Please sign in to comment.