Skip to content

Commit

Permalink
Adds default values mechanism on read for counters
Browse files Browse the repository at this point in the history
  • Loading branch information
dufrannea committed Jul 20, 2018
1 parent 5b6de14 commit 5d1609c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
17 changes: 12 additions & 5 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -976,19 +976,21 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus)

/**
* @param jobs list of job IDs
* @param jobs list of jobs
* @param getStateAtomic Atomically get executor stats. Given a list of jobs ids, returns how much
* ((running, waiting), paused, failing) jobs are in concrete states
* @param runningExecutions executions which are either either running or waiting for a free thread to start
* @param pausedExecutions
*/
private[cuttle] def getMetrics(jobs: Set[String])(
private[cuttle] def getMetrics(jobs: Set[Job[S]])(
getStateAtomic: Set[String] => ((Int, Int), Int, Int),
runningExecutions: Seq[(Execution[S], ExecutionStatus)],
pausedExecutions: Seq[Execution[S]],
failingExecutions: Seq[Execution[S]]
): Seq[Metric] = {
val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobs)
val jobIds = jobs.map(_.id)

val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobIds)
val statMetrics = Seq(
Gauge("cuttle_scheduler_stat_count", "The number of jobs that we have in concrete states")
.labeled("type" -> "running", runningCount)
Expand Down Expand Up @@ -1017,15 +1019,20 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
statMetrics ++
Seq(getMetricsByTag(running, waiting, paused, failing)) ++
Seq(getMetricsByJob(running, waiting, paused, failing)) ++
Seq(executionsCounters.single())
Seq(executionsCounters.single().withDefaultsFor({
for {
job <- jobs.toSeq
outcome <- Seq("success", "failure")
} yield Set(("job_id", job.id), ("type", outcome)) ++ (if (job.tags.nonEmpty) Set("tags" -> job.tags.map(_.name).mkString(",")) else Nil)
}))
}

/**
* @param jobs the list of jobs ids
*/
override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] =
atomic { implicit txn =>
getMetrics(jobs)(
getMetrics(workflow.vertices)(
getStateAtomic,
runningExecutions,
pausedState.values.flatMap(_.executions.keys).toSeq,
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/com/criteo/cuttle/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ object Metrics {
val currentCount = labels2Value.getOrElse(label, number.zero).asInstanceOf[T]
copy(labels2Value = labels2Value + (label -> number.plus(currentCount, number.one).asInstanceOf[AnyVal]))
}

def withDefaultsFor(labels: Seq[Set[(String, String)]]): Counter[T] = {
val n = labels.map(label => label -> number.zero.asInstanceOf[AnyVal]).toList.toMap
copy(labels2Value = labels2Value ++ n)
}
}

/** Components able to provide metrics. */
Expand Down

0 comments on commit 5d1609c

Please sign in to comment.