Skip to content

Commit

Permalink
Adjust data retention reallocate strategy to use CrateDB 5.2 node att…
Browse files Browse the repository at this point in the history
…ributes
  • Loading branch information
hammerhead committed Dec 22, 2022
1 parent f8b20d6 commit cf82c2a
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 27 deletions.
15 changes: 4 additions & 11 deletions dags/data_retention_reallocate_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
Prerequisites
-------------
In CrateDB, tables for storing retention policies need to be created once manually.
See the file setup/data_retention_schema.sql in this repository.
- CrateDB 5.2.0 or later
- Tables for storing retention policies need to be created once manually in
CrateDB. See the file setup/data_retention_schema.sql in this repository.
"""
from pathlib import Path
import pendulum
Expand Down Expand Up @@ -58,18 +59,10 @@ def generate_sql_reallocate(policy):
def data_retention_reallocate():
policies = map_policy.expand(policy=get_policies())

reallocate = SQLExecuteQueryOperator.partial(
SQLExecuteQueryOperator.partial(
task_id="reallocate_partitions",
conn_id="cratedb_connection",
).expand(sql=generate_sql_reallocate.expand(policy=policies))

track = SQLExecuteQueryOperator.partial(
task_id="add_tracking_information",
conn_id="cratedb_connection",
sql="data_retention_reallocate_tracking.sql",
).expand(parameters=policies)

reallocate >> track


data_retention_reallocate()
1 change: 0 additions & 1 deletion include/data_retention_reallocate_tracking.sql

This file was deleted.

18 changes: 12 additions & 6 deletions include/data_retention_retrieve_reallocate_policies.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
WITH partition_allocations AS (
SELECT DISTINCT s.schema_name AS table_schema,
s.table_name,
s.partition_ident,
n.attributes
FROM sys.shards s
JOIN sys.nodes n ON s.node['id'] = n.id
)
SELECT QUOTE_IDENT(p.table_schema),
QUOTE_IDENT(p.table_name),
QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name),
Expand All @@ -9,11 +17,9 @@ FROM information_schema.table_partitions p
JOIN doc.retention_policies r ON p.table_schema = r.table_schema
AND p.table_name = r.table_name
AND p.values[r.partition_column] < %(day)s::TIMESTAMP - (r.retention_period || ' days')::INTERVAL
LEFT JOIN doc.retention_policy_tracking t ON t.table_schema = p.table_schema
AND t.table_name = p.table_name
JOIN partition_allocations a ON a.table_schema = p.table_schema
AND a.table_name = p.table_name
AND p.partition_ident = a.partition_ident
AND attributes[r.reallocation_attribute_name] <> r.reallocation_attribute_value
WHERE r.strategy = 'reallocate'
AND (
t.last_partition_value IS NULL
OR p.values[r.partition_column] > t.last_partition_value
)
ORDER BY 5 ASC;
9 changes: 0 additions & 9 deletions setup/data_retention_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,3 @@ CREATE TABLE IF NOT EXISTS doc.retention_policies (
PRIMARY KEY ("table_schema", "table_name", "strategy")
)
CLUSTERED INTO 1 SHARDS;

CREATE TABLE IF NOT EXISTS doc.retention_policy_tracking (
"table_schema" TEXT,
"table_name" TEXT,
"strategy" TEXT,
"last_partition_value" TIMESTAMP WITH TIME ZONE NOT NULL,
PRIMARY KEY (table_schema, table_name, strategy)
)
CLUSTERED INTO 1 SHARDS;

0 comments on commit cf82c2a

Please sign in to comment.