Skip to content

Commit

Permalink
[Bugfix] Backfill shows complete in the UI before being complete (#329)
Browse files Browse the repository at this point in the history
  • Loading branch information
Masuzu authored Nov 28, 2018
1 parent 11fde76 commit 67fdc61
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -780,18 +780,21 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {
acc + (job -> acc(job).update(context.toInterval, jobState))
}

// Returns true when, on `period`, at least one interval of `job`'s new state
// is not Done — i.e. some execution on that period still has to run.
def jobHasExecutionsRunningOnPeriod(job: Job[TimeSeries], period: Interval[Instant]): Boolean =
  newState(job)
    .intersect(period)
    .toList
    .exists {
      case (_, Done(_)) => false
      case _            => true
    }

val notCompletedBackfills = backfills.filter { bf =>
val itvl = Interval(bf.start, bf.end)
bf.jobs.exists(
job =>
newState(job)
.intersect(itvl)
.toList
.exists(_._2 match {
case Done(_) => true
case _ => false
}))
bf.jobs.exists(job => jobHasExecutionsRunningOnPeriod(job, itvl))
}

(newState, notCompletedBackfills, backfills -- notCompletedBackfills)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package com.criteo.cuttle.timeseries

import java.time.ZoneOffset.UTC
import java.time.temporal.ChronoUnit.HOURS
import java.time.{Duration, Instant, LocalDate}
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration.Inf
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}

import com.wix.mysql._
import com.wix.mysql.config.Charset._
import com.wix.mysql.config._
import com.wix.mysql.distribution.Version._

import com.criteo.cuttle.platforms.local._
import com.criteo.cuttle.timeseries.TimeSeriesUtils.{Run, TimeSeriesJob}
import com.criteo.cuttle.{Auth, Database => CuttleDatabase, _}


object TimeSeriesSchedulerIntegrationTests {
  // TODO: turn this into a unit test. This is not done for now as the thread pool responsible for checking the lock on
  // the state database creates non-daemon threads, which would result in the unit test not ending unless it is interrupted
  // from the outside.

  /** Integration scenario: runs a two-job workflow ('child-job' depends on 'root-job')
    * against an embedded MySQL state database, advancing the scheduler one synchronous
    * step at a time and asserting, after each round, which executions ran and whether
    * they succeeded — covering first-run failure, pause and resume. */
  def main(args: Array[String]): Unit = {
    // Embedded MySQL 5.7 on port 3388, used as the cuttle state database.
    val config = {
      MysqldConfig
        .aMysqldConfig(v5_7_latest)
        .withCharset(UTF8)
        .withTimeout(3600, TimeUnit.SECONDS)
        .withPort(3388)
        .build()
    }
    val mysqld = EmbeddedMysql.anEmbeddedMysql(config).addSchema("cuttle_dev").start()

    println("started!")
    println("if needed you can connect to this running db using:")
    println("> mysql -u root -h 127.0.0.1 -P 3388 cuttle_dev")

    val project = CuttleProject("Hello World", version = "123", env = ("dev", false)) {
      Jobs.childJob dependsOn Jobs.rootJob
    }

    // Retry immediately (zero delay, zero window) so the failing first run of
    // 'child-job' can be retried on the very next scheduling step.
    val retryImmediatelyStrategy = new RetryStrategy {
      def apply[S <: Scheduling](job: Job[S], context: S#Context, previouslyFailing: List[String]) = Duration.ZERO
      def retryWindow = Duration.ZERO
    }

    val xa = CuttleDatabase.connect(DatabaseConfig(Seq(DBLocation("127.0.0.1", 3388)), "cuttle_dev", "root", ""))
    val executor =
      new Executor[TimeSeries](Seq(LocalPlatform(maxForkedProcesses = 10)), xa, logger, project.name, project.version)(
        retryImmediatelyStrategy)
    val scheduler = project.scheduler

    scheduler.initialize(project.jobs, xa, logger)

    // Executions produced by the previous scheduling round, fed back into the next one.
    var runningExecutions = Set.empty[Run]

    logger.info("Starting 'root-job' and 'child-job'")
    runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa)
    assert(runningExecutionsToJobIdAndResult(runningExecutions).equals(Set(("root-job", true))))

    logger.info("'root-job' completed, the child job 'child-job' is triggered but fails")
    runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa)
    assert(runningExecutionsToJobIdAndResult(runningExecutions).equals(Set(("child-job", false))))

