diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala index a6f232db5..1bf49263e 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala @@ -40,7 +40,7 @@ private[timeseries] object Database { } .compile .drain - _ <- sql"CREATE INDEX tmp_id ON tmp (id)".update.run + _ <- sql"""CREATE INDEX tmp_id ON tmp (id)""".update.run _ <- sql"""UPDATE timeseries_contexts ctx JOIN tmp ON ctx.id = tmp.id SET ctx.id = tmp.new_id""".update.run _ <- sql"""UPDATE executions JOIN tmp ON executions.context_id = tmp.id @@ -48,6 +48,36 @@ private[timeseries] object Database { } yield () } + val cleanBigTimeseriesState: ConnectionIO[Unit] = { + sql""" + CREATE TABLE IF NOT EXISTS timeseries_state_temp ( + state JSON NOT NULL, + date DATETIME NOT NULL, + PRIMARY KEY (date), + KEY timeseries_state_by_date (date) + ) ENGINE = INNODB; + + TRUNCATE TABLE timeseries_state_temp; + + INSERT INTO timeseries_state_temp + SELECT + * + FROM timeseries_state + WHERE date > (NOW() - INTERVAL 1 WEEK); + + TRUNCATE TABLE timeseries_state; + + INSERT INTO timeseries_state + SELECT + * + FROM timeseries_state_temp; + + DROP TABLE timeseries_state_temp; + + OPTIMIZE table timeseries_state; + """.update.run.map(_ => Unit) + } + val schema = List( sql""" CREATE TABLE timeseries_state ( @@ -84,7 +114,8 @@ private[timeseries] object Database { CREATE INDEX timeseries_backfills_by_date ON timeseries_backfills (created_at); CREATE INDEX timeseries_backfills_by_status ON timeseries_backfills (status); """.update.run, - contextIdMigration + contextIdMigration, + cleanBigTimeseriesState ) val doSchemaUpdates: ConnectionIO[Unit] = utils.updateSchema("timeseries", schema) @@ -145,7 +176,13 @@ private[timeseries] object Database { } }) }.asJson - sql"INSERT INTO timeseries_state (state, date) VALUES (${stateJson}, ${now})".update.run + + for { + // First clean state older that 1 week ago + _ <- sql"DELETE FROM timeseries_state where date < (${now} - INTERVAL 1 WEEK)".update.run + // Insert the latest state + x <- sql"INSERT INTO timeseries_state (state, date) VALUES (${stateJson}, ${now})".update.run + } yield x } def queryBackfills(where: Option[Fragment] = None) = {