From a1cb4827a364b4433883c0cb059a43bd4510b95b Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 3 May 2022 10:52:47 -0700 Subject: [PATCH] Optimize 2.3.0 pre-upgrade check queries We have to check for rows that are missing either corresponding TI or DR and move them out of table before adding FKs. We were doing correlation in the JOIN condition but it appears postgres does *not* like this so here we move correlation to WHERE. --- airflow/utils/db.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index bc4007c279ef8..0e1ada2bbd48e 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1092,30 +1092,30 @@ def _task_instance_exists(session, source_table, dag_run, task_instance): """ if 'run_id' not in task_instance.c: # db is < 2.2.0 - source_to_ti_join_cond = and_( + where_clause = and_( source_table.c.dag_id == task_instance.c.dag_id, source_table.c.task_id == task_instance.c.task_id, source_table.c.execution_date == task_instance.c.execution_date, ) ti_to_dr_join_cond = and_( - source_table.c.dag_id == task_instance.c.dag_id, - source_table.c.execution_date == task_instance.c.execution_date, + dag_run.c.dag_id == task_instance.c.dag_id, + dag_run.c.execution_date == task_instance.c.execution_date, ) else: # db is 2.2.0 <= version < 2.3.0 - source_to_ti_join_cond = and_( + where_clause = and_( source_table.c.dag_id == task_instance.c.dag_id, source_table.c.task_id == task_instance.c.task_id, + source_table.c.execution_date == dag_run.c.execution_date, ) ti_to_dr_join_cond = and_( - source_table.c.dag_id == task_instance.c.dag_id, + dag_run.c.dag_id == task_instance.c.dag_id, dag_run.c.run_id == task_instance.c.run_id, - source_table.c.execution_date == dag_run.c.execution_date, ) exists_subquery = ( session.query(text('1')) .select_from(task_instance.join(dag_run, onclause=ti_to_dr_join_cond)) - .filter(source_to_ti_join_cond) + .filter(where_clause) ) return exists_subquery