diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 9088823de3589..54aacc4d2276a 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1292,6 +1292,7 @@ class BadReferenceConfig: ti_lkp_table = _create_ti_key_lkp_table(session=session, table_name='_airflow_tmp_ti_key_lkp') + session.commit() for model, change_version, bad_ref_cfg in models_list: log.debug("checking model %s", model.__tablename__) # We can't use the model here since it may differ from the db state due to @@ -1304,11 +1305,13 @@ class BadReferenceConfig: if "run_id" in source_table.columns: continue + session.commit() bad_rows_subquery = bad_ref_cfg.exists_func(session, source_table, ti_lkp_table=ti_lkp_table) dangling_table_name = _format_airflow_moved_table_name(source_table.name, change_version, 'dangling') select_list = [x.label(x.name) for x in source_table.c] log.debug(bad_rows_subquery.selectable.compile()) invalid_rows_query = session.query(*select_list).filter(~bad_rows_subquery.exists()) + session.commit() if dangling_table_name in existing_table_names: invalid_row_count = invalid_rows_query.count() if invalid_row_count <= 0: @@ -1330,6 +1333,7 @@ class BadReferenceConfig: dangling_table_name, ) session.commit() + session.commit() ti_lkp_table.drop(bind=settings.engine, checkfirst=True)