Skip to content

Commit

Permalink
Simplify conditions on len() in jobs (#33568)
Browse files Browse the repository at this point in the history
  • Loading branch information
eumiro authored Aug 21, 2023
1 parent 95a930b commit eb56473
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 8 deletions.
8 changes: 2 additions & 6 deletions airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ def _process_backfill_task_instances(

is_unit_test = airflow_conf.getboolean("core", "unit_test_mode")

while (len(ti_status.to_run) > 0 or len(ti_status.running) > 0) and len(ti_status.deadlocked) == 0:
while (ti_status.to_run or ti_status.running) and not ti_status.deadlocked:
self.log.debug("*** Clearing out not_ready list ***")
ti_status.not_ready.clear()

Expand Down Expand Up @@ -677,11 +677,7 @@ def _per_task_process(key, ti: TaskInstance, session):
# If the set of tasks that aren't ready ever equals the set of
# tasks to run and there are no running tasks then the backfill
# is deadlocked
if (
ti_status.not_ready
and ti_status.not_ready == set(ti_status.to_run)
and len(ti_status.running) == 0
):
if ti_status.not_ready and ti_status.not_ready == set(ti_status.to_run) and not ti_status.running:
self.log.warning("Deadlock discovered for ti_status.to_run=%s", ti_status.to_run.values())
ti_status.deadlocked.update(ti_status.to_run.values())
ti_status.to_run.clear()
Expand Down
4 changes: 2 additions & 2 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
# TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
# Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))

if len(task_instances_to_examine) == 0:
if not task_instances_to_examine:
self.log.debug("No tasks to consider for execution.")
break

Expand Down Expand Up @@ -588,7 +588,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
Stats.gauge("scheduler.tasks.starving", num_starving_tasks_total)
Stats.gauge("scheduler.tasks.executable", len(executable_tis))

if len(executable_tis) > 0:
if executable_tis:
task_instance_str = "\n".join(f"\t{x!r}" for x in executable_tis)
self.log.info("Setting the following tasks to queued state:\n%s", task_instance_str)

Expand Down

0 comments on commit eb56473

Please sign in to comment.