Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete timeseries_state older than 1 week. #354

Merged
merged 1 commit into from
Jan 29, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,44 @@ private[timeseries] object Database {
}
.compile
.drain
_ <- sql"CREATE INDEX tmp_id ON tmp (id)".update.run
_ <- sql"""CREATE INDEX tmp_id ON tmp (id)""".update.run
_ <- sql"""UPDATE timeseries_contexts ctx JOIN tmp ON ctx.id = tmp.id
SET ctx.id = tmp.new_id""".update.run
_ <- sql"""UPDATE executions JOIN tmp ON executions.context_id = tmp.id
SET executions.context_id = tmp.new_id""".update.run
} yield ()
}

val cleanBigTimeseriesState: ConnectionIO[Unit] = {
sql"""
CREATE TABLE IF NOT EXISTS timeseries_state_temp (
state JSON NOT NULL,
date DATETIME NOT NULL,
PRIMARY KEY (date),
KEY timeseries_state_by_date (date)
) ENGINE = INNODB;

TRUNCATE TABLE timeseries_state_temp;

INSERT INTO timeseries_state_temp
SELECT
*
FROM timeseries_state
WHERE date > (NOW() - INTERVAL 1 WEEK);

TRUNCATE TABLE timeseries_state;

INSERT INTO timeseries_state
SELECT
*
FROM timeseries_state_temp;

DROP TABLE timeseries_state_temp;

OPTIMIZE table timeseries_state;
""".update.run.map(_ => Unit)
}

val schema = List(
sql"""
CREATE TABLE timeseries_state (
Expand Down Expand Up @@ -84,7 +114,8 @@ private[timeseries] object Database {
CREATE INDEX timeseries_backfills_by_date ON timeseries_backfills (created_at);
CREATE INDEX timeseries_backfills_by_status ON timeseries_backfills (status);
""".update.run,
contextIdMigration
contextIdMigration,
cleanBigTimeseriesState
)

val doSchemaUpdates: ConnectionIO[Unit] = utils.updateSchema("timeseries", schema)
Expand Down Expand Up @@ -145,7 +176,13 @@ private[timeseries] object Database {
}
})
}.asJson
sql"INSERT INTO timeseries_state (state, date) VALUES (${stateJson}, ${now})".update.run

for {
// First clean state older that 1 week ago
_ <- sql"DELETE FROM timeseries_state where date < (${now} - INTERVAL 1 WEEK)".update.run
// Insert the latest state
x <- sql"INSERT INTO timeseries_state (state, date) VALUES (${stateJson}, ${now})".update.run
} yield x
}

def queryBackfills(where: Option[Fragment] = None) = {
Expand Down