From d324be6243a365fa765439bae9dc8139960af257 Mon Sep 17 00:00:00 2001 From: eryshev Date: Thu, 21 Feb 2019 16:27:51 +0100 Subject: [PATCH] add previous failures to execution object (#368) Provides an execution with its previous failures in a form of execution logs --- build.sbt | 2 +- .../scala/com/criteo/cuttle/Executor.scala | 18 ++++++++++++------ .../scala/com/criteo/cuttle/ExecutorSpec.scala | 3 ++- .../cuttle/examples/HelloTimeSeries.scala | 1 + 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/build.sbt b/build.sbt index f4ef3fde7..caa7cfd0a 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ val devMode = settingKey[Boolean]("Some build optimization are applied in devMode.") val writeClasspath = taskKey[File]("Write the project classpath to a file.") -val VERSION = "0.7.0" +val VERSION = "0.8.0" lazy val catsCore = "1.5.0" lazy val circe = "0.10.1" diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index ebe99427b..00fc60e8a 100755 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -98,7 +98,7 @@ private[cuttle] case class FailingJob(failedExecutions: List[ExecutionLog], next failedExecutions.headOption.flatMap(_.endTime).exists(_.isAfter(date)) } -private[cuttle] case class ExecutionLog( +case class ExecutionLog( id: String, job: String, startTime: Option[Instant], @@ -206,7 +206,8 @@ case class Execution[S <: Scheduling]( streams: ExecutionStreams, platforms: Seq[ExecutionPlatform], projectName: String, - projectVersion: String + projectVersion: String, + previousFailures: List[ExecutionLog] )(implicit val executionContext: SideEffectThreadPool) { private var waitingSeconds = 0 @@ -805,6 +806,11 @@ class Executor[S <: Scheduling] private[cuttle] ( } })(Implicits.sideEffectThreadPool) + val previousFailures = recentFailures + .get(job -> context).map(rf => + rf._2.failedExecutions + ).getOrElse(List.empty) + val execution = Execution( id = nextExecutionId, job, @@ -812,16 +818,16 @@ class Executor[S <: Scheduling] private[cuttle] ( streams = streams, platforms, projectName, - projectVersion + projectVersion, + previousFailures )(sideEffectExecutionContext) val promise = Promise[Completed] if (recentFailures.contains(job -> context)) { val (_, failingJob) = recentFailures(job -> context) recentFailures += ((job -> context) -> (Some(execution) -> failingJob)) - val throttleFor = - retryStrategy(job, context, recentFailures(job -> context)._2.failedExecutions.map(_.id)) - val launchDate = failingJob.failedExecutions.head.endTime.get.plus(throttleFor) + val retryInterval = retryStrategy(job, context, previousFailures.map(_.id)) + val launchDate = failingJob.failedExecutions.head.endTime.get.plus(retryInterval) throttledState += (execution -> ((promise, failingJob.copy(nextRetry = Some(launchDate))))) (job, execution, promise, Throttled(launchDate)) } else { diff --git a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala index abc7b7ca9..05c240e80 100644 --- a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala +++ b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala @@ -118,7 +118,8 @@ class ExecutorSpec extends FunSuite with TestScheduling { }, platforms = Seq.empty, "project_name", - "test_version" + "test_version", + List.empty ) private val fooTag = Tag("foo") diff --git a/examples/src/main/scala/com/criteo/cuttle/examples/HelloTimeSeries.scala b/examples/src/main/scala/com/criteo/cuttle/examples/HelloTimeSeries.scala index 122ff854a..4a6ace040 100644 --- a/examples/src/main/scala/com/criteo/cuttle/examples/HelloTimeSeries.scala +++ b/examples/src/main/scala/com/criteo/cuttle/examples/HelloTimeSeries.scala @@ -77,6 +77,7 @@ object HelloTimeSeries { tags = Set(Tag("hello"), Tag("unsafe"))) { implicit e => // Here we mix a Scala code execution and a sh script execution. e.streams.info("Hello 3 from an unsafe job") + e.streams.info(s"My previous failures are ${e.previousFailures}") val completed = exec"sleep 3" () completed.map { _ =>