Skip to content

Commit

Permalink
Configurable logs & timeseries state retention
Browse files Browse the repository at this point in the history
Refactoring how the TimeSeriesScheduler is instantiated (it was
a typeclass on Scheduling before but now we don't need to retrieve
the instance implicitely).

Also cleaning the code duplication between CuttleProject.start &
CuttleProject.build
  • Loading branch information
guillaumebort committed Feb 1, 2019
1 parent 4996a82 commit 1070cd1
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 73 deletions.
10 changes: 10 additions & 0 deletions core/src/main/scala/com/criteo/cuttle/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.criteo.cuttle

import java.time._
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration => ScalaDuration}

import scala.util._

Expand Down Expand Up @@ -353,6 +354,15 @@ private[cuttle] case class Queries(logger: Logger) {
(${id}, ${streams})
""".update.run

def applyLogsRetention(logsRetention: ScalaDuration): ConnectionIO[Int] =
sql"""
DELETE es
FROM executions_streams es
JOIN executions e
ON es.id = e.id
WHERE end_time < ${Instant.now.minusSeconds(logsRetention.toSeconds)}
""".update.run

def archivedStreams(id: String): ConnectionIO[Option[String]] =
sql"SELECT streams FROM executions_streams WHERE id = ${id}"
.query[String]
Expand Down
16 changes: 11 additions & 5 deletions core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,17 @@ private[cuttle] object ExecutionStreams {
logFile(id).delete()
}

def archive(id: ExecutionId, queries: Queries, xa: XA): Unit = {
queries
.archiveStreams(id, streamsAsString(id).getOrElse(sys.error(s"Cannot archive streams for execution $id")))
.transact(xa)
.unsafeRunSync()
def archive(id: ExecutionId, queries: Queries, logsRetention: Option[Duration], xa: XA): Unit = {
(for {
_ <-
queries
.archiveStreams(id, streamsAsString(id).getOrElse(sys.error(s"Cannot archive streams for execution $id")))
.transact(xa)
_ <-
logsRetention.map { retention =>
queries.applyLogsRetention(retention).transact(xa)
}.getOrElse(IO.unit)
} yield ()).unsafeRunSync()
discard(id)
}
}
6 changes: 4 additions & 2 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Timer, TimerTask}

import scala.concurrent.duration._
import scala.concurrent.duration.{Duration => ScalaDuration}
import scala.concurrent.stm._
import scala.concurrent.{Future, Promise}
import scala.reflect.{ClassTag, classTag}
Expand Down Expand Up @@ -411,7 +412,8 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
xa: XA,
logger: Logger,
val projectName: String,
val projectVersion: String)(implicit retryStrategy: RetryStrategy)
val projectVersion: String,
logsRetention: Option[ScalaDuration] = None)(implicit retryStrategy: RetryStrategy)
extends MetricProvider[S] {

import ExecutionStatus._
Expand Down Expand Up @@ -723,7 +725,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
.andThen {
case result =>
try {
ExecutionStreams.archive(execution.id, queries, xa)
ExecutionStreams.archive(execution.id, queries, logsRetention, xa)
} catch {
case e: Throwable =>
e.printStackTrace()
Expand Down
6 changes: 4 additions & 2 deletions cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,20 @@ class CronProject private[cuttle] (val name: String,
* @param port The port to use for the HTTP daemon.
* @param databaseConfig JDBC configuration for MySQL server
* @param retryStrategy The strategy to use for execution retry. Default to exponential backoff.
* @param logsRetention If specified, automatically clean the execution logs older than the given duration.
*/
def start(
platforms: Seq[ExecutionPlatform] = CronProject.defaultPlatforms,
port: Int = CronProject.port,
databaseConfig: DatabaseConfig = CronProject.databaseConfig,
retryStrategy: RetryStrategy = CronProject.retryStrategy
retryStrategy: RetryStrategy = CronProject.retryStrategy,
logsRetention: Option[Duration] = None
): Unit = {
logger.info("Connecting to database")
implicit val transactor = com.criteo.cuttle.Database.connect(databaseConfig)(logger)

logger.info("Creating Executor")
val executor = new Executor[CronScheduling](platforms, transactor, logger, name, version)(retryStrategy)
val executor = new Executor[CronScheduling](platforms, transactor, logger, name, version, logsRetention)(retryStrategy)

logger.info("Scheduler starting workflow")
scheduler.start(workload, executor, transactor, logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,6 @@ object HelloTimeSeries {
world dependsOn (hello1 and hello2 and hello3)
}.
// Starts the scheduler and an HTTP server.
start()
start(logsRetention = Some(1.minute))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import lol.http._
import com.criteo.cuttle._
import com.criteo.cuttle.{Database => CuttleDatabase}
import com.criteo.cuttle.ThreadPools._, Implicits.serverThreadPool
import scala.concurrent.duration.Duration

/**
* A cuttle project is a workflow to execute with the appropriate scheduler.
Expand All @@ -15,7 +16,6 @@ class CuttleProject private[cuttle] (val name: String,
val description: String,
val env: (String, Boolean),
val jobs: Workflow,
val scheduler: TimeSeriesScheduler,
val authenticator: Auth.Authenticator,
val logger: Logger) {

Expand All @@ -27,30 +27,28 @@ class CuttleProject private[cuttle] (val name: String,
* @param httpPort The port to use for the HTTP daemon.
* @param databaseConfig JDBC configuration for MySQL server 5.7.
* @param retryStrategy The strategy to use for execution retry. Default to exponential backoff.
* @param paused Automatically pause all jobs at startup.
* @param stateRetention If specified, automatically clean the timeseries state older than the given duration.
* @param logsRetention If specified, automatically clean the execution logs older than the given duration.
*/
def start(
platforms: Seq[ExecutionPlatform] = CuttleProject.defaultPlatforms,
httpPort: Int = 8888,
databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv,
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy,
paused: Boolean = false
paused: Boolean = false,
stateRetention: Option[Duration] = None,
logsRetention: Option[Duration] = None
): Unit = {
val xa = CuttleDatabase.connect(databaseConfig)(logger)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version)(retryStrategy)

if (paused) {
logger.info("Pausing workflow")
scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup"))
}
val (routes, startScheduler) = build(platforms, databaseConfig, retryStrategy, paused, stateRetention, logsRetention)

logger.info("Start workflow")
scheduler.start(jobs, executor, xa, logger)
startScheduler()

logger.info("Start server")
Server.listen(port = httpPort, onError = { e =>
e.printStackTrace()
InternalServerError(e.getMessage)
})(TimeSeriesApp(this, executor, xa).routes)
})(routes)

logger.info(s"Listening on http://localhost:$httpPort")
}
Expand All @@ -61,23 +59,34 @@ class CuttleProject private[cuttle] (val name: String,
* @param platforms The configured [[ExecutionPlatform ExecutionPlatforms]] to use to execute jobs.
* @param databaseConfig JDBC configuration for MySQL server 5.7.
* @param retryStrategy The strategy to use for execution retry. Default to exponential backoff.
* @param paused Automatically pause all jobs at startup.
* @param stateRetention If specified, automatically clean the timeseries state older than the given duration.
* @param logsRetention If specified, automatically clean the execution logs older than the given duration.
*
* @return a tuple with cuttleRoutes (needed to start a server) and a function to start the scheduler
*/
def build(
platforms: Seq[ExecutionPlatform] = CuttleProject.defaultPlatforms,
databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv,
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy,
paused: Boolean = false,
stateRetention: Option[Duration] = None,
logsRetention: Option[Duration] = None
): (Service, () => Unit) = {
val xa = CuttleDatabase.connect(databaseConfig)(logger)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version)(retryStrategy)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version, logsRetention)(retryStrategy)
val scheduler = new TimeSeriesScheduler(logger, stateRetention)

val startScheduler = () => {
if (paused) {
logger.info("Pausing workflow")
scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup"))
}
logger.info("Start workflow")
scheduler.start(jobs, executor, xa, logger)
}

(TimeSeriesApp(this, executor, xa).routes, startScheduler)
(TimeSeriesApp(this, executor, scheduler, xa).routes, startScheduler)
}
}

Expand All @@ -95,16 +104,14 @@ object CuttleProject {
* if the environment is a production one).
* @param authenticator The way to authenticate HTTP request for the UI and the private API.
* @param jobs The workflow to run in this project.
* @param scheduler The scheduler instance to use to schedule the Workflow jobs.
* @param logger The logger to use to log internal debug informations.
*/
def apply(name: String,
version: String = "",
description: String = "",
env: (String, Boolean) = ("", false),
authenticator: Auth.Authenticator = Auth.GuestAuth)(jobs: Workflow)(implicit scheduler: TimeSeriesScheduler,
logger: Logger): CuttleProject =
new CuttleProject(name, version, description, env, jobs, scheduler, authenticator, logger)
authenticator: Auth.Authenticator = Auth.GuestAuth)(jobs: Workflow)(implicit logger: Logger): CuttleProject =
new CuttleProject(name, version, description, env, jobs, authenticator, logger)

private[CuttleProject] def defaultPlatforms: Seq[ExecutionPlatform] = {
import java.util.concurrent.TimeUnit.SECONDS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.criteo.cuttle._
import Internal._

import java.time._
import scala.concurrent.duration.Duration

import cats.Applicative
import cats.data.{NonEmptyList, OptionT}
Expand Down Expand Up @@ -48,36 +49,6 @@ private[timeseries] object Database {
} yield ()
}

val cleanBigTimeseriesState: ConnectionIO[Unit] = {
sql"""
CREATE TABLE IF NOT EXISTS timeseries_state_temp (
state JSON NOT NULL,
date DATETIME NOT NULL,
PRIMARY KEY (date),
KEY timeseries_state_by_date (date)
) ENGINE = INNODB;

TRUNCATE TABLE timeseries_state_temp;

INSERT INTO timeseries_state_temp
SELECT
*
FROM timeseries_state
WHERE date > (NOW() - INTERVAL 1 WEEK);

TRUNCATE TABLE timeseries_state;

INSERT INTO timeseries_state
SELECT
*
FROM timeseries_state_temp;

DROP TABLE timeseries_state_temp;

OPTIMIZE table timeseries_state;
""".update.run.map(_ => Unit)
}

val schema = List(
sql"""
CREATE TABLE timeseries_state (
Expand Down Expand Up @@ -115,7 +86,7 @@ private[timeseries] object Database {
CREATE INDEX timeseries_backfills_by_status ON timeseries_backfills (status);
""".update.run,
contextIdMigration,
cleanBigTimeseriesState
NoUpdate // We removed this migration, so we reserve this slot
)

val doSchemaUpdates: ConnectionIO[Unit] = utils.updateSchema("timeseries", schema)
Expand Down Expand Up @@ -161,10 +132,16 @@ private[timeseries] object Database {
.value
}

def serializeState(state: State): ConnectionIO[Int] = {
def serializeState(state: State, retention: Option[Duration]): ConnectionIO[Int] = {
import JobState.{Done, Todo}

val now = Instant.now()
val cleanStateBefore = retention.map { duration =>
if(duration.toSeconds <= 0 )
sys.error(s"State retention is badly configured: ${duration}")
else
now.minusSeconds(duration.toSeconds)
}
val stateJson = state.toList.map {
case (job, im) =>
(job.id, im.toList.filter {
Expand All @@ -178,8 +155,11 @@ private[timeseries] object Database {
}.asJson

for {
// First clean state older that 1 week ago
_ <- sql"DELETE FROM timeseries_state where date < (${now} - INTERVAL 1 WEEK)".update.run
// Apply state retention if needed
_ <-
cleanStateBefore.map { t =>
sql"DELETE FROM timeseries_state where date < ${t}".update.run
}.getOrElse(NoUpdate)
// Insert the latest state
x <- sql"INSERT INTO timeseries_state (state, date) VALUES (${stateJson}, ${now})".update.run
} yield x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[timeseries] object TimeSeriesApp {
"name" -> project.name.asJson,
"version" -> Option(project.version).filterNot(_.isEmpty).asJson,
"description" -> Option(project.description).filterNot(_.isEmpty).asJson,
"scheduler" -> project.scheduler.name.asJson,
"scheduler" -> "timeseries".asJson,
"env" -> Json.obj(
"name" -> Option(project.env._1).filterNot(_.isEmpty).asJson,
"critical" -> project.env._2.asJson
Expand Down Expand Up @@ -85,9 +85,9 @@ private[timeseries] object TimeSeriesApp {

}

private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: Executor[TimeSeries], xa: XA) {
private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: Executor[TimeSeries], scheduler: TimeSeriesScheduler, xa: XA) {

import project.{jobs, scheduler}
import project.{jobs}

import JobState._
import TimeSeriesApp._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,7 @@ case class TimeSeries(calendar: TimeSeriesCalendar, start: Instant, end: Option[
}

/** [[TimeSeries]] utilities. */
object TimeSeries {
/* Provide a default [[TimeSeriesScheduler]] for [[TimeSeries]] scheduling. */
implicit def scheduler(implicit logger: Logger) = TimeSeriesScheduler(logger)
}
object TimeSeries

private[timeseries] sealed trait JobState
private[timeseries] object JobState {
Expand Down Expand Up @@ -328,7 +325,7 @@ private[timeseries] object JobState {
* The scheduler also allow to [[Backfill]] already computed partitions. The [[Backfill]] can be recursive
* or not and an audit log of backfills is kept.
*/
case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {
case class TimeSeriesScheduler(logger: Logger, stateRetention: Option[ScalaDuration] = None) extends Scheduler[TimeSeries] {
import JobState.{Done, Running, Todo}
import TimeSeriesUtils._

Expand Down Expand Up @@ -651,7 +648,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {
}

if (completed.nonEmpty || toRun.nonEmpty)
runOrLogAndDie(Database.serializeState(stateSnapshot).transact(xa).unsafeRunSync,
runOrLogAndDie(Database.serializeState(stateSnapshot, stateRetention).transact(xa).unsafeRunSync,
"TimeseriesScheduler, cannot serialize state, shutting down")

if (completedBackfills.nonEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object TimeSeriesSchedulerIntegrationTests {
val executor =
new Executor[TimeSeries](Seq(LocalPlatform(maxForkedProcesses = 10)), xa, logger, project.name, project.version)(
retryImmediatelyStrategy)
val scheduler = project.scheduler
val scheduler = new TimeSeriesScheduler(logger)

scheduler.initialize(project.jobs, xa, logger)

Expand Down

0 comments on commit 1070cd1

Please sign in to comment.