Skip to content

Commit

Permalink
add commits
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed May 6, 2022
1 parent b74a2e3 commit d4794a3
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,7 @@ class BadReferenceConfig:

ti_lkp_table = _create_ti_key_lkp_table(session=session, table_name='_airflow_tmp_ti_key_lkp')

session.commit()
for model, change_version, bad_ref_cfg in models_list:
log.debug("checking model %s", model.__tablename__)
# We can't use the model here since it may differ from the db state due to
Expand All @@ -1304,11 +1305,13 @@ class BadReferenceConfig:
if "run_id" in source_table.columns:
continue

session.commit()
bad_rows_subquery = bad_ref_cfg.exists_func(session, source_table, ti_lkp_table=ti_lkp_table)
dangling_table_name = _format_airflow_moved_table_name(source_table.name, change_version, 'dangling')
select_list = [x.label(x.name) for x in source_table.c]
log.debug(bad_rows_subquery.selectable.compile())
invalid_rows_query = session.query(*select_list).filter(~bad_rows_subquery.exists())
session.commit()
if dangling_table_name in existing_table_names:
invalid_row_count = invalid_rows_query.count()
if invalid_row_count <= 0:
Expand All @@ -1330,6 +1333,7 @@ class BadReferenceConfig:
dangling_table_name,
)
session.commit()
session.commit()
ti_lkp_table.drop(bind=settings.engine, checkfirst=True)


Expand Down

0 comments on commit d4794a3

Please sign in to comment.