Skip to content

Commit

Permalink
Resolve SA warnings in migrations scripts (apache#39418)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis authored and RodrigoGanancia committed May 10, 2024
1 parent b537d6e commit 4c8ccb2
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ def upgrade():
constraints = get_mssql_table_constraints(conn, "task_instance")
pk, _ = constraints["PRIMARY KEY"].popitem()
batch_op.drop_constraint(pk, type_="primary")
elif dialect_name not in ("sqlite"):
batch_op.drop_constraint("task_instance_pkey", type_="primary")
batch_op.drop_constraint("task_instance_pkey", type_="primary")
batch_op.drop_index("ti_dag_date")
batch_op.drop_index("ti_state_lkp")
batch_op.drop_column("execution_date")
Expand Down Expand Up @@ -402,7 +401,7 @@ def _multi_table_update(dialect_name, target, column):
if dialect_name == "sqlite":
# Most SQLite versions don't support multi table update (and SQLA doesn't know about it anyway), so we
# need to do a Correlated subquery update
sub_q = select(dag_run.c[column.name]).where(condition)
sub_q = select(dag_run.c[column.name]).where(condition).scalar_subquery()

return target.update().values({column: sub_q})
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,14 @@ def upgrade():
xcom = Table("xcom", metadata, *_get_old_xcom_columns())
dagrun = _get_dagrun_table()
query = select(
[
dagrun.c.id,
xcom.c.task_id,
xcom.c.key,
xcom.c.value,
xcom.c.timestamp,
xcom.c.dag_id,
dagrun.c.run_id,
literal_column("-1"),
],
dagrun.c.id,
xcom.c.task_id,
xcom.c.key,
xcom.c.value,
xcom.c.timestamp,
xcom.c.dag_id,
dagrun.c.run_id,
literal_column("-1"),
).select_from(
xcom.join(
right=dagrun,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def _multi_table_update(dialect_name, target, column):
if dialect_name == "sqlite":
# Most SQLite versions don't support multi table update (and SQLA doesn't know about it anyway), so we
# need to do a Correlated subquery update
sub_q = select(dag_run.c[column.name]).where(condition)
sub_q = select(dag_run.c[column.name]).where(condition).scalar_subquery()

return target.update().values({column: sub_q})
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def _update_value_from_dag_run(
if dialect_name == "sqlite":
# Most SQLite versions don't support multi table update (and SQLA doesn't know about it anyway), so we
# need to do a Correlated subquery update
sub_q = select(dag_run.c[target_column.name]).where(condition)
sub_q = select(dag_run.c[target_column.name]).where(condition).scalar_subquery()

return target_table.update().values({target_column: sub_q})
else:
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
45e8aa557ff6c5c69995915206df2fef17495d5da04453e897cea73a8df0f492
ec3c33d964d9afc51f0fbbee9bb2ef8c8dc496675a6ef96891a30d0403bb6823
4 changes: 2 additions & 2 deletions docs/apache-airflow/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 4c8ccb2

Please sign in to comment.