Skip to content

Commit

Permalink
add previous failures to execution object (#368)
Browse files Browse the repository at this point in the history
Provides an execution with its previous failures in a form of execution logs
  • Loading branch information
eryshev authored Feb 21, 2019
1 parent a3e7f7e commit d324be6
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
val devMode = settingKey[Boolean]("Some build optimization are applied in devMode.")
val writeClasspath = taskKey[File]("Write the project classpath to a file.")

val VERSION = "0.7.0"
val VERSION = "0.8.0"

lazy val catsCore = "1.5.0"
lazy val circe = "0.10.1"
Expand Down
18 changes: 12 additions & 6 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private[cuttle] case class FailingJob(failedExecutions: List[ExecutionLog], next
failedExecutions.headOption.flatMap(_.endTime).exists(_.isAfter(date))
}

private[cuttle] case class ExecutionLog(
case class ExecutionLog(
id: String,
job: String,
startTime: Option[Instant],
Expand Down Expand Up @@ -206,7 +206,8 @@ case class Execution[S <: Scheduling](
streams: ExecutionStreams,
platforms: Seq[ExecutionPlatform],
projectName: String,
projectVersion: String
projectVersion: String,
previousFailures: List[ExecutionLog]
)(implicit val executionContext: SideEffectThreadPool) {

private var waitingSeconds = 0
Expand Down Expand Up @@ -805,23 +806,28 @@ class Executor[S <: Scheduling] private[cuttle] (
}
})(Implicits.sideEffectThreadPool)

val previousFailures = recentFailures
.get(job -> context).map(rf =>
rf._2.failedExecutions
).getOrElse(List.empty)

val execution = Execution(
id = nextExecutionId,
job,
context,
streams = streams,
platforms,
projectName,
projectVersion
projectVersion,
previousFailures
)(sideEffectExecutionContext)
val promise = Promise[Completed]

if (recentFailures.contains(job -> context)) {
val (_, failingJob) = recentFailures(job -> context)
recentFailures += ((job -> context) -> (Some(execution) -> failingJob))
val throttleFor =
retryStrategy(job, context, recentFailures(job -> context)._2.failedExecutions.map(_.id))
val launchDate = failingJob.failedExecutions.head.endTime.get.plus(throttleFor)
val retryInterval = retryStrategy(job, context, previousFailures.map(_.id))
val launchDate = failingJob.failedExecutions.head.endTime.get.plus(retryInterval)
throttledState += (execution -> ((promise, failingJob.copy(nextRetry = Some(launchDate)))))
(job, execution, promise, Throttled(launchDate))
} else {
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class ExecutorSpec extends FunSuite with TestScheduling {
},
platforms = Seq.empty,
"project_name",
"test_version"
"test_version",
List.empty
)

private val fooTag = Tag("foo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ object HelloTimeSeries {
tags = Set(Tag("hello"), Tag("unsafe"))) { implicit e =>
// Here we mix a Scala code execution and a sh script execution.
e.streams.info("Hello 3 from an unsafe job")
e.streams.info(s"My previous failures are ${e.previousFailures}")
val completed = exec"sleep 3" ()

completed.map { _ =>
Expand Down

0 comments on commit d324be6

Please sign in to comment.