From c2c5abfcc4bce7ded1c90a6bd0baf3e027e62431 Mon Sep 17 00:00:00 2001 From: Masuzu Date: Fri, 30 Nov 2018 15:52:14 +0100 Subject: [PATCH] [Bugfix] Jobs todo never launched (#333) The bug was caused by the incorrect resolution of completed parent dependencies which spanned over contiguous time intervals, which were treated as disjoint intervals instead of a single interval. --- .../timeseries/TimeSeriesScheduler.scala | 39 +++++++--- .../TimeSeriesSchedulerIntegrationTests.scala | 1 - .../timeseries/TimeSeriesSchedulerSpec.scala | 72 ++++++++++++++----- 3 files changed, 81 insertions(+), 31 deletions(-) diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala index 7d4b21b46..89a37ebe8 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala @@ -782,11 +782,12 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] { def jobHasExecutionsRunningOnPeriod(job: Job[TimeSeries], period: Interval[Instant]): Boolean = { val jobStateOnPeriod = newState(job).intersect(period).toList - jobStateOnPeriod.exists { case (interval, jobState) => - jobState match { - case Done(_) => false - case _ => true - } + jobStateOnPeriod.exists { + case (interval, jobState) => + jobState match { + case Done(_) => false + case _ => true + } } } @@ -819,28 +820,44 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] { if (low >= high) None else Some(Interval(low, high)) } + def joinIntervals(intervals: List[Interval[Instant]]): List[Interval[Instant]] = + intervals + .foldLeft(IntervalMap.empty[Instant, Unit]) { case (intervalMap, interval) => intervalMap.update(interval, ()) } + .toList + .map { case (interval, _) => interval } + workflow.vertices.filter(job => 
!pausedJobIds.contains(job.id)).toList.flatMap { job => val full = IntervalMap[Instant, Unit](Interval[Instant](Bottom, Top) -> (())) val dependenciesSatisfied = parentsMap .getOrElse(job, Set.empty) .map { case (_, parent, lbl) => - val intervals: List[Interval[Instant]] = state(parent).collect { case Done(_) => () }.toList.map(_._1) + val donePeriods: IntervalMap[Instant, Unit] = state(parent).collect { case Done(_) => () } + val intervals: List[Interval[Instant]] = + joinIntervals(donePeriods.toList.map { case (interval, _) => interval }) val newIntervals = intervals.collect(applyDep(reverseDescr(lbl))) - newIntervals.foldLeft(IntervalMap.empty[Instant, Unit])(_.update(_, ())) + val intervalMapOfSatisfiedDeps = newIntervals.foldLeft(IntervalMap.empty[Instant, Unit])(_.update(_, ())) + intervalMapOfSatisfiedDeps } .fold(full)(_ whenIsDef _) + val noChildrenRunning = childrenMap .getOrElse(job, Set.empty) .map { case (child, _, lbl) => - val intervals = state(child).collect { case Running(_) => () }.toList.map(_._1) + val runningPeriods: IntervalMap[Instant, Unit] = state(child).collect { case Running(_) => () } + val intervals = joinIntervals(runningPeriods.toList.map { case (interval, _) => interval }) val newIntervals = intervals.collect(applyDep(lbl)) - newIntervals.foldLeft(IntervalMap.empty[Instant, Unit])(_.update(_, ())) + val intervalMapWithCompletedChildren = + newIntervals.foldLeft(IntervalMap.empty[Instant, Unit])(_.update(_, ())) + intervalMapWithCompletedChildren } .fold(full)(_ whenIsUndef _) - val toRun = state(job) - .collect { case Todo(maybeBackfill) => maybeBackfill } + + val todoPeriods: IntervalMap[Instant, Option[Backfill]] = state(job).collect { + case Todo(maybeBackfill) => maybeBackfill + } + val toRun = todoPeriods .whenIsDef(dependenciesSatisfied) .whenIsDef(noChildrenRunning) diff --git a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerIntegrationTests.scala 
b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerIntegrationTests.scala index 53bf8c700..6024cb7f5 100644 --- a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerIntegrationTests.scala +++ b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerIntegrationTests.scala @@ -18,7 +18,6 @@ import com.criteo.cuttle.platforms.local._ import com.criteo.cuttle.timeseries.TimeSeriesUtils.{Run, TimeSeriesJob} import com.criteo.cuttle.{Auth, Database => CuttleDatabase, _} - object TimeSeriesSchedulerIntegrationTests { // TODO: turn this into a unit test. This is not done for now as the thread pool responsible for checking the lock on // the state database creates non-daemon threads, which would result in the unit test not ending unless it is interrupted diff --git a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerSpec.scala b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerSpec.scala index baacef43a..5bda26fdd 100644 --- a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerSpec.scala +++ b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerSpec.scala @@ -1,48 +1,52 @@ package com.criteo.cuttle.timeseries +import java.time.Duration + import scala.concurrent.Future import org.scalatest.FunSuite -import com.criteo.cuttle.{Completed, Job, TestScheduling, logger} +import com.criteo.cuttle.{logger, Completed, Job, TestScheduling} import com.criteo.cuttle.timeseries.JobState.{Done, Todo} import com.criteo.cuttle.timeseries.TimeSeriesUtils.State import com.criteo.cuttle.timeseries.intervals.{Interval, IntervalMap} - -class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling { +class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling { private val scheduling: TimeSeries = hourly(date"2017-03-25T02:00:00Z") private val testJob = Job("test_job", scheduling)(completed) + + private val parentScheduling: 
TimeSeries = hourly(date"2017-03-25T01:00:00Z") + private val parentTestJob = Job("parent_test_job", parentScheduling)(completed) private val scheduler = TimeSeriesScheduler(logger) - private val backfill = Backfill( - "some-id", - date"2017-03-25T01:00:00Z", - date"2017-03-25T05:00:00Z", - Set(testJob), - priority = 0, - name = "backfill", - description = "", - status = "RUNNING", - createdBy = "") + private val backfill = Backfill("some-id", + date"2017-03-25T01:00:00Z", + date"2017-03-25T05:00:00Z", + Set(testJob), + priority = 0, + name = "backfill", + description = "", + status = "RUNNING", + createdBy = "") test("identity new backfills") { val state: State = Map( - testJob -> IntervalMap( + testJob -> IntervalMap( Interval(date"2017-03-25T00:00:00Z", date"2017-03-25T01:00:00Z") -> Done(""), // Backfill completed on the last 3 hours of the backfill period, first hour not yet done Interval(date"2017-03-25T01:00:00Z", date"2017-03-25T02:00:00Z") -> Todo(Some(backfill)), Interval(date"2017-03-25T02:00:00Z", date"2017-03-25T05:00:00Z") -> Done("") ) ) - val (stateSnapshot, newBackfills, completedBackfills) = scheduler.collectCompletedJobs(state, Set(backfill), completed = Set.empty) + val (stateSnapshot, newBackfills, completedBackfills) = + scheduler.collectCompletedJobs(state, Set(backfill), completed = Set.empty) assert(newBackfills.equals(Set(backfill))) assert(completedBackfills.isEmpty) } test("complete backfills") { val state: State = Map( - testJob -> IntervalMap( + testJob -> IntervalMap( Interval(date"2017-03-25T00:00:00Z", date"2017-03-25T01:00:00Z") -> Done(""), Interval(date"2017-03-25T02:00:00Z", date"2017-03-25T05:00:00Z") -> Done("") ) @@ -52,8 +56,11 @@ class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling { Set(backfill), completed = Set( // Another non backfilled execution completed on the period where 'backfill' is still running - (testJob, TimeSeriesContext(date"2017-03-25T01:00:00Z", date"2017-03-25T02:00:00Z"), 
Future.successful(Completed)) - )) + (testJob, + TimeSeriesContext(date"2017-03-25T01:00:00Z", date"2017-03-25T02:00:00Z"), + Future.successful(Completed)) + ) + ) assert(completedBackfills.equals(Set(backfill))) assert(newBackfills.isEmpty) } @@ -61,4 +68,31 @@ class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling { test("complete regular execution while backfill is still running") { // Edge-case not possible since it is not possible to run a backfill of a job on a period where that job did not complete } -} \ No newline at end of file + + test("identify jobs to do") { + val state: State = Map( + parentTestJob -> IntervalMap( + Interval(date"2017-03-25T01:00:00Z", date"2017-03-25T02:00:00Z") -> Done("v1"), + Interval(date"2017-03-25T02:00:00Z", date"2017-03-25T04:00:00Z") -> Done("v2"), + Interval(date"2017-03-25T04:00:00Z", date"2017-03-25T05:00:00Z") -> Todo(None) + ), + testJob -> IntervalMap( + Interval(date"2017-03-25T02:00:00Z", date"2017-03-25T03:00:00Z") -> Todo(None), + Interval(date"2017-03-25T03:00:00Z", date"2017-03-25T04:00:00Z") -> Done("v2"), + Interval(date"2017-03-25T04:00:00Z", date"2017-03-25T05:00:00Z") -> Todo(None) + ) + ) + + val jobsToRun = scheduler.jobsToRun( + (testJob dependsOn parentTestJob)(TimeSeriesDependency(Duration.ofHours(-1), Duration.ofHours(0))), + state, + date"2017-03-25T05:00:00Z", + "last_version" + ) + assert( + jobsToRun.toSet.equals(Set( + (testJob, TimeSeriesContext(date"2017-03-25T02:00:00Z", date"2017-03-25T03:00:00Z", None, "last_version")), + (parentTestJob, TimeSeriesContext(date"2017-03-25T04:00:00Z", date"2017-03-25T05:00:00Z", None, "last_version")) + ))) + } +}