    logger.info("'child-job' is paused")
    val guestUser = Auth.User("Guest")
    scheduler.pauseJobs(Set(Jobs.childJob), executor, xa)(guestUser)
    assert(
      (scheduler
        .pausedJobs()
        .map { case PausedJob(jobId, user, date) => (jobId, user) })
        .equals(Set(("child-job", guestUser))))
    // A paused job must not be submitted: this round yields no executions.
    runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa)
    assert(runningExecutions.isEmpty)

    logger.info("'child-job' is resumed")
    scheduler.resumeJobs(Set(Jobs.childJob), xa)(guestUser)
    assert(scheduler.pausedJobs().isEmpty)
    // The retried 'child-job' succeeds this time (it only fails on its first run).
    runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa)
    assert(runningExecutionsToJobIdAndResult(runningExecutions).equals(Set(("child-job", true))))

    logger.info("No more jobs to schedule for now")
    runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa)
    assert(runningExecutions.isEmpty)

    mysqld.stop()
  }

  /** Runs one scheduling round: updates the currently running executions, submits
    * the newly runnable ones, then blocks until every submitted execution has
    * completed. Returns the executions submitted during this round. */
  private def doSynchronousExecutionStep(scheduler: TimeSeriesScheduler,
                                         runningExecutions: Set[Run],
                                         workflow: Workflow,
                                         executor: Executor[TimeSeries],
                                         xa: XA): Set[(TimeSeriesJob, TimeSeriesContext, Future[Completed])] = {
    val newRunningExecutions =
      scheduler.updateCurrentExecutionsAndSubmitNewExecutions(runningExecutions, workflow, executor, xa)

    import scala.concurrent.ExecutionContext.Implicits.global
    val executionsResult = Future.sequence(newRunningExecutions.map {
      case (job, executionContext, futureResult) =>
        logger.info(s"Executed ${job.id}")
        futureResult
    })
    // Block so that each step of the scenario is fully finished before the caller asserts on it.
    Await.ready(executionsResult, Inf)
    newRunningExecutions
  }

  /** Projects a set of finished runs to (jobId, succeeded) pairs; throws if any
    * execution has not completed yet (each scenario step awaits completion first). */
  private def runningExecutionsToJobIdAndResult(runningExecutions: Set[Run]): Set[(String, Boolean)] = {
    def executionResultToBoolean(result: Future[Completed]) = result.value match {
      case Some(Success(_)) => true
      case Some(Failure(t)) => false
      case None => throw new Exception("The execution should have completed.")
    }
    runningExecutions.map {
      case (job, executionContext, futureResult) => (job.id, executionResultToBoolean(futureResult))
    }
  }

  /** The two daily jobs making up the test workflow, starting yesterday at 00:00 UTC. */
  object Jobs {
    private val start: Instant = LocalDate.now.minusDays(1).atStartOfDay.toInstant(UTC)

    val rootJob = Job("root-job", daily(UTC, start)) { implicit e =>
      Future(Completed)
    }

    // NOTE(review): mutable flag shared across executions — assumes runs are
    // sequential, which holds in this synchronous scenario.
    private var failedOnce = false
    // A job which fails the first time it runs
    val childJob = Job("child-job", daily(UTC, start)) { implicit e =>
      if (!failedOnce) {
        failedOnce = true
        throw new Exception("Always fails at the first execution")
      } else {
        Future(Completed)
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -1,137 +1,64 @@
package com.criteo.cuttle.timeseries

import java.time.ZoneOffset.UTC
import java.time.{Duration, Instant, LocalDate}
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration.Inf
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}

import com.wix.mysql._
import com.wix.mysql.config.Charset._
import com.wix.mysql.config._
import com.wix.mysql.distribution.Version._

import com.criteo.cuttle.platforms.local._
import com.criteo.cuttle.timeseries.TimeSeriesUtils.{Run, TimeSeriesJob}
import com.criteo.cuttle.{Auth, Database => CuttleDatabase, _}

object TimeSeriesSchedulerSpec {
// TODO: turn this into a unit test. This is not done for now as the thread pool responsible for checking the lock on
// the state database creates non-daemon threads, which would result in the unit test not ending unless it is interrupted
// from the outside.
def main(args: Array[String]): Unit = {
val config = {
MysqldConfig
.aMysqldConfig(v5_7_latest)
.withCharset(UTF8)
.withTimeout(3600, TimeUnit.SECONDS)
.withPort(3388)
.build()
}
val mysqld = EmbeddedMysql.anEmbeddedMysql(config).addSchema("cuttle_dev").start()

println("started!")
println("if needed you can connect to this running db using:")
println("> mysql -u root -h 127.0.0.1 -P 3388 cuttle_dev")

val project = CuttleProject("Hello World", version = "123", env = ("dev", false)) {
Jobs.childJob dependsOn Jobs.rootJob
}

val retryImmediatelyStrategy = new RetryStrategy {
def apply[S <: Scheduling](job: Job[S], context: S#Context, previouslyFailing: List[String]) = Duration.ZERO
def retryWindow = Duration.ZERO
}

val xa = CuttleDatabase.connect(DatabaseConfig(Seq(DBLocation("127.0.0.1", 3388)), "cuttle_dev", "root", ""))
val executor =
new Executor[TimeSeries](Seq(LocalPlatform(maxForkedProcesses = 10)), xa, logger, project.name, project.version)(
retryImmediatelyStrategy)
val scheduler = project.scheduler

scheduler.initialize(project.jobs, xa, logger)

var runningExecutions = Set.empty[Run]

logger.info("Starting 'root-job' and 'child-job'")
runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa)
assert(runningExecutionsToJobIdAndResult(runningExecutions).equals(Set(("root-job", true))))

logger.info("'root-job' completed, the child job 'child-job' is triggered but fails")
runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa)
assert(runningExecutionsToJobIdAndResult(runningExecutions).equals(Set(("child-job", false))))

logger.info("'child-job' is paused")
val guestUser = Auth.User("Guest")
scheduler.pauseJobs(Set(Jobs.childJob), executor, xa)(guestUser)
assert(
(scheduler
.pausedJobs()
.map { case PausedJob(jobId, user, date) => (jobId, user) })
.equals(Set(("child-job", guestUser))))
runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa)
assert(runningExecutions.isEmpty)

logger.info("'child-job' is resumed")
scheduler.resumeJobs(Set(Jobs.childJob), xa)(guestUser)
assert(scheduler.pausedJobs().isEmpty)
runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa)
assert(runningExecutionsToJobIdAndResult(runningExecutions).equals(Set(("child-job", true))))

logger.info("No more jobs to schedule for now")
runningExecutions = doSynchronousExecutionStep(scheduler, runningExecutions, project.jobs, executor, xa)
assert(runningExecutions.isEmpty)

mysqld.stop()
import scala.concurrent.Future

import org.scalatest.FunSuite

import com.criteo.cuttle.{Completed, Job, TestScheduling, logger}
import com.criteo.cuttle.timeseries.JobState.{Done, Todo}
import com.criteo.cuttle.timeseries.TimeSeriesUtils.State
import com.criteo.cuttle.timeseries.intervals.{Interval, IntervalMap}


class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling {
// Fixtures: an hourly schedule starting 2017-03-25T02:00Z, a single job under
// test, and the scheduler instance exercised by the tests below.
private val scheduling: TimeSeries = hourly(date"2017-03-25T02:00:00Z")
private val testJob = Job("test_job", scheduling)(completed)
private val scheduler = TimeSeriesScheduler(logger)

// A RUNNING backfill of `testJob` covering 01:00–05:00 (four hourly partitions).
private val backfill = Backfill(
  "some-id",
  date"2017-03-25T01:00:00Z",
  date"2017-03-25T05:00:00Z",
  Set(testJob),
  priority = 0,
  name = "backfill",
  description = "",
  status = "RUNNING",
  createdBy = "")

// Fix: test name typo "identity" -> "identify".
// The backfill spans 01:00–05:00 but its first hour is still Todo, so
// collectCompletedJobs must keep it in the not-yet-completed set.
test("identify new backfills") {
  val state: State = Map(
    testJob -> IntervalMap(
      Interval(date"2017-03-25T00:00:00Z", date"2017-03-25T01:00:00Z") -> Done(""),
      // Backfill completed on the last 3 hours of the backfill period, first hour not yet done
      Interval(date"2017-03-25T01:00:00Z", date"2017-03-25T02:00:00Z") -> Todo(Some(backfill)),
      Interval(date"2017-03-25T02:00:00Z", date"2017-03-25T05:00:00Z") -> Done("")
    )
  )
  val (stateSnapshot, newBackfills, completedBackfills) =
    scheduler.collectCompletedJobs(state, Set(backfill), completed = Set.empty)
  assert(newBackfills.equals(Set(backfill)))
  assert(completedBackfills.isEmpty)
}

private def doSynchronousExecutionStep(scheduler: TimeSeriesScheduler,
runningExecutions: Set[Run],
workflow: Workflow,
executor: Executor[TimeSeries],
xa: XA): Set[(TimeSeriesJob, TimeSeriesContext, Future[Completed])] = {
val newRunningExecutions =
scheduler.updateCurrentExecutionsAndSubmitNewExecutions(runningExecutions, workflow, executor, xa)

import scala.concurrent.ExecutionContext.Implicits.global
val executionsResult = Future.sequence(newRunningExecutions.map {
case (job, executionContext, futureResult) =>
logger.info(s"Executed ${job.id}")
futureResult
})
Await.ready(executionsResult, Inf)
newRunningExecutions
}

private def runningExecutionsToJobIdAndResult(runningExecutions: Set[Run]): Set[(String, Boolean)] = {
def executionResultToBoolean(result: Future[Completed]) = result.value match {
case Some(Success(_)) => true
case Some(Failure(t)) => false
case None => throw new Exception("The execution should have completed.")
}
runningExecutions.map {
case (job, executionContext, futureResult) => (job.id, executionResultToBoolean(futureResult))
}
// When every interval of the backfill period ends up Done — the missing
// 01:00–02:00 hour having just completed in this very scheduling round —
// the backfill is reported as completed and dropped from the running set.
test("complete backfills") {
  val state: State = Map(
    testJob -> IntervalMap(
      Interval(date"2017-03-25T00:00:00Z", date"2017-03-25T01:00:00Z") -> Done(""),
      Interval(date"2017-03-25T02:00:00Z", date"2017-03-25T05:00:00Z") -> Done("")
    )
  )
  // Another non backfilled execution completed on the period where 'backfill' is still running
  val justCompletedRun =
    (testJob, TimeSeriesContext(date"2017-03-25T01:00:00Z", date"2017-03-25T02:00:00Z"), Future.successful(Completed))
  val (stateSnapshot, newBackfills, completedBackfills) =
    scheduler.collectCompletedJobs(state, Set(backfill), completed = Set(justCompletedRun))
  assert(newBackfills.isEmpty)
  assert(completedBackfills.equals(Set(backfill)))
}

object Jobs {
private val start: Instant = LocalDate.now.minusDays(1).atStartOfDay.toInstant(UTC)

val rootJob = Job("root-job", daily(UTC, start)) { implicit e =>
Future(Completed)
}

private var failedOnce = false
// A job which fails the first time it runs
val childJob = Job("child-job", daily(UTC, start)) { implicit e =>
if (!failedOnce) {
failedOnce = true
throw new Exception("Always fails at the first execution")
} else {
Future(Completed)
}
}
// Intentionally empty: documents a scenario that needs no test.
test("complete regular execution while backfill is still running") {
  // Edge-case not possible since it is not possible to run a backfill of a job on a period where that job did not complete
}
}
}

0 comments on commit 67fdc61

Please sign in to comment.