Skip to content

Commit

Permalink
Use context to find retrying executions (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrien Surée authored Sep 21, 2017
1 parent 93d2989 commit f3c1f10
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,20 @@ class Executor[S <: Scheduling] private[cuttle] (
private def retryingExecutions(filteredJobs: Set[String]): Seq[(Execution[S], FailingJob, ExecutionStatus)] = {
val runningIds = runningState.single
.filter({ case (e, _) => filteredJobs.contains(e.job.id)})
.map({ case (e, _) => e.job.id -> e}).toMap
.map({ case (e, _) => (e.job.id, e.context) -> e}).toMap

recentFailures.single
.flatMap({ case ((job,_),(_, failingJob)) => runningIds.get(job.id).map((_, failingJob, ExecutionRunning)) })
.flatMap({ case ((job,context),(_, failingJob)) => runningIds.get((job.id, context)).map((_, failingJob, ExecutionRunning)) })
.toSeq
}

private def retryingExecutionsSize(filteredJobs: Set[String]): Int = {
atomic {implicit txn =>
val runningIds = runningState
.filter({ case (e, _) => filteredJobs.contains(e.job.id)})
.map({ case (e, _) => e.job.id }).toSet
.map({ case (e, _) => (e.job.id, e.context) }).toSet

recentFailures.count({ case ((job,_), _) => runningIds.contains(job.id)})
recentFailures.count({ case ((job,context), _) => runningIds.contains((job.id, context))})
}
}

Expand Down Expand Up @@ -333,7 +333,7 @@ class Executor[S <: Scheduling] private[cuttle] (
// Count as failing all jobs that have failed and are not running (throttledState)
// and all jobs that have recently failed and are now running.
private[cuttle] def failingExecutionsSize(filteredJobs: Set[String]): Int =
throttledState.single.keys.filter(e => filteredJobs.contains(e.job.id)).size +
throttledState.single.keys.filter(e => filteredJobs.contains(e.job.id)).size +
retryingExecutionsSize(filteredJobs)
private[cuttle] def allFailingExecutions: Seq[Execution[S]] =
throttledState.single.toSeq.map(_._1)
Expand Down
2 changes: 1 addition & 1 deletion devloop.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
let sbt = startSbt({
sh: 'sbt -DdevMode',
sh: 'sbt -DdevMode=true',
watch: ['build.sbt']
});

Expand Down

0 comments on commit f3c1f10

Please sign in to comment.