Skip to content

Commit

Permalink
Bumps lolhttp to 9.0.2 (#229)
Browse files Browse the repository at this point in the history
* Bumps lolhttp to 9.0.2

* Removes warnings
  • Loading branch information
dufrannea authored Feb 15, 2018
1 parent 9dbd4f0 commit 1ae9921
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 80 deletions.
11 changes: 5 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
val devMode = settingKey[Boolean]("Some build optimization are applied in devMode.")
val writeClasspath = taskKey[File]("Write the project classpath to a file.")

val VERSION = "0.3.1"
val VERSION = "0.3.2"

lazy val commonSettings = Seq(
organization := "com.criteo.cuttle",
Expand Down Expand Up @@ -179,9 +179,9 @@ lazy val cuttle =
"com.criteo.lolhttp" %% "lolhttp",
"com.criteo.lolhttp" %% "loljson",
"com.criteo.lolhttp" %% "lolhtml"
).map(_ % "0.9.0"),
).map(_ % "0.9.2"),
libraryDependencies ++= Seq("core", "generic", "parser")
.map(module => "io.circe" %% s"circe-${module}" % "0.9.0-M3"),
.map(module => "io.circe" %% s"circe-${module}" % "0.9.1"),
libraryDependencies ++= Seq(
"de.sciss" %% "fingertree" % "1.5.2",
"org.scala-stm" %% "scala-stm" % "0.8",
Expand All @@ -194,7 +194,7 @@ lazy val cuttle =
libraryDependencies ++= Seq(
"org.tpolecat" %% "doobie-core",
"org.tpolecat" %% "doobie-hikari"
).map(_ % "0.5.0-M11"),
).map(_ % "0.5.0"),
libraryDependencies ++= Seq(
"mysql" % "mysql-connector-java" % "6.0.6"
),
Expand Down Expand Up @@ -226,8 +226,7 @@ lazy val cuttle =
if (operatingSystem.indexOf("win") >= 0) {
val yarnJsPath = ("where yarn.js" !!).trim()
assert(s"""node "$yarnJsPath" install""" ! logger == 0, "yarn failed")
}
else {
} else {
assert("yarn install" ! logger == 0, "yarn failed")
}
logger.out("Running webpack...")
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/com/criteo/cuttle/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private[cuttle] object Database {
(for {
locks <- sql"""
SELECT locked_by, locked_at FROM locks WHERE TIMESTAMPDIFF(MINUTE, locked_at, NOW()) < 5;
""".query[(String, Instant)].list
""".query[(String, Instant)].to[List]
_ <- if (locks.isEmpty) {
sql"""
DELETE FROM locks;
Expand Down Expand Up @@ -260,7 +260,7 @@ private[cuttle] trait Queries {
fr"job",
NonEmptyList.fromListUnsafe(jobs.toList)) ++ orderBy ++ sql""" LIMIT $limit OFFSET $offset""")
.query[(String, String, Instant, Instant, Json, ExecutionStatus, Int)]
.list
.to[List]
.map(_.map {
case (id, job, startTime, endTime, context, status, waitingSeconds) =>
ExecutionLog(id, job, Some(startTime), Some(endTime), context, status, waitingSeconds = waitingSeconds)
Expand Down Expand Up @@ -317,7 +317,7 @@ private[cuttle] trait Queries {
where job=$jobId and end_time > DATE_SUB(CURDATE(), INTERVAL 30 DAY) order by start_time asc, end_time asc
"""
.query[(Instant, Instant, Int, Int, ExecutionStatus)]
.list
.to[List]
.map(_.map {
case (startTime, endTime, durationSeconds, waitingSeconds, status) =>
new ExecutionStat(startTime, endTime, durationSeconds, waitingSeconds, status)
Expand Down
120 changes: 69 additions & 51 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import scala.concurrent.duration._
import scala.concurrent.stm.Txn.ExternalDecider
import scala.concurrent.stm._
import scala.concurrent.{Future, Promise}
import scala.reflect.{ClassTag, classTag}
import scala.reflect.{classTag, ClassTag}
import scala.util.{Failure, Success, Try}

import cats.Eq
Expand Down Expand Up @@ -138,18 +138,19 @@ class CancellationListener private[cuttle] (execution: Execution[_], private[cut
* @param executionContext The scoped `scala.concurrent.ExecutionContext` for this execution.
*/
case class Execution[S <: Scheduling](
id: String,
job: Job[S],
context: S#Context,
streams: ExecutionStreams,
platforms: Seq[ExecutionPlatform],
projectName: String
)(implicit val executionContext: SideEffectExecutionContext) {
id: String,
job: Job[S],
context: S#Context,
streams: ExecutionStreams,
platforms: Seq[ExecutionPlatform],
projectName: String
)(implicit val executionContext: SideEffectExecutionContext) {

private var waitingSeconds = 0
private[cuttle] var startTime: Option[Instant] = None
private val cancelListeners = TSet.empty[CancellationListener]
private val cancelled = Ref(false)

/**
* An execution with forcedSuccess set to true will have its side effect return a successful Future instance even if the
* user code raised an exception or returned a failed Future instance.
Expand Down Expand Up @@ -208,14 +209,14 @@ case class Execution[S <: Scheduling](
hasBeenCancelled
}

def forceSuccess()(implicit user: User): Unit = {
def forceSuccess()(implicit user: User): Unit =
if (!atomic { implicit txn =>
forcedSuccess.getAndTransform(_ => true)
}) {
streams.debug(s"""Possible execution failures will be ignored and final execution status will be marked as success.
forcedSuccess.getAndTransform(_ => true)
}) {
streams.debug(
s"""Possible execution failures will be ignored and final execution status will be marked as success.
|Change initiated by user ${user.userId} at ${Instant.now().toString}.""".stripMargin)
}
}

private[cuttle] def toExecutionLog(status: ExecutionStatus, failing: Option[FailingJob] = None) =
ExecutionLog(
Expand Down Expand Up @@ -349,10 +350,11 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
// signals whether the instance is shutting down
private val isShuttingDown: Ref[Boolean] = Ref(false)
private val timer = new Timer("com.criteo.cuttle.Executor.timer")
private val executionsCounters: Ref[Counter[Long]] = Ref(Counter[Long](
"cuttle_executions_total",
help = "The number of finished executions that we have in concrete states by job and by tag"
))
private val executionsCounters: Ref[Counter[Long]] = Ref(
Counter[Long](
"cuttle_executions_total",
help = "The number of finished executions that we have in concrete states by job and by tag"
))

// executions that failed recently and are now running
private def retryingExecutions(filteredJobs: Set[String]): Seq[(Execution[S], FailingJob, ExecutionStatus)] =
Expand Down Expand Up @@ -388,7 +390,8 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
.filter({ case (_, s) => s == ExecutionStatus.ExecutionWaiting })
.foreach({ case (e, _) => e.updateWaitingTime(intervalSeconds) })
})
.run
.compile
.drain
.unsafeRunAsync(_ => ())
}

