From 1070cd1bebde1a286ed680e0ac1f49ea9788b195 Mon Sep 17 00:00:00 2001 From: Guillaume Bort Date: Fri, 1 Feb 2019 10:32:38 +0100 Subject: [PATCH] Configurable logs & timeseries state retention Refactoring how the TimeSeriesScheduler is instantiated (it was a typeclass on Scheduling before but now we don't need to retrieve the instance implicitely). Also cleaning the code duplication between CuttleProject.start & CuttleProject.build --- .../scala/com/criteo/cuttle/Database.scala | 10 ++++ .../com/criteo/cuttle/ExecutionStreams.scala | 16 +++++-- .../scala/com/criteo/cuttle/Executor.scala | 6 ++- .../com/criteo/cuttle/cron/CronProject.scala | 6 ++- .../cuttle/examples/HelloTimeSeries.scala | 2 +- .../cuttle/timeseries/CuttleProject.scala | 45 +++++++++-------- .../criteo/cuttle/timeseries/Database.scala | 48 ++++++------------- .../cuttle/timeseries/TimeSeriesApp.scala | 6 +-- .../timeseries/TimeSeriesScheduler.scala | 9 ++-- .../TimeSeriesSchedulerIntegrationTests.scala | 2 +- 10 files changed, 77 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/com/criteo/cuttle/Database.scala b/core/src/main/scala/com/criteo/cuttle/Database.scala index 4046bd7b7..a1d3d99ae 100644 --- a/core/src/main/scala/com/criteo/cuttle/Database.scala +++ b/core/src/main/scala/com/criteo/cuttle/Database.scala @@ -2,6 +2,7 @@ package com.criteo.cuttle import java.time._ import java.util.concurrent.TimeUnit +import scala.concurrent.duration.{Duration => ScalaDuration} import scala.util._ @@ -353,6 +354,15 @@ private[cuttle] case class Queries(logger: Logger) { (${id}, ${streams}) """.update.run + def applyLogsRetention(logsRetention: ScalaDuration): ConnectionIO[Int] = + sql""" + DELETE es + FROM executions_streams es + JOIN executions e + ON es.id = e.id + WHERE end_time < ${Instant.now.minusSeconds(logsRetention.toSeconds)} + """.update.run + def archivedStreams(id: String): ConnectionIO[Option[String]] = sql"SELECT streams FROM executions_streams WHERE id = ${id}" .query[String] diff --git a/core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala b/core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala index 52f69027c..88583511e 100644 --- a/core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala +++ b/core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala @@ -149,11 +149,17 @@ private[cuttle] object ExecutionStreams { logFile(id).delete() } - def archive(id: ExecutionId, queries: Queries, xa: XA): Unit = { - queries - .archiveStreams(id, streamsAsString(id).getOrElse(sys.error(s"Cannot archive streams for execution $id"))) - .transact(xa) - .unsafeRunSync() + def archive(id: ExecutionId, queries: Queries, logsRetention: Option[Duration], xa: XA): Unit = { + (for { + _ <- + queries + .archiveStreams(id, streamsAsString(id).getOrElse(sys.error(s"Cannot archive streams for execution $id"))) + .transact(xa) + _ <- + logsRetention.map { retention => + queries.applyLogsRetention(retention).transact(xa) + }.getOrElse(IO.unit) + } yield ()).unsafeRunSync() discard(id) } } diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index 653b02927..f1578f46d 100755 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.{Timer, TimerTask} import scala.concurrent.duration._ +import scala.concurrent.duration.{Duration => ScalaDuration} import scala.concurrent.stm._ import scala.concurrent.{Future, Promise} import scala.reflect.{ClassTag, classTag} @@ -411,7 +412,8 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla xa: XA, logger: Logger, val projectName: String, - val projectVersion: String)(implicit retryStrategy: RetryStrategy) + val projectVersion: String, + logsRetention: Option[ScalaDuration] = None)(implicit retryStrategy: RetryStrategy) extends MetricProvider[S] { import ExecutionStatus._ @@ -723,7 +725,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla .andThen { case result => try { - ExecutionStreams.archive(execution.id, queries, xa) + ExecutionStreams.archive(execution.id, queries, logsRetention, xa) } catch { case e: Throwable => e.printStackTrace() diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala index 32d9c24ff..9f1689e42 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala @@ -32,18 +32,20 @@ class CronProject private[cuttle] (val name: String, * @param port The port to use for the HTTP daemon. * @param databaseConfig JDBC configuration for MySQL server * @param retryStrategy The strategy to use for execution retry. Default to exponential backoff. + * @param logsRetention If specified, automatically clean the execution logs older than the given duration. */ def start( platforms: Seq[ExecutionPlatform] = CronProject.defaultPlatforms, port: Int = CronProject.port, databaseConfig: DatabaseConfig = CronProject.databaseConfig, - retryStrategy: RetryStrategy = CronProject.retryStrategy + retryStrategy: RetryStrategy = CronProject.retryStrategy, + logsRetention: Option[Duration] = None ): Unit = { logger.info("Connecting to database") implicit val transactor = com.criteo.cuttle.Database.connect(databaseConfig)(logger) logger.info("Creating Executor") - val executor = new Executor[CronScheduling](platforms, transactor, logger, name, version)(retryStrategy) + val executor = new Executor[CronScheduling](platforms, transactor, logger, name, version, logsRetention)(retryStrategy) logger.info("Scheduler starting workflow") scheduler.start(workload, executor, transactor, logger) diff --git a/examples/src/main/scala/com/criteo/cuttle/examples/HelloTimeSeries.scala b/examples/src/main/scala/com/criteo/cuttle/examples/HelloTimeSeries.scala index 14095e732..122ff854a 100644 --- a/examples/src/main/scala/com/criteo/cuttle/examples/HelloTimeSeries.scala +++ b/examples/src/main/scala/com/criteo/cuttle/examples/HelloTimeSeries.scala @@ -119,6 +119,6 @@ object HelloTimeSeries { world dependsOn (hello1 and hello2 and hello3) }. // Starts the scheduler and an HTTP server. - start() + start(logsRetention = Some(1.minute)) } } diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala index 74dde87bb..5538146c8 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala @@ -5,6 +5,7 @@ import lol.http._ import com.criteo.cuttle._ import com.criteo.cuttle.{Database => CuttleDatabase} import com.criteo.cuttle.ThreadPools._, Implicits.serverThreadPool +import scala.concurrent.duration.Duration /** * A cuttle project is a workflow to execute with the appropriate scheduler. @@ -15,7 +16,6 @@ class CuttleProject private[cuttle] (val name: String, val description: String, val env: (String, Boolean), val jobs: Workflow, - val scheduler: TimeSeriesScheduler, val authenticator: Auth.Authenticator, val logger: Logger) { @@ -27,30 +27,28 @@ class CuttleProject private[cuttle] (val name: String, * @param httpPort The port to use for the HTTP daemon. * @param databaseConfig JDBC configuration for MySQL server 5.7. * @param retryStrategy The strategy to use for execution retry. Default to exponential backoff. + * @param paused Automatically pause all jobs at startup. + * @param stateRetention If specified, automatically clean the timeseries state older than the given duration. + * @param logsRetention If specified, automatically clean the execution logs older than the given duration. */ def start( platforms: Seq[ExecutionPlatform] = CuttleProject.defaultPlatforms, httpPort: Int = 8888, databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv, retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy, - paused: Boolean = false + paused: Boolean = false, + stateRetention: Option[Duration] = None, + logsRetention: Option[Duration] = None ): Unit = { - val xa = CuttleDatabase.connect(databaseConfig)(logger) - val executor = new Executor[TimeSeries](platforms, xa, logger, name, version)(retryStrategy) - - if (paused) { - logger.info("Pausing workflow") - scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup")) - } + val (routes, startScheduler) = build(platforms, databaseConfig, retryStrategy, paused, stateRetention, logsRetention) - logger.info("Start workflow") - scheduler.start(jobs, executor, xa, logger) + startScheduler() logger.info("Start server") Server.listen(port = httpPort, onError = { e => e.printStackTrace() InternalServerError(e.getMessage) - })(TimeSeriesApp(this, executor, xa).routes) + })(routes) logger.info(s"Listening on http://localhost:$httpPort") } @@ -61,23 +59,34 @@ class CuttleProject private[cuttle] (val name: String, * @param platforms The configured [[ExecutionPlatform ExecutionPlatforms]] to use to execute jobs. * @param databaseConfig JDBC configuration for MySQL server 5.7. * @param retryStrategy The strategy to use for execution retry. Default to exponential backoff. + * @param paused Automatically pause all jobs at startup. + * @param stateRetention If specified, automatically clean the timeseries state older than the given duration. + * @param logsRetention If specified, automatically clean the execution logs older than the given duration. * * @return a tuple with cuttleRoutes (needed to start a server) and a function to start the scheduler */ def build( platforms: Seq[ExecutionPlatform] = CuttleProject.defaultPlatforms, databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv, - retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy + retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy, + paused: Boolean = false, + stateRetention: Option[Duration] = None, + logsRetention: Option[Duration] = None ): (Service, () => Unit) = { val xa = CuttleDatabase.connect(databaseConfig)(logger) - val executor = new Executor[TimeSeries](platforms, xa, logger, name, version)(retryStrategy) + val executor = new Executor[TimeSeries](platforms, xa, logger, name, version, logsRetention)(retryStrategy) + val scheduler = new TimeSeriesScheduler(logger, stateRetention) val startScheduler = () => { + if (paused) { + logger.info("Pausing workflow") + scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup")) + } logger.info("Start workflow") scheduler.start(jobs, executor, xa, logger) } - (TimeSeriesApp(this, executor, xa).routes, startScheduler) + (TimeSeriesApp(this, executor, scheduler, xa).routes, startScheduler) } } @@ -95,16 +104,14 @@ object CuttleProject { * if the environment is a production one). * @param authenticator The way to authenticate HTTP request for the UI and the private API. * @param jobs The workflow to run in this project. - * @param scheduler The scheduler instance to use to schedule the Workflow jobs. * @param logger The logger to use to log internal debug informations. */ def apply(name: String, version: String = "", description: String = "", env: (String, Boolean) = ("", false), - authenticator: Auth.Authenticator = Auth.GuestAuth)(jobs: Workflow)(implicit scheduler: TimeSeriesScheduler, - logger: Logger): CuttleProject = - new CuttleProject(name, version, description, env, jobs, scheduler, authenticator, logger) + authenticator: Auth.Authenticator = Auth.GuestAuth)(jobs: Workflow)(implicit logger: Logger): CuttleProject = + new CuttleProject(name, version, description, env, jobs, authenticator, logger) private[CuttleProject] def defaultPlatforms: Seq[ExecutionPlatform] = { import java.util.concurrent.TimeUnit.SECONDS diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala index 1bf49263e..f64e53700 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala @@ -4,6 +4,7 @@ import com.criteo.cuttle._ import Internal._ import java.time._ +import scala.concurrent.duration.Duration import cats.Applicative import cats.data.{NonEmptyList, OptionT} @@ -48,36 +49,6 @@ private[timeseries] object Database { } yield () } - val cleanBigTimeseriesState: ConnectionIO[Unit] = { - sql""" - CREATE TABLE IF NOT EXISTS timeseries_state_temp ( - state JSON NOT NULL, - date DATETIME NOT NULL, - PRIMARY KEY (date), - KEY timeseries_state_by_date (date) - ) ENGINE = INNODB; - - TRUNCATE TABLE timeseries_state_temp; - - INSERT INTO timeseries_state_temp - SELECT - * - FROM timeseries_state - WHERE date > (NOW() - INTERVAL 1 WEEK); - - TRUNCATE TABLE timeseries_state; - - INSERT INTO timeseries_state - SELECT - * - FROM timeseries_state_temp; - - DROP TABLE timeseries_state_temp; - - OPTIMIZE table timeseries_state; - """.update.run.map(_ => Unit) - } - val schema = List( sql""" CREATE TABLE timeseries_state ( @@ -115,7 +86,7 @@ private[timeseries] object Database { CREATE INDEX timeseries_backfills_by_status ON timeseries_backfills (status); """.update.run, contextIdMigration, - cleanBigTimeseriesState + NoUpdate // We removed this migration, so we reserve this slot ) val doSchemaUpdates: ConnectionIO[Unit] = utils.updateSchema("timeseries", schema) @@ -161,10 +132,16 @@ private[timeseries] object Database { .value } - def serializeState(state: State): ConnectionIO[Int] = { + def serializeState(state: State, retention: Option[Duration]): ConnectionIO[Int] = { import JobState.{Done, Todo} val now = Instant.now() + val cleanStateBefore = retention.map { duration => + if(duration.toSeconds <= 0 ) + sys.error(s"State retention is badly configured: ${duration}") + else + now.minusSeconds(duration.toSeconds) + } val stateJson = state.toList.map { case (job, im) => (job.id, im.toList.filter { @@ -178,8 +155,11 @@ private[timeseries] object Database { }.asJson for { - // First clean state older that 1 week ago - _ <- sql"DELETE FROM timeseries_state where date < (${now} - INTERVAL 1 WEEK)".update.run + // Apply state retention if needed + _ <- + cleanStateBefore.map { t => + sql"DELETE FROM timeseries_state where date < ${t}".update.run + }.getOrElse(NoUpdate) // Insert the latest state x <- sql"INSERT INTO timeseries_state (state, date) VALUES (${stateJson}, ${now})".update.run } yield x diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala index 891061593..942bd2544 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala @@ -35,7 +35,7 @@ private[timeseries] object TimeSeriesApp { "name" -> project.name.asJson, "version" -> Option(project.version).filterNot(_.isEmpty).asJson, "description" -> Option(project.description).filterNot(_.isEmpty).asJson, - "scheduler" -> project.scheduler.name.asJson, + "scheduler" -> "timeseries".asJson, "env" -> Json.obj( "name" -> Option(project.env._1).filterNot(_.isEmpty).asJson, "critical" -> project.env._2.asJson @@ -85,9 +85,9 @@ private[timeseries] object TimeSeriesApp { } -private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: Executor[TimeSeries], xa: XA) { +private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: Executor[TimeSeries], scheduler: TimeSeriesScheduler, xa: XA) { - import project.{jobs, scheduler} + import project.{jobs} import JobState._ import TimeSeriesApp._ diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala index 140677128..add97b33d 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala @@ -295,10 +295,7 @@ case class TimeSeries(calendar: TimeSeriesCalendar, start: Instant, end: Option[ } /** [[TimeSeries]] utilities. */ -object TimeSeries { - /* Provide a default [[TimeSeriesScheduler]] for [[TimeSeries]] scheduling. */ - implicit def scheduler(implicit logger: Logger) = TimeSeriesScheduler(logger) -} +object TimeSeries private[timeseries] sealed trait JobState private[timeseries] object JobState { @@ -328,7 +325,7 @@ private[timeseries] object JobState { * The scheduler also allow to [[Backfill]] already computed partitions. The [[Backfill]] can be recursive * or not and an audit log of backfills is kept. */ -case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] { +case class TimeSeriesScheduler(logger: Logger, stateRetention: Option[ScalaDuration] = None) extends Scheduler[TimeSeries] { import JobState.{Done, Running, Todo} import TimeSeriesUtils._ @@ -651,7 +648,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] { } if (completed.nonEmpty || toRun.nonEmpty) - runOrLogAndDie(Database.serializeState(stateSnapshot).transact(xa).unsafeRunSync, + runOrLogAndDie(Database.serializeState(stateSnapshot, stateRetention).transact(xa).unsafeRunSync, "TimeseriesScheduler, cannot serialize state, shutting down") if (completedBackfills.nonEmpty) diff --git a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerIntegrationTests.scala b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerIntegrationTests.scala index 962d77652..c12108518 100644 --- a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerIntegrationTests.scala +++ b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSchedulerIntegrationTests.scala @@ -43,7 +43,7 @@ object TimeSeriesSchedulerIntegrationTests { val executor = new Executor[TimeSeries](Seq(LocalPlatform(maxForkedProcesses = 10)), xa, logger, project.name, project.version)( retryImmediatelyStrategy) - val scheduler = project.scheduler + val scheduler = new TimeSeriesScheduler(logger) scheduler.initialize(project.jobs, xa, logger)