Add the endpoint /api/metrics for Prometheus. #175

Merged (14 commits, Oct 2, 2017)
Changes from all commits
44 changes: 25 additions & 19 deletions core/src/main/scala/com/criteo/cuttle/App.scala
@@ -13,8 +13,9 @@ import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import authentication._
import logging._
import ExecutionStatus._
import Metrics.{Prometheus, Gauge}
import utils.getJVMUptime

private[cuttle] object App {
private implicit val S = fs2.Strategy.fromExecutionContext(global)
@@ -136,10 +137,13 @@ private[cuttle] object App {
}
}

private[cuttle] case class App[S <: Scheduling](project: CuttleProject[S], executor: Executor[S], xa: XA, logger: Logger) {
private[cuttle] case class App[S <: Scheduling](project: CuttleProject[S], executor: Executor[S], xa: XA,
logger: Logger) {
import App._
import project.{scheduler, workflow}

private def allJobs = workflow.vertices.map(_.id)

val publicApi: PartialService = {

case GET at url"/api/status" => {
@@ -157,40 +161,42 @@ private[cuttle] case class App[S <: Scheduling](project: CuttleProject[S], execu
case GET at url"/api/statistics?events=$events&jobs=$jobs" =>
val filteredJobs = Try(jobs.split(",").toSeq.filter(_.nonEmpty)).toOption
.filter(_.nonEmpty)
.getOrElse(workflow.vertices.map(_.id))
.getOrElse(allJobs)
.toSet
def getStats() =
Some(
(executor.runningExecutionsSizes(filteredJobs),
executor.pausedExecutionsSize(filteredJobs),
executor.failingExecutionsSize(filteredJobs)))
def asJson(x: ((Int, Int), Int, Int)) = x match {
case ((running, waiting), paused, failing) =>
Json.obj(
"running" -> running.asJson,
"waiting" -> waiting.asJson,
"paused" -> paused.asJson,
"failing" -> failing.asJson,
"scheduler" -> scheduler.getStats(filteredJobs)
)

def getStats() = Try(
executor.getStats(filteredJobs) -> scheduler.getStats(filteredJobs)
).toOption

def asJson(x: (Json, Json)) = x match {
case (executorStats, schedulerStats) =>
executorStats.deepMerge(Json.obj(
"scheduler" -> schedulerStats
))
}

events match {
case "true" | "yes" =>
sse(getStats _, asJson)
case _ =>
Ok(asJson(getStats().get))
getStats().map(stat => Ok(asJson(stat))).getOrElse(InternalServerError)
}

case GET at url"/api/statistics/$jobName" =>
Ok(executor.jobStatsForLastThirtyDays(jobName).asJson)

case GET at "/metrics" =>
val metrics = executor.getMetrics(allJobs) ++ scheduler.getMetrics(allJobs) :+
Gauge("jvm_uptime_seconds", getJVMUptime)
Ok(Prometheus.format(metrics))

case GET at url"/api/executions/status/$kind?limit=$l&offset=$o&events=$events&sort=$sort&order=$a&jobs=$jobs" =>
val limit = Try(l.toInt).toOption.getOrElse(25)
val offset = Try(o.toInt).toOption.getOrElse(0)
val asc = (a.toLowerCase == "asc")
val filteredJobs = Try(jobs.split(",").toSeq.filter(_.nonEmpty)).toOption
.filter(_.nonEmpty)
.getOrElse(workflow.vertices.map(_.id))
.getOrElse(allJobs)
.toSet
def getExecutions() = kind match {
case "started" =>
7 changes: 3 additions & 4 deletions core/src/main/scala/com/criteo/cuttle/Cuttle.scala
@@ -2,7 +2,6 @@ package com.criteo.cuttle

import lol.http._
import scala.concurrent.ExecutionContext.Implicits.global
import com.criteo.cuttle.logging.Logger

class CuttleProject[S <: Scheduling] private[cuttle] (
val name: String,
@@ -19,7 +18,7 @@ class CuttleProject[S <: Scheduling] (
httpPort: Int = 8888,
databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv,
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy
) = {
): Unit = {
val xa = Database.connect(databaseConfig)
val executor = new Executor[S](platforms, xa, logger = logger)(retryStrategy)

@@ -37,8 +36,8 @@ object CuttleProject {
version: String = "",
description: String = "",
env: (String, Boolean) = ("", false),
authenticator: Authenticator = GuestAuth)
(workflow: Workflow[S])(implicit scheduler: Scheduler[S], logger: Logger): CuttleProject[S] =
authenticator: Authenticator = GuestAuth)(
workflow: Workflow[S])(implicit scheduler: Scheduler[S], logger: Logger): CuttleProject[S] =
new CuttleProject(name, version, description, env, workflow, scheduler, authenticator, logger)

private[CuttleProject] def defaultPlatforms: Seq[ExecutionPlatform] = {
50 changes: 40 additions & 10 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
@@ -13,14 +13,15 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.stm._
import scala.concurrent.stm.Txn.ExternalDecider
import scala.concurrent.duration._
import scala.reflect.{classTag, ClassTag}
import scala.reflect.{ClassTag, classTag}
import lol.http.PartialService
import doobie.imports._
import cats.implicits._
import io.circe._

import io.circe.syntax._
import authentication._
import logging._

import Metrics._

trait RetryStrategy {
def apply[S <: Scheduling](job: Job[S], context: S#Context, previouslyFailing: List[String]): Duration
@@ -207,7 +208,7 @@ private[cuttle] object ExecutionPlatform {
class Executor[S <: Scheduling] private[cuttle] (
val platforms: Seq[ExecutionPlatform],
xa: XA,
logger: Logger)(implicit retryStrategy: RetryStrategy) {
logger: Logger)(implicit retryStrategy: RetryStrategy) extends MetricProvider {

val queries = new Queries {
val appLogger = logger
Expand Down Expand Up @@ -274,9 +275,7 @@ class Executor[S <: Scheduling] private[cuttle] (
flagWaitingExecutions(runningState.single.keys.toSeq)

private[cuttle] def runningExecutionsSizeTotal(filteredJobs: Set[String]): Int =
runningState.single.keys
.filter(e => filteredJobs.contains(e.job.id))
.size
runningState.single.keys.count(e => filteredJobs.contains(e.job.id))

private[cuttle] def runningExecutionsSizes(filteredJobs: Set[String]): (Int, Int) = {
val statuses =
@@ -378,6 +377,39 @@ class Executor[S <: Scheduling] private[cuttle] (
limit: Int): Seq[ExecutionLog] =
queries.getExecutionLog(queryContexts, jobs, sort, asc, offset, limit).transact(xa).unsafePerformIO

/**
* Atomically get executor stats.
* @param jobs the set of job ids to consider
* @return the number of ((running, waiting), paused, failing) executions for those jobs
*/
private def getStateAtomic(jobs: Set[String]) = atomic { implicit txn =>
(runningExecutionsSizes(jobs), pausedExecutionsSize(jobs), failingExecutionsSize(jobs))
}

private[cuttle] def getStats(jobs: Set[String]): Json = {
val ((running, waiting), paused, failing) = getStateAtomic(jobs)
// DB state call
val finished = archivedExecutionsSize(jobs)
Map(
"running" -> running,
"waiting" -> waiting,
"paused" -> paused,
"failing" -> failing,
"finished" -> finished
).asJson
}

override def getMetrics(jobs: Set[String]): Seq[Metric] = {
val ((running, waiting), paused, failing) = getStateAtomic(jobs)

Seq(
Gauge("scheduler_stat_count", running, Seq("type" -> "running")),
Gauge("scheduler_stat_count", waiting, Seq("type" -> "waiting")),
Gauge("scheduler_stat_count", paused, Seq("type" -> "paused")),
Gauge("scheduler_stat_count", failing, Seq("type" -> "failing"))
)
}

private[cuttle] def pausedJobs: Seq[String] =
pausedState.single.keys.toSeq

@@ -391,9 +423,7 @@ class Executor[S <: Scheduling] private[cuttle] (
private[cuttle] def getExecution(queryContexts: Fragment, executionId: String): Option[ExecutionLog] =
atomic { implicit tx =>
val predicate = (e: Execution[S]) => e.id == executionId
pausedState.values
.map(_.keys)
.flatten
pausedState.values.flatMap(_.keys)
.find(predicate)
.map(_.toExecutionLog(ExecutionPaused))
.orElse(throttledState.keys
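Stepping back from the diff for a moment: with Executor.getStats above and the deepMerge done in App.scala, the /api/statistics payload exposes the executor counters at the top level (now including "finished") and nests the scheduler stats under a "scheduler" key. A sketch of the resulting JSON, with purely illustrative numbers:

{
  "running": 2,
  "waiting": 0,
  "paused": 1,
  "failing": 0,
  "finished": 1234,
  "scheduler": {
    "backfills": 1
  }
}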
(next file in the diff: the Logger trait, moved from package com.criteo.cuttle.logging to com.criteo.cuttle; file header not captured)
@@ -1,4 +1,4 @@
package com.criteo.cuttle.logging
package com.criteo.cuttle

trait Logger {
def debug(message: => String): Unit
27 changes: 27 additions & 0 deletions core/src/main/scala/com/criteo/cuttle/Metrics.scala
@@ -0,0 +1,27 @@
package com.criteo.cuttle
Review comment (Contributor): Can we scope all this into a Metrics object, so in the public API it's all coherent?

object Metrics {
sealed trait Metric {
def toString: String
}

case class Gauge(name: String, value: Long, tags: Seq[(String, String)] = Seq.empty) extends Metric

trait MetricProvider {
def getMetrics(jobs: Set[String]): Seq[Metric]
}

private[cuttle] object Prometheus {
def format(metrics: Seq[Metric]): String = {
val prometheusMetrics = metrics.map {
case Gauge(name, value, tags) =>
s"$name${
if (tags.nonEmpty) s" {${tags.map(tag => s"""${tag._1}="${tag._2}"""").mkString(", ")}}"
else ""
} $value"
}

s"${prometheusMetrics.mkString("\n")}\n"
}
}
}
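A minimal usage sketch, not part of this diff, showing what Prometheus.format produces for a couple of gauges; the object name and the values are made up for illustration. Since Prometheus is private[cuttle], the sketch has to live inside the com.criteo.cuttle package:

package com.criteo.cuttle

import Metrics.{Gauge, Prometheus}

object PrometheusFormatExample extends App {
  // Gauges similar to the ones emitted by Executor.getMetrics and the schedulers.
  val metrics = Seq(
    Gauge("scheduler_stat_count", 3, Seq("type" -> "running")),
    Gauge("jvm_uptime_seconds", 120)
  )

  // Prints one line per metric, followed by a trailing newline:
  //   scheduler_stat_count {type="running"} 3
  //   jvm_uptime_seconds 120
  print(Prometheus.format(metrics))
}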
5 changes: 3 additions & 2 deletions core/src/main/scala/com/criteo/cuttle/Scheduling.scala
@@ -5,16 +5,17 @@ import io.circe.Json
import doobie.imports._
import java.util.Comparator

import logging._
import authentication._
import Metrics.MetricProvider

trait Scheduler[S <: Scheduling] {
trait Scheduler[S <: Scheduling] extends MetricProvider {
def start(workflow: Workflow[S], executor: Executor[S], xa: XA, logger: Logger): Unit
private[cuttle] def publicRoutes(workflow: Workflow[S], executor: Executor[S], xa: XA): PartialService =
PartialFunction.empty
private[cuttle] def privateRoutes(workflow: Workflow[S], executor: Executor[S], xa: XA): AuthenticatedService =
PartialFunction.empty
val allContexts: Fragment

def getStats(jobs: Set[String]): Json
}

3 changes: 3 additions & 0 deletions core/src/main/scala/com/criteo/cuttle/Utils.scala
@@ -2,6 +2,7 @@ package com.criteo.cuttle

import java.util.UUID
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.lang.management.ManagementFactory

import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -55,4 +56,6 @@ package object utils {
case e => service(e)
}
}

private[cuttle] def getJVMUptime = ManagementFactory.getRuntimeMXBean.getUptime / 1000
}
1 change: 0 additions & 1 deletion core/src/main/scala/com/criteo/cuttle/package.scala
@@ -3,7 +3,6 @@ package com.criteo
import scala.concurrent._
import doobie.imports._
import cats.free._
import cuttle.logging.Logger

package cuttle {
sealed trait Completed
(next file in the diff: the TimeSeries scheduler, package com.criteo.cuttle.timeseries; file header not captured)
@@ -2,7 +2,6 @@ package com.criteo.cuttle.timeseries

import Internal._
import com.criteo.cuttle._
import com.criteo.cuttle.logging._

import scala.concurrent._
import scala.concurrent.duration.{Duration => ScalaDuration}
@@ -22,6 +21,7 @@ import java.time.temporal.{ChronoUnit, TemporalAdjusters}
import com.criteo.cuttle.timeseries.TimeSeriesGrid.{Daily, Hourly, Monthly}
import intervals.{Bound, Interval, IntervalMap}
import Bound.{Bottom, Finite, Top}
import Metrics._

sealed trait TimeSeriesGrid {
def next(t: Instant): Instant
@@ -434,15 +434,56 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] wit
}
}

override def getStats(jobs: Set[String]) = {
private def getRunningBackfillsSize(jobs: Set[String]) = {
val runningBackfills = state match {
case (_, backfills) =>
backfills.filter(
bf =>
bf.status == "RUNNING" &&
bf.jobs.map(_.id).intersect(jobs).nonEmpty)
}
Map("backfills" -> runningBackfills.size).asJson

runningBackfills.size
}

/**
* Compute the last instant when each job was in a valid state.
* @param jobs set of jobs to process
* @return an Iterable mapping each job to the last instant when it was in a valid state.
* The Iterable is empty when the job doesn't contain any "DONE" interval.
*/
private def getTimeOfLastSuccess(jobs: Set[String]) = {
_state.single().collect {
case (job, intervals) if jobs.contains(job.id) =>
val intervalList = intervals.toList
val lastValidInterval = intervalList.takeWhile {
case (_, Running(_)) => false
case (_, Todo(None)) => false
case _ => true
}.lastOption

lastValidInterval.map {
case (interval, _) => job -> (interval.hi match {
case Finite(instant) => instant
case _ => Instant.MAX
})
}
}.flatten
}

override def getMetrics(jobs: Set[String]): Seq[Metric] = {
val timeOfLastSuccessMetrics = getTimeOfLastSuccess(jobs).map {
case (job, instant) =>
Gauge("scheduler_last_success_epoch_seconds", Instant.now().getEpochSecond - instant.getEpochSecond,
Seq("job_id" -> job.id, "job_name" -> job.name))
}

Seq(Gauge("scheduler_stat_count", getRunningBackfillsSize(jobs), Seq("type" -> "backfills"))) ++
timeOfLastSuccessMetrics
}

override def getStats(jobs: Set[String]): Json = {
Map("backfills" -> getRunningBackfillsSize(jobs)).asJson
}
}
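
For illustration, this is roughly what the scheduler's contribution to the /metrics response looks like once these gauges are rendered by Prometheus.format (job ids, names and values are hypothetical; note that scheduler_last_success_epoch_seconds carries the number of seconds elapsed since the last success):

scheduler_stat_count {type="backfills"} 1
scheduler_last_success_epoch_seconds {job_id="hello_world", job_name="Hello World"} 3600
scheduler_last_success_epoch_seconds {job_id="daily_report", job_name="Daily report"} 86400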