Expand Down Expand Up @@ -710,15 +713,15 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla

private[cuttle] def updateFinishedExecutionCounters(execution: Execution[S], status: String): Unit =
atomic { implicit txn =>
val tagsLabel = if (execution.job.tags.nonEmpty)
Set("tags" -> execution.job.tags.map(_.name).mkString(","))
else
Set.empty
val tagsLabel =
if (execution.job.tags.nonEmpty)
Set("tags" -> execution.job.tags.map(_.name).mkString(","))
else
Set.empty
executionsCounters() = executionsCounters().inc(
Set("type" -> status, "job_id" -> execution.job.id) ++ tagsLabel
)
}

private def run0(all: Seq[(Job[S], S#Context)]): Seq[(Execution[S], Future[Completed])] = {
sealed trait NewExecution
case object ToRunNow extends NewExecution
Expand Down Expand Up @@ -896,7 +899,6 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
private[cuttle] def healthCheck(): Try[Boolean] =
Try(queries.healthCheck.transact(xa).unsafeRunSync)


private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus)

/**
Expand All @@ -922,8 +924,9 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
)

val (running: Seq[ExecutionInfo], waiting: Seq[ExecutionInfo]) = runningExecutions
.map { case (exec, status) =>
ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), status)
.map {
case (exec, status) =>
ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), status)
}
.partition { execution =>
execution.status == ExecutionStatus.ExecutionRunning
Expand Down Expand Up @@ -956,45 +959,60 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
)
}

private def getMetricsByTag(
running: Seq[ExecutionInfo],
waiting: Seq[ExecutionInfo],
paused: Seq[ExecutionInfo],
failing: Seq[ExecutionInfo]): Metrics.Metric = {
( // Explode by tag
running.flatMap { info => info.tags }
.groupBy(identity).mapValues("running" -> _.size).toList ++
waiting.flatMap { info => info.tags }
.groupBy(identity).mapValues("waiting" -> _.size).toList ++
paused.flatMap { info => info.tags }
.groupBy(identity).mapValues("paused" -> _.size).toList ++
failing.flatMap { info => info.tags }
.groupBy(identity).mapValues("failing" -> _.size).toList
).foldLeft(
private def getMetricsByTag(running: Seq[ExecutionInfo],
waiting: Seq[ExecutionInfo],
paused: Seq[ExecutionInfo],
failing: Seq[ExecutionInfo]): Metrics.Metric =
(// Explode by tag
running
.flatMap { info =>
info.tags
}
.groupBy(identity)
.mapValues("running" -> _.size)
.toList ++
waiting
.flatMap { info =>
info.tags
}
.groupBy(identity)
.mapValues("waiting" -> _.size)
.toList ++
paused
.flatMap { info =>
info.tags
}
.groupBy(identity)
.mapValues("paused" -> _.size)
.toList ++
failing
.flatMap { info =>
info.tags
}
.groupBy(identity)
.mapValues("failing" -> _.size)
.toList).foldLeft(
Gauge("cuttle_scheduler_stat_count_by_tag", "The number of executions that we have in concrete states by tag")
) {
case (gauge, (tag, (status, count))) =>
gauge.labeled(Set("tag" -> tag, "type" -> status), count)
case (gauge, _) =>
gauge
}
}

private def getMetricsByJob(
running: Seq[ExecutionInfo],
waiting: Seq[ExecutionInfo],
paused: Seq[ExecutionInfo],
failing: Seq[ExecutionInfo]): Metrics.Metric = {
private def getMetricsByJob(running: Seq[ExecutionInfo],
waiting: Seq[ExecutionInfo],
paused: Seq[ExecutionInfo],
failing: Seq[ExecutionInfo]): Metrics.Metric =
(
running.groupBy(_.jobId).mapValues("running" -> _.size).toList ++
waiting.groupBy(_.jobId).mapValues("waiting" -> _.size).toList ++
paused.groupBy(_.jobId).mapValues("paused" -> _.size).toList ++
failing.groupBy(_.jobId).mapValues("failing" -> _.size).toList
waiting.groupBy(_.jobId).mapValues("waiting" -> _.size).toList ++
paused.groupBy(_.jobId).mapValues("paused" -> _.size).toList ++
failing.groupBy(_.jobId).mapValues("failing" -> _.size).toList
).foldLeft(
Gauge("cuttle_scheduler_stat_count_by_job", "The number of executions that we have in concrete states by job")
) {
case (gauge, (jobId, (status, count))) =>
gauge.labeled(Set("job" -> jobId, "type" -> status), count)
}
}
}
12 changes: 6 additions & 6 deletions core/src/main/scala/com/criteo/cuttle/Metrics.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.criteo.cuttle

import scala.concurrent.stm.{TMap, atomic}
import scala.math.Numeric

/** Expose cuttle metrics via the [[https://prometheus.io prometheus]] protocol. */
Expand Down Expand Up @@ -50,12 +49,13 @@ object Metrics {
* @param name The metric name.
* @param help Metric description if provided.
* @param labels2Value map of (label name, label value) pairs to counter values
*/
*/
case class Counter[T](
name: String,
help: String = "",
labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty
)(implicit number: Numeric[T]) extends Metric {
name: String,
help: String = "",
labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty
)(implicit number: Numeric[T])
extends Metric {

override val metricType: MetricType = counter

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class RateLimiter(tokens: Int, refillRateInMs: Int) extends WaitingExecutionQueu
}
fs2.Stream(runNext())
})
.run
.compile
.drain
.unsafeRunAsync(_ => ())

def canRunNextCondition(implicit txn: InTxn) = _tokens() >= 1
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ class ExecutorSpec extends FunSuite with TestScheduling {
}

private def buildJob(jobId: String, tags: Set[Tag] = Set.empty): Job[TestScheduling] =
Job(jobId, TestScheduling(), jobId, tags = tags) {
implicit execution => Future { Completed }(execution.executionContext)
Job(jobId, TestScheduling(), jobId, tags = tags) { implicit execution =>
Future { Completed }(execution.executionContext)
}

private def buildExecutionForJob(job: Job[TestScheduling]): Execution[TestScheduling] =
Expand All @@ -132,5 +132,5 @@ class ExecutorSpec extends FunSuite with TestScheduling {

private val fooBarJob: Job[TestScheduling] = buildJob("foo_bar_job", Set(fooTag, barTag))

private val untaggedJob :Job[TestScheduling] = buildJob("untagged_job")
private val untaggedJob: Job[TestScheduling] = buildJob("untagged_job")
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,23 @@ private[timeseries] object Database {

val contextIdMigration: ConnectionIO[Unit] = {
implicit val jobs: Set[TimeSeriesJob] = Set.empty
val chunkSize = 1024*10
val chunkSize = 1024 * 10
val stream = sql"SELECT id, json FROM timeseries_contexts"
.query[(String, Json)].processWithChunkSize(chunkSize)
.query[(String, Json)]
.streamWithChunkSize(chunkSize)
val insert = Update[(String, String)]("INSERT into tmp (id, new_id) VALUES (? , ?)")
for {
_ <- sql"CREATE TEMPORARY TABLE tmp (id VARCHAR(1000), new_id VARCHAR(1000))".update.run
_ <-
stream.chunkLimit(chunkSize).evalMap { oldContexts =>
insert.updateMany(oldContexts.map { case (id, json) =>
(id, json.as[TimeSeriesContext].right.get.toId)
_ <- stream
.chunkLimit(chunkSize)
.evalMap { oldContexts =>
insert.updateMany(oldContexts.map {
case (id, json) =>
(id, json.as[TimeSeriesContext].right.get.toId)
})
}.run
}
.compile
.drain
_ <- sql"CREATE INDEX tmp_id ON tmp (id)".update.run
_ <- sql"""UPDATE timeseries_contexts ctx JOIN tmp ON ctx.id = tmp.id
SET ctx.id = tmp.new_id""".update.run
Expand Down Expand Up @@ -209,7 +214,7 @@ private[timeseries] object Database {
ORDER BY c.id DESC
"""
.query[(String, String, Instant, Instant, Json, ExecutionStatus, Int)]
.list
.to[List]
.map(_.map {
case (id, job, startTime, endTime, context, status, waitingSeconds) =>
ExecutionLog(id, job, Some(startTime), Some(endTime), context, status, waitingSeconds = waitingSeconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ private[timeseries] trait TimeSeriesApp { self: TimeSeriesScheduler =>
case GET at url"/api/timeseries/backfills" =>
Database
.queryBackfills()
.list
.to[List]
.map(_.map {
case (id, name, description, jobs, priority, start, end, created_at, status, created_by) =>
Json.obj(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] wit
atomic { implicit txn =>
val incompleteBackfills = Database
.queryBackfills(Some(sql"""status = 'RUNNING'"""))
.list
.to[List]
.map(_.map {
case (id, name, description, jobsIdsString, priority, start, end, _, status, createdBy) =>
val jobsIds = jobsIdsString.split(",")
Expand Down

0 comments on commit 1ae9921

Please sign in to comment.