From eb56473b40d2717758613ead1cdf712c33347082 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= <6774676+eumiro@users.noreply.github.com> Date: Mon, 21 Aug 2023 05:37:07 +0000 Subject: [PATCH] Simplify conditions on len() in jobs (#33568) --- airflow/jobs/backfill_job_runner.py | 8 ++------ airflow/jobs/scheduler_job_runner.py | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 0e39fb0e0246..f199ce698256 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -451,7 +451,7 @@ def _process_backfill_task_instances( is_unit_test = airflow_conf.getboolean("core", "unit_test_mode") - while (len(ti_status.to_run) > 0 or len(ti_status.running) > 0) and len(ti_status.deadlocked) == 0: + while (ti_status.to_run or ti_status.running) and not ti_status.deadlocked: self.log.debug("*** Clearing out not_ready list ***") ti_status.not_ready.clear() @@ -677,11 +677,7 @@ def _per_task_process(key, ti: TaskInstance, session): # If the set of tasks that aren't ready ever equals the set of # tasks to run and there are no running tasks then the backfill # is deadlocked - if ( - ti_status.not_ready - and ti_status.not_ready == set(ti_status.to_run) - and len(ti_status.running) == 0 - ): + if ti_status.not_ready and ti_status.not_ready == set(ti_status.to_run) and not ti_status.running: self.log.warning("Deadlock discovered for ti_status.to_run=%s", ti_status.to_run.values()) ti_status.deadlocked.update(ti_status.to_run.values()) ti_status.to_run.clear() diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 947fec0a7b4b..f128c7857351 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -405,7 +405,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything. # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine)) - if len(task_instances_to_examine) == 0: + if not task_instances_to_examine: self.log.debug("No tasks to consider for execution.") break @@ -588,7 +588,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - Stats.gauge("scheduler.tasks.starving", num_starving_tasks_total) Stats.gauge("scheduler.tasks.executable", len(executable_tis)) - if len(executable_tis) > 0: + if executable_tis: task_instance_str = "\n".join(f"\t{x!r}" for x in executable_tis) self.log.info("Setting the following tasks to queued state:\n%s", task_instance_str)