From 9f8f5e1e96f38ae0c8e88ada532fe16a2922d8e4 Mon Sep 17 00:00:00 2001 From: Arnaud Dufranne Date: Fri, 20 Jul 2018 18:04:23 +0200 Subject: [PATCH] Adds default values mechanism on read for counters --- .../main/scala/com/criteo/cuttle/Executor.scala | 17 ++++++++++++----- .../main/scala/com/criteo/cuttle/Metrics.scala | 7 ++++++- .../scala/com/criteo/cuttle/ExecutorSpec.scala | 13 +++++++------ 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index 31a4eb5d8..d95952f83 100644 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -976,19 +976,21 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus) /** - * @param jobs list of job IDs + * @param jobs list of jobs * @param getStateAtomic Atomically get executor stats. Given a list of jobs ids, returns how much * ((running, waiting), paused, failing) jobs are in concrete states * @param runningExecutions executions which are either either running or waiting for a free thread to start * @param pausedExecutions */ - private[cuttle] def getMetrics(jobs: Set[String])( + private[cuttle] def getMetrics(jobs: Set[Job[S]])( getStateAtomic: Set[String] => ((Int, Int), Int, Int), runningExecutions: Seq[(Execution[S], ExecutionStatus)], pausedExecutions: Seq[Execution[S]], failingExecutions: Seq[Execution[S]] ): Seq[Metric] = { - val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobs) + val jobIds = jobs.map(_.id) + + val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobIds) val statMetrics = Seq( Gauge("cuttle_scheduler_stat_count", "The number of jobs that we have in concrete states") .labeled("type" -> "running", runningCount) @@ -1017,7 +1019,12 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla statMetrics ++ Seq(getMetricsByTag(running, waiting, paused, failing)) ++ Seq(getMetricsByJob(running, waiting, paused, failing)) ++ - Seq(executionsCounters.single()) + Seq(executionsCounters.single().withDefaultsFor({ + for { + job <- jobs.toSeq + outcome <- Seq("success", "failure") + } yield Set(("job_id", job.id), ("type", outcome)) ++ (if (job.tags.nonEmpty) Set("tags" -> job.tags.map(_.name).mkString(",")) else Nil) + })) } /** @@ -1025,7 +1032,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla */ override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] = atomic { implicit txn => - getMetrics(jobs)( + getMetrics(workflow.vertices)( getStateAtomic, runningExecutions, pausedState.values.flatMap(_.executions.keys).toSeq, diff --git a/core/src/main/scala/com/criteo/cuttle/Metrics.scala b/core/src/main/scala/com/criteo/cuttle/Metrics.scala index e32907904..2275d222f 100644 --- a/core/src/main/scala/com/criteo/cuttle/Metrics.scala +++ b/core/src/main/scala/com/criteo/cuttle/Metrics.scala @@ -63,6 +63,11 @@ object Metrics { val currentCount = labels2Value.getOrElse(label, number.zero).asInstanceOf[T] copy(labels2Value = labels2Value + (label -> number.plus(currentCount, number.one).asInstanceOf[AnyVal])) } + + def withDefaultsFor(labels: Seq[Set[(String, String)]]): Counter[T] = { + val n = labels.map(label => label -> number.zero.asInstanceOf[AnyVal]).toList.toMap + copy(labels2Value = n ++ labels2Value) + } } /** Components able to provide metrics. */ @@ -76,7 +81,7 @@ object Metrics { val labeledMetrics = metric.labels2Value.map { case (labels, value) => val labelsSerialized = - if (labels.nonEmpty) s" {${labels.map(label => s"""${label._1}="${label._2}"""").mkString(", ")}} " + if (labels.nonEmpty) s" {${labels.toSeq.sortBy(_._1).map(label => s"""${label._1}="${label._2}"""").mkString(", ")}} " else " " s"${metric.name}$labelsSerialized$value" } diff --git a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala index a6e3c1414..f98745813 100644 --- a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala +++ b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala @@ -41,7 +41,7 @@ class ExecutorSpec extends FunSuite with TestScheduling { testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooBarJob), "failure") val metrics = Prometheus.serialize( - testExecutor.getMetrics(Set("jobA"))( + testExecutor.getMetrics(Set(fooJob))( getStateAtomic = _ => { ((5, 1), 3, 2) }, @@ -98,11 +98,12 @@ class ExecutorSpec extends FunSuite with TestScheduling { |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="paused"} 1 |# HELP cuttle_executions_total The number of finished executions that we have in concrete states by job and by tag |# TYPE cuttle_executions_total counter - |cuttle_executions_total {type="success", job_id="foo_bar_job", tags="foo,bar"} 1 - |cuttle_executions_total {type="success", job_id="foo_job", tags="foo"} 2 - |cuttle_executions_total {type="failure", job_id="foo_bar_job", tags="foo,bar"} 1 - |cuttle_executions_total {type="success", job_id="untagged_job"} 1 - |cuttle_executions_total {type="failure", job_id="untagged_job"} 1 + |cuttle_executions_total {job_id="foo_job", tags="foo", type="failure"} 0 + |cuttle_executions_total {job_id="foo_bar_job", tags="foo,bar", type="success"} 1 + |cuttle_executions_total {job_id="foo_job", tags="foo", type="success"} 2 + |cuttle_executions_total {job_id="foo_bar_job", tags="foo,bar", type="failure"} 1 + |cuttle_executions_total {job_id="untagged_job", type="success"} 1 + |cuttle_executions_total {job_id="untagged_job", type="failure"} 1 |""".stripMargin assert(metrics == expectedMetrics)