Skip to content

Commit

Permalink
Adds default values mechanism on read for counters (#293)
Browse files Browse the repository at this point in the history
  • Loading branch information
dufrannea authored Jul 23, 2018
1 parent 5b6de14 commit f072e81
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
17 changes: 12 additions & 5 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -976,19 +976,21 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus)

/**
* @param jobs list of job IDs
* @param jobs list of jobs
* @param getStateAtomic Atomically get executor stats. Given a list of jobs ids, returns how much
* ((running, waiting), paused, failing) jobs are in concrete states
* @param runningExecutions executions which are either either running or waiting for a free thread to start
* @param pausedExecutions
*/
private[cuttle] def getMetrics(jobs: Set[String])(
private[cuttle] def getMetrics(jobs: Set[Job[S]])(
getStateAtomic: Set[String] => ((Int, Int), Int, Int),
runningExecutions: Seq[(Execution[S], ExecutionStatus)],
pausedExecutions: Seq[Execution[S]],
failingExecutions: Seq[Execution[S]]
): Seq[Metric] = {
val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobs)
val jobIds = jobs.map(_.id)

val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobIds)
val statMetrics = Seq(
Gauge("cuttle_scheduler_stat_count", "The number of jobs that we have in concrete states")
.labeled("type" -> "running", runningCount)
Expand Down Expand Up @@ -1017,15 +1019,20 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
statMetrics ++
Seq(getMetricsByTag(running, waiting, paused, failing)) ++
Seq(getMetricsByJob(running, waiting, paused, failing)) ++
Seq(executionsCounters.single())
Seq(executionsCounters.single().withDefaultsFor({
for {
job <- jobs.toSeq
outcome <- Seq("success", "failure")
} yield Set(("job_id", job.id), ("type", outcome)) ++ (if (job.tags.nonEmpty) Set("tags" -> job.tags.map(_.name).mkString(",")) else Nil)
}))
}

/**
* @param jobs the list of jobs ids
*/
override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] =
atomic { implicit txn =>
getMetrics(jobs)(
getMetrics(workflow.vertices)(
getStateAtomic,
runningExecutions,
pausedState.values.flatMap(_.executions.keys).toSeq,
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/com/criteo/cuttle/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ object Metrics {
val currentCount = labels2Value.getOrElse(label, number.zero).asInstanceOf[T]
copy(labels2Value = labels2Value + (label -> number.plus(currentCount, number.one).asInstanceOf[AnyVal]))
}

def withDefaultsFor(labels: Seq[Set[(String, String)]]): Counter[T] = {
val n = labels.map(label => label -> number.zero.asInstanceOf[AnyVal]).toList.toMap
copy(labels2Value = n ++ labels2Value)
}
}

/** Components able to provide metrics. */
Expand All @@ -76,7 +81,7 @@ object Metrics {
val labeledMetrics = metric.labels2Value.map {
case (labels, value) =>
val labelsSerialized =
if (labels.nonEmpty) s" {${labels.map(label => s"""${label._1}="${label._2}"""").mkString(", ")}} "
if (labels.nonEmpty) s" {${labels.toSeq.sortBy(_._1).map(label => s"""${label._1}="${label._2}"""").mkString(", ")}} "
else " "
s"${metric.name}$labelsSerialized$value"
}
Expand Down
13 changes: 7 additions & 6 deletions core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ExecutorSpec extends FunSuite with TestScheduling {
testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooBarJob), "failure")

val metrics = Prometheus.serialize(
testExecutor.getMetrics(Set("jobA"))(
testExecutor.getMetrics(Set(fooJob))(
getStateAtomic = _ => {
((5, 1), 3, 2)
},
Expand Down Expand Up @@ -98,11 +98,12 @@ class ExecutorSpec extends FunSuite with TestScheduling {
|cuttle_scheduler_stat_count_by_job {job="untagged_job", type="paused"} 1
|# HELP cuttle_executions_total The number of finished executions that we have in concrete states by job and by tag
|# TYPE cuttle_executions_total counter
|cuttle_executions_total {type="success", job_id="foo_bar_job", tags="foo,bar"} 1
|cuttle_executions_total {type="success", job_id="foo_job", tags="foo"} 2
|cuttle_executions_total {type="failure", job_id="foo_bar_job", tags="foo,bar"} 1
|cuttle_executions_total {type="success", job_id="untagged_job"} 1
|cuttle_executions_total {type="failure", job_id="untagged_job"} 1
|cuttle_executions_total {job_id="foo_job", tags="foo", type="failure"} 0
|cuttle_executions_total {job_id="foo_bar_job", tags="foo,bar", type="success"} 1
|cuttle_executions_total {job_id="foo_job", tags="foo", type="success"} 2
|cuttle_executions_total {job_id="foo_bar_job", tags="foo,bar", type="failure"} 1
|cuttle_executions_total {job_id="untagged_job", type="success"} 1
|cuttle_executions_total {job_id="untagged_job", type="failure"} 1
|""".stripMargin

assert(metrics == expectedMetrics)
Expand Down

0 comments on commit f072e81

Please sign in to comment.