Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds default values mechanism on read for counters #293

Merged
merged 1 commit into from
Jul 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -976,19 +976,21 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus)

/**
* @param jobs list of job IDs
* @param jobs list of jobs
* @param getStateAtomic Atomically gets executor stats. Given a set of job ids, returns how many
* jobs are in each state, as ((running, waiting), paused, failing)
* @param runningExecutions executions which are either running or waiting for a free thread to start
* @param pausedExecutions
*/
private[cuttle] def getMetrics(jobs: Set[String])(
private[cuttle] def getMetrics(jobs: Set[Job[S]])(
getStateAtomic: Set[String] => ((Int, Int), Int, Int),
runningExecutions: Seq[(Execution[S], ExecutionStatus)],
pausedExecutions: Seq[Execution[S]],
failingExecutions: Seq[Execution[S]]
): Seq[Metric] = {
val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobs)
val jobIds = jobs.map(_.id)

val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobIds)
val statMetrics = Seq(
Gauge("cuttle_scheduler_stat_count", "The number of jobs that we have in concrete states")
.labeled("type" -> "running", runningCount)
Expand Down Expand Up @@ -1017,15 +1019,20 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
statMetrics ++
Seq(getMetricsByTag(running, waiting, paused, failing)) ++
Seq(getMetricsByJob(running, waiting, paused, failing)) ++
Seq(executionsCounters.single())
Seq(executionsCounters.single().withDefaultsFor({
for {
job <- jobs.toSeq
outcome <- Seq("success", "failure")
} yield Set(("job_id", job.id), ("type", outcome)) ++ (if (job.tags.nonEmpty) Set("tags" -> job.tags.map(_.name).mkString(",")) else Nil)
}))
}

/**
* @param jobs the set of job ids
*/
override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] =
atomic { implicit txn =>
getMetrics(jobs)(
getMetrics(workflow.vertices)(
getStateAtomic,
runningExecutions,
pausedState.values.flatMap(_.executions.keys).toSeq,
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/com/criteo/cuttle/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ object Metrics {
val currentCount = labels2Value.getOrElse(label, number.zero).asInstanceOf[T]
copy(labels2Value = labels2Value + (label -> number.plus(currentCount, number.one).asInstanceOf[AnyVal]))
}

/**
  * Returns a copy of this counter in which every given label set is present.
  *
  * Label sets that have no recorded value yet are initialised to zero; label sets
  * that already have a value keep it, because the existing entries are merged on
  * top of the zero-valued defaults.
  *
  * @param labels label sets that must appear in the resulting counter
  * @return a counter containing the defaults overlaid with the current values
  */
def withDefaultsFor(labels: Seq[Set[(String, String)]]): Counter[T] = {
  val zero = number.zero.asInstanceOf[AnyVal]
  val zeroedDefaults = labels.map(_ -> zero).toMap
  // Right-hand operand of ++ wins on key clashes, so recorded values override the zeros.
  copy(labels2Value = zeroedDefaults ++ labels2Value)
}
}

/** Components able to provide metrics. */
Expand All @@ -76,7 +81,7 @@ object Metrics {
val labeledMetrics = metric.labels2Value.map {
case (labels, value) =>
val labelsSerialized =
if (labels.nonEmpty) s" {${labels.map(label => s"""${label._1}="${label._2}"""").mkString(", ")}} "
if (labels.nonEmpty) s" {${labels.toSeq.sortBy(_._1).map(label => s"""${label._1}="${label._2}"""").mkString(", ")}} "
else " "
s"${metric.name}$labelsSerialized$value"
}
Expand Down
13 changes: 7 additions & 6 deletions core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ExecutorSpec extends FunSuite with TestScheduling {
testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooBarJob), "failure")

val metrics = Prometheus.serialize(
testExecutor.getMetrics(Set("jobA"))(
testExecutor.getMetrics(Set(fooJob))(
getStateAtomic = _ => {
((5, 1), 3, 2)
},
Expand Down Expand Up @@ -98,11 +98,12 @@ class ExecutorSpec extends FunSuite with TestScheduling {
|cuttle_scheduler_stat_count_by_job {job="untagged_job", type="paused"} 1
|# HELP cuttle_executions_total The number of finished executions that we have in concrete states by job and by tag
|# TYPE cuttle_executions_total counter
|cuttle_executions_total {type="success", job_id="foo_bar_job", tags="foo,bar"} 1
|cuttle_executions_total {type="success", job_id="foo_job", tags="foo"} 2
|cuttle_executions_total {type="failure", job_id="foo_bar_job", tags="foo,bar"} 1
|cuttle_executions_total {type="success", job_id="untagged_job"} 1
|cuttle_executions_total {type="failure", job_id="untagged_job"} 1
|cuttle_executions_total {job_id="foo_job", tags="foo", type="failure"} 0
|cuttle_executions_total {job_id="foo_bar_job", tags="foo,bar", type="success"} 1
|cuttle_executions_total {job_id="foo_job", tags="foo", type="success"} 2
|cuttle_executions_total {job_id="foo_bar_job", tags="foo,bar", type="failure"} 1
|cuttle_executions_total {job_id="untagged_job", type="success"} 1
|cuttle_executions_total {job_id="untagged_job", type="failure"} 1
|""".stripMargin

assert(metrics == expectedMetrics)
Expand Down