Skip to content

Commit

Permalink
Delete timeseries_state older than 1 week.
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Bort committed Jan 28, 2019
1 parent 22b2948 commit c9b8b86
Showing 1 changed file with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,44 @@ private[timeseries] object Database {
}
.compile
.drain
_ <- sql"CREATE INDEX tmp_id ON tmp (id)".update.run
_ <- sql"""CREATE INDEX tmp_id ON tmp (id)""".update.run
_ <- sql"""UPDATE timeseries_contexts ctx JOIN tmp ON ctx.id = tmp.id
SET ctx.id = tmp.new_id""".update.run
_ <- sql"""UPDATE executions JOIN tmp ON executions.context_id = tmp.id
SET executions.context_id = tmp.new_id""".update.run
} yield ()
}

val cleanBigTimeseriesState: ConnectionIO[Unit] = {
sql"""
CREATE TABLE IF NOT EXISTS timeseries_state_temp (
state JSON NOT NULL,
date DATETIME NOT NULL,
PRIMARY KEY (date),
KEY timeseries_state_by_date (date)
) ENGINE = INNODB;

TRUNCATE TABLE timeseries_state_temp;

INSERT INTO timeseries_state_temp
SELECT
*
FROM timeseries_state
WHERE date > (NOW() - INTERVAL 1 WEEK);

TRUNCATE TABLE timeseries_state;

INSERT INTO timeseries_state
SELECT
*
FROM timeseries_state_temp;

DROP TABLE timeseries_state_temp;

OPTIMIZE table timeseries_state;
""".update.run.map(_ => Unit)
}

val schema = List(
sql"""
CREATE TABLE timeseries_state (
Expand Down Expand Up @@ -84,7 +114,8 @@ private[timeseries] object Database {
CREATE INDEX timeseries_backfills_by_date ON timeseries_backfills (created_at);
CREATE INDEX timeseries_backfills_by_status ON timeseries_backfills (status);
""".update.run,
contextIdMigration
contextIdMigration,
cleanBigTimeseriesState
)

val doSchemaUpdates: ConnectionIO[Unit] = utils.updateSchema("timeseries", schema)
Expand Down Expand Up @@ -145,7 +176,13 @@ private[timeseries] object Database {
}
})
}.asJson
sql"INSERT INTO timeseries_state (state, date) VALUES (${stateJson}, ${now})".update.run

for {
// First clean state older that 1 week ago
_ <- sql"DELETE FROM timeseries_state where date < (${now} - INTERVAL 1 WEEK)".update.run
// Insert the latest state
x <- sql"INSERT INTO timeseries_state (state, date) VALUES (${stateJson}, ${now})".update.run
} yield x
}

def queryBackfills(where: Option[Fragment] = None) = {
Expand Down

0 comments on commit c9b8b86

Please sign in to comment.