Skip to content

Commit

Permalink
Refactor unneeded 'continue' jumps in api (#33842)
Browse files Browse the repository at this point in the history
  • Loading branch information
eumiro authored Sep 1, 2023
1 parent 8a9988a commit 3b58a38
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 17 deletions.
4 changes: 1 addition & 3 deletions airflow/api/common/delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
count = 0

for model in get_sqla_model_classes():
if hasattr(model, "dag_id"):
if keep_records_in_log and model.__name__ == "Log":
continue
if hasattr(model, "dag_id") and (not keep_records_in_log or model.__name__ != "Log"):
count += session.execute(
delete(model)
.where(model.dag_id.in_(dags_to_delete))
Expand Down
26 changes: 12 additions & 14 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,15 @@ def _create_dagruns(
}

for info in infos:
if info.logical_date in dag_runs:
continue
dag_runs[info.logical_date] = dag.create_dagrun(
execution_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
external_trigger=False,
state=state,
run_type=run_type,
)
if info.logical_date not in dag_runs:
dag_runs[info.logical_date] = dag.create_dagrun(
execution_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
external_trigger=False,
state=state,
run_type=run_type,
)
return dag_runs.values()


Expand Down Expand Up @@ -493,10 +492,9 @@ def set_dag_run_state_to_failed(

tasks = []
for task in dag.tasks:
if task.task_id not in task_ids_of_running_tis:
continue
task.dag = dag
tasks.append(task)
if task.task_id in task_ids_of_running_tis:
task.dag = dag
tasks.append(task)

# Mark non-finished tasks as SKIPPED.
tis = session.scalars(
Expand Down

0 comments on commit 3b58a38

Please sign in to comment.