Skip to content

Commit

Permalink
Add the endpoint /metrics for Prometheus. (#175)
Browse files Browse the repository at this point in the history
Executor and Timeseries metrics in Prometheus format.
Small refactorings in Executor.scala, Timeseries.scala, App.scala.
  • Loading branch information
eryshev authored Oct 2, 2017
1 parent b4548d0 commit f5d6d8e
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 40 deletions.
44 changes: 25 additions & 19 deletions core/src/main/scala/com/criteo/cuttle/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import authentication._
import logging._
import ExecutionStatus._
import Metrics.{Prometheus, Gauge}
import utils.getJVMUptime

private[cuttle] object App {
private implicit val S = fs2.Strategy.fromExecutionContext(global)
Expand Down Expand Up @@ -136,10 +137,13 @@ private[cuttle] object App {
}
}

private[cuttle] case class App[S <: Scheduling](project: CuttleProject[S], executor: Executor[S], xa: XA, logger: Logger) {
private[cuttle] case class App[S <: Scheduling](project: CuttleProject[S], executor: Executor[S], xa: XA,
logger: Logger) {
import App._
import project.{scheduler, workflow}

private def allJobs = workflow.vertices.map(_.id)

val publicApi: PartialService = {

case GET at url"/api/status" => {
Expand All @@ -157,40 +161,42 @@ private[cuttle] case class App[S <: Scheduling](project: CuttleProject[S], execu
case GET at url"/api/statistics?events=$events&jobs=$jobs" =>
val filteredJobs = Try(jobs.split(",").toSeq.filter(_.nonEmpty)).toOption
.filter(_.nonEmpty)
.getOrElse(workflow.vertices.map(_.id))
.getOrElse(allJobs)
.toSet
def getStats() =
Some(
(executor.runningExecutionsSizes(filteredJobs),
executor.pausedExecutionsSize(filteredJobs),
executor.failingExecutionsSize(filteredJobs)))
def asJson(x: ((Int, Int), Int, Int)) = x match {
case ((running, waiting), paused, failing) =>
Json.obj(
"running" -> running.asJson,
"waiting" -> waiting.asJson,
"paused" -> paused.asJson,
"failing" -> failing.asJson,
"scheduler" -> scheduler.getStats(filteredJobs)
)

def getStats() = Try(
executor.getStats(filteredJobs) -> scheduler.getStats(filteredJobs)
).toOption

def asJson(x: (Json, Json)) = x match {
case (executorStats, schedulerStats) =>
executorStats.deepMerge(Json.obj(
"scheduler" -> schedulerStats
))
}

events match {
case "true" | "yes" =>
sse(getStats _, asJson)
case _ =>
Ok(asJson(getStats().get))
getStats().map(stat => Ok(asJson(stat))).getOrElse(InternalServerError)
}

case GET at url"/api/statistics/$jobName" =>
Ok(executor.jobStatsForLastThirtyDays(jobName).asJson)

case GET at "/metrics" =>
val metrics = executor.getMetrics(allJobs) ++ scheduler.getMetrics(allJobs) :+
Gauge("jvm_uptime_seconds", getJVMUptime)
Ok(Prometheus.format(metrics))

case GET at url"/api/executions/status/$kind?limit=$l&offset=$o&events=$events&sort=$sort&order=$a&jobs=$jobs" =>
val limit = Try(l.toInt).toOption.getOrElse(25)
val offset = Try(o.toInt).toOption.getOrElse(0)
val asc = (a.toLowerCase == "asc")
val filteredJobs = Try(jobs.split(",").toSeq.filter(_.nonEmpty)).toOption
.filter(_.nonEmpty)
.getOrElse(workflow.vertices.map(_.id))
.getOrElse(allJobs)
.toSet
def getExecutions() = kind match {
case "started" =>
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/scala/com/criteo/cuttle/Cuttle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.criteo.cuttle

import lol.http._
import scala.concurrent.ExecutionContext.Implicits.global
import com.criteo.cuttle.logging.Logger

class CuttleProject[S <: Scheduling] private[cuttle] (
val name: String,
Expand All @@ -19,7 +18,7 @@ class CuttleProject[S <: Scheduling] private[cuttle] (
httpPort: Int = 8888,
databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv,
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy
) = {
): Unit = {
val xa = Database.connect(databaseConfig)
val executor = new Executor[S](platforms, xa, logger = logger)(retryStrategy)

Expand All @@ -37,8 +36,8 @@ object CuttleProject {
version: String = "",
description: String = "",
env: (String, Boolean) = ("", false),
authenticator: Authenticator = GuestAuth)
(workflow: Workflow[S])(implicit scheduler: Scheduler[S], logger: Logger): CuttleProject[S] =
authenticator: Authenticator = GuestAuth)(
workflow: Workflow[S])(implicit scheduler: Scheduler[S], logger: Logger): CuttleProject[S] =
new CuttleProject(name, version, description, env, workflow, scheduler, authenticator, logger)

private[CuttleProject] def defaultPlatforms: Seq[ExecutionPlatform] = {
Expand Down
50 changes: 40 additions & 10 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.stm._
import scala.concurrent.stm.Txn.ExternalDecider
import scala.concurrent.duration._
import scala.reflect.{classTag, ClassTag}
import scala.reflect.{ClassTag, classTag}
import lol.http.PartialService
import doobie.imports._
import cats.implicits._
import io.circe._

import io.circe.syntax._
import authentication._
import logging._

import Metrics._

trait RetryStrategy {
def apply[S <: Scheduling](job: Job[S], context: S#Context, previouslyFailing: List[String]): Duration
Expand Down Expand Up @@ -207,7 +208,7 @@ private[cuttle] object ExecutionPlatform {
class Executor[S <: Scheduling] private[cuttle] (
val platforms: Seq[ExecutionPlatform],
xa: XA,
logger: Logger)(implicit retryStrategy: RetryStrategy) {
logger: Logger)(implicit retryStrategy: RetryStrategy) extends MetricProvider {

val queries = new Queries {
val appLogger = logger
Expand Down Expand Up @@ -274,9 +275,7 @@ class Executor[S <: Scheduling] private[cuttle] (
flagWaitingExecutions(runningState.single.keys.toSeq)

private[cuttle] def runningExecutionsSizeTotal(filteredJobs: Set[String]): Int =
runningState.single.keys
.filter(e => filteredJobs.contains(e.job.id))
.size
runningState.single.keys.count(e => filteredJobs.contains(e.job.id))

private[cuttle] def runningExecutionsSizes(filteredJobs: Set[String]): (Int, Int) = {
val statuses =
Expand Down Expand Up @@ -378,6 +377,39 @@ class Executor[S <: Scheduling] private[cuttle] (
limit: Int): Seq[ExecutionLog] =
queries.getExecutionLog(queryContexts, jobs, sort, asc, offset, limit).transact(xa).unsafePerformIO

/**
* Atomically get executor stats.
* @param jobs the list of jobs ids
* @return how much ((running, waiting), paused, failing) jobs are in concrete states
* */
private def getStateAtomic(jobs: Set[String]) = atomic { implicit txn =>
(runningExecutionsSizes(jobs), pausedExecutionsSize(jobs), failingExecutionsSize(jobs))
}

private[cuttle] def getStats(jobs: Set[String]): Json = {
val ((running, waiting), paused, failing) = getStateAtomic(jobs)
// DB state call
val finished = archivedExecutionsSize(jobs)
Map(
"running" -> running,
"waiting" -> waiting,
"paused" -> paused,
"failing" -> failing,
"finished" -> finished
).asJson
}

override def getMetrics(jobs: Set[String]): Seq[Metric] = {
val ((running, waiting), paused, failing) = getStateAtomic(jobs)

Seq(
Gauge("scheduler_stat_count", running, Seq("type" -> "running")),
Gauge("scheduler_stat_count", waiting, Seq("type" -> "waiting")),
Gauge("scheduler_stat_count", paused, Seq("type" -> "paused")),
Gauge("scheduler_stat_count", failing, Seq("type" -> "failing"))
)
}

private[cuttle] def pausedJobs: Seq[String] =
pausedState.single.keys.toSeq

Expand All @@ -391,9 +423,7 @@ class Executor[S <: Scheduling] private[cuttle] (
private[cuttle] def getExecution(queryContexts: Fragment, executionId: String): Option[ExecutionLog] =
atomic { implicit tx =>
val predicate = (e: Execution[S]) => e.id == executionId
pausedState.values
.map(_.keys)
.flatten
pausedState.values.flatMap(_.keys)
.find(predicate)
.map(_.toExecutionLog(ExecutionPaused))
.orElse(throttledState.keys
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.criteo.cuttle.logging
package com.criteo.cuttle

trait Logger {
def debug(message: => String): Unit
Expand Down
27 changes: 27 additions & 0 deletions core/src/main/scala/com/criteo/cuttle/Metrics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.criteo.cuttle

object Metrics {
sealed trait Metric {
def toString: String
}

case class Gauge(name: String, value: Long, tags: Seq[(String, String)] = Seq.empty) extends Metric

trait MetricProvider {
def getMetrics(jobs: Set[String]): Seq[Metric]
}

private[cuttle] object Prometheus {
def format(metrics: Seq[Metric]): String = {
val prometheusMetrics = metrics.map {
case Gauge(name, value, tags) =>
s"$name${
if (tags.nonEmpty) s" {${tags.map(tag => s"""${tag._1}="${tag._2}"""").mkString(", ")}}"
else ""
} $value"
}

s"${prometheusMetrics.mkString("\n")}\n"
}
}
}
5 changes: 3 additions & 2 deletions core/src/main/scala/com/criteo/cuttle/Scheduling.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import io.circe.Json
import doobie.imports._
import java.util.Comparator

import logging._
import authentication._
import Metrics.MetricProvider

trait Scheduler[S <: Scheduling] {
trait Scheduler[S <: Scheduling] extends MetricProvider {
def start(workflow: Workflow[S], executor: Executor[S], xa: XA, logger: Logger): Unit
private[cuttle] def publicRoutes(workflow: Workflow[S], executor: Executor[S], xa: XA): PartialService =
PartialFunction.empty
private[cuttle] def privateRoutes(workflow: Workflow[S], executor: Executor[S], xa: XA): AuthenticatedService =
PartialFunction.empty
val allContexts: Fragment

def getStats(jobs: Set[String]): Json
}

Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/com/criteo/cuttle/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.criteo.cuttle

import java.util.UUID
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.lang.management.ManagementFactory

import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -55,4 +56,6 @@ package object utils {
case e => service(e)
}
}

private[cuttle] def getJVMUptime = ManagementFactory.getRuntimeMXBean.getUptime / 1000
}
1 change: 0 additions & 1 deletion core/src/main/scala/com/criteo/cuttle/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.criteo
import scala.concurrent._
import doobie.imports._
import cats.free._
import cuttle.logging.Logger

package cuttle {
sealed trait Completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.criteo.cuttle.timeseries

import Internal._
import com.criteo.cuttle._
import com.criteo.cuttle.logging._

import scala.concurrent._
import scala.concurrent.duration.{Duration => ScalaDuration}
Expand All @@ -22,6 +21,7 @@ import java.time.temporal.{ChronoUnit, TemporalAdjusters}
import com.criteo.cuttle.timeseries.TimeSeriesGrid.{Daily, Hourly, Monthly}
import intervals.{Bound, Interval, IntervalMap}
import Bound.{Bottom, Finite, Top}
import Metrics._

sealed trait TimeSeriesGrid {
def next(t: Instant): Instant
Expand Down Expand Up @@ -434,15 +434,56 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] wit
}
}

override def getStats(jobs: Set[String]) = {
private def getRunningBackfillsSize(jobs: Set[String]) = {
val runningBackfills = state match {
case (_, backfills) =>
backfills.filter(
bf =>
bf.status == "RUNNING" &&
bf.jobs.map(_.id).intersect(jobs).nonEmpty)
}
Map("backfills" -> runningBackfills.size).asJson

runningBackfills.size
}

/**
* We compute the last instant when job was in a valid state
* @param jobs set of jobs to process
* @return Iterable of job to to last instant when job was in a valid state.
* Iterable is empty when job doesn't contain any "DONE" interval.
*/
private def getTimeOfLastSuccess(jobs: Set[String]) = {
_state.single().collect {
case (job, intervals) if jobs.contains(job.id) =>
val intervalList = intervals.toList
val lastValidInterval = intervalList.takeWhile {
case (_, Running(_)) => false
case (_, Todo(None)) => false
case _ => true
}.lastOption

lastValidInterval.map {
case (interval, _) => job -> (interval.hi match {
case Finite(instant) => instant
case _ => Instant.MAX
})
}
}.flatten
}

override def getMetrics(jobs: Set[String]): Seq[Metric] = {
val timeOfLastSuccessMetrics = getTimeOfLastSuccess(jobs).map {
case (job, instant) =>
Gauge("scheduler_last_success_epoch_seconds", Instant.now().getEpochSecond - instant.getEpochSecond,
Seq("job_id" -> job.id, "job_name" -> job.name))
}

Seq(Gauge("scheduler_stat_count", getRunningBackfillsSize(jobs), Seq("type" -> "backfills"))) ++
timeOfLastSuccessMetrics
}

override def getStats(jobs: Set[String]): Json = {
Map("backfills" -> getRunningBackfillsSize(jobs)).asJson
}
}

Expand Down

0 comments on commit f5d6d8e

Please sign in to comment.