Skip to content

Commit

Permalink
Simplify data retention DAGs by using dynamic task mapping on the `pa…
Browse files Browse the repository at this point in the history
…rams` parameter

This is a new upstream improvement that required a workaround earlier: apache/airflow#24014
  • Loading branch information
hammerhead committed Dec 23, 2022
1 parent 8e048d6 commit b8916b1
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 50 deletions.
26 changes: 10 additions & 16 deletions dags/data_retention_delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,12 @@
from airflow.decorators import dag, task


@task
def generate_sql(policy):
"""Generate DROP statment for a given partition"""
return (
Path("include/data_retention_delete.sql")
.read_text(encoding="utf-8")
.format(
table_fqn=policy[0],
column=policy[1],
value=policy[2],
)
)
def map_policy(policy):
return {
"table_fqn": policy[0],
"column": policy[1],
"value": policy[2],
}


@task
Expand All @@ -35,7 +29,8 @@ def get_policies(ds=None):
pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
sql = Path("include/data_retention_retrieve_delete_policies.sql")
return pg_hook.get_records(
sql=sql.read_text(encoding="utf-8"), parameters={"day": ds}
sql=sql.read_text(encoding="utf-8"),
parameters={"day": ds},
)


Expand All @@ -45,12 +40,11 @@ def get_policies(ds=None):
catchup=False,
)
def data_retention_delete():
sql_statements = generate_sql.expand(policy=get_policies())

SQLExecuteQueryOperator.partial(
task_id="delete_partition",
conn_id="cratedb_connection",
).expand(sql=sql_statements)
sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};",
).expand(params=get_policies().map(map_policy))


data_retention_delete()
22 changes: 7 additions & 15 deletions dags/data_retention_reallocate_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ def get_policies(ds=None):
pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
sql = Path("include/data_retention_retrieve_reallocate_policies.sql")
return pg_hook.get_records(
sql=sql.read_text(encoding="utf-8"), parameters={"day": ds}
sql=sql.read_text(encoding="utf-8"),
parameters={"day": ds},
)


@task
def map_policy(policy):
"""Map index-based policy to readable dict structure"""
return {
Expand All @@ -40,29 +40,21 @@ def map_policy(policy):
}


@task
def generate_sql_reallocate(policy):
"""Generate SQL for reallocation"""
return (
Path("include/data_retention_reallocate.sql")
.read_text(encoding="utf-8")
.format(**policy)
)


@dag(
start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
schedule="@daily",
catchup=False,
template_searchpath=["include"],
)
def data_retention_reallocate():
policies = map_policy.expand(policy=get_policies())

SQLExecuteQueryOperator.partial(
task_id="reallocate_partitions",
conn_id="cratedb_connection",
).expand(sql=generate_sql_reallocate.expand(policy=policies))
sql="""
ALTER TABLE {{params.table_fqn}} PARTITION ({{params.column}} = {{params.value}})
SET ("routing.allocation.require.{{params.attribute_name}}" = '{{params.attribute_value}}');
""",
).expand(params=get_policies().map(map_policy))


data_retention_reallocate()
26 changes: 10 additions & 16 deletions dags/data_retention_snapshot_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ def get_policies(ds=None):
)


@task
def map_policy(policy):
"""Map index-based policy to readable dict structure"""
return {
Expand All @@ -38,35 +37,30 @@ def map_policy(policy):
}


@task
def generate_sql(query_file, policy):
"""Generate SQL statment for a given partition"""
return Path(query_file).read_text(encoding="utf-8").format(**policy)


@dag(
start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
schedule="@daily",
catchup=False,
)
def data_retention_snapshot():
policies = map_policy.expand(policy=get_policies())
sql_statements_snapshot = generate_sql.partial(
query_file="include/data_retention_snapshot.sql"
).expand(policy=policies)
sql_statements_delete = generate_sql.partial(
query_file="include/data_retention_delete.sql"
).expand(policy=policies)
policies = get_policies().map(map_policy)

reallocate = SQLExecuteQueryOperator.partial(
task_id="snapshot_partitions",
conn_id="cratedb_connection",
).expand(sql=sql_statements_snapshot)
sql="""
CREATE SNAPSHOT {{params.target_repository_name}}."{{params.schema}}.{{params.table}}-{{params.value}}"
TABLE {{params.table_fqn}} PARTITION ({{params.column}} = {{params.value}})
WITH ("wait_for_completion" = true);
""",
).expand(params=policies)

delete = SQLExecuteQueryOperator.partial(
task_id="delete_partitions",
conn_id="cratedb_connection",
).expand(sql=sql_statements_delete)
sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};",
).expand(params=policies)

reallocate >> delete

Expand Down
1 change: 0 additions & 1 deletion include/data_retention_delete.sql

This file was deleted.

1 change: 0 additions & 1 deletion include/data_retention_reallocate.sql

This file was deleted.

1 change: 0 additions & 1 deletion include/data_retention_snapshot.sql

This file was deleted.

0 comments on commit b8916b1

Please sign in to comment.