From a577260163b6c94ded1e18c6a71056bbe8f9989a Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Mon, 15 May 2023 01:08:58 +0200 Subject: [PATCH] Better typing for Job and JobRunners By avoiding setting the job in the BaseJobRunner, the typing for Runners and Job and JobPydantic is now more complete and accurate. Scheduler and Backfill Runners limit their code to Job and can use all the things that ORM Job allows them to do. Other runners are limited to the union of Job and its JobPydantic version so that they can be run on the client side of the internal API without having all the Job features. This is a follow-up after #31182 that fixed the missing job_type for DagProcessor Job and nicely extracted job to BaseJobRunner but broke MyPy/Typing guards implemented in the runners that should aid the AIP-44 implementation. --- airflow/jobs/backfill_job_runner.py | 1 + airflow/jobs/base_job_runner.py | 6 +++--- airflow/jobs/dag_processor_job_runner.py | 1 + airflow/jobs/local_task_job_runner.py | 1 + airflow/jobs/scheduler_job_runner.py | 1 + airflow/jobs/triggerer_job_runner.py | 1 + 6 files changed, 8 insertions(+), 3 deletions(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 65dfc0b54e97c..ebc92187f9551 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -149,6 +149,7 @@ def __init__( :param kwargs: """ super().__init__(job) + self.job = job self.dag = dag self.dag_id = dag.dag_id self.bf_start_date = start_date diff --git a/airflow/jobs/base_job_runner.py b/airflow/jobs/base_job_runner.py index eeaff887df198..5b0af0dbecd01 100644 --- a/airflow/jobs/base_job_runner.py +++ b/airflow/jobs/base_job_runner.py @@ -25,6 +25,7 @@ from sqlalchemy.orm import Session from airflow.jobs.job import Job + from airflow.serialization.pydantic.job import JobPydantic class BaseJobRunner: @@ -32,14 +33,13 @@ class BaseJobRunner: job_type = "undefined" - def __init__(self, job): + def __init__(self, job: Job | 
JobPydantic) -> None: if job.job_type and job.job_type != self.job_type: raise Exception( f"The job is already assigned a different job_type: {job.job_type}." f"This is a bug and should be reported." ) - self.job = job - self.job.job_type = self.job_type + job.job_type = self.job_type def _execute(self) -> int | None: """ diff --git a/airflow/jobs/dag_processor_job_runner.py b/airflow/jobs/dag_processor_job_runner.py index c8e114cb0b296..b2420726c6a99 100644 --- a/airflow/jobs/dag_processor_job_runner.py +++ b/airflow/jobs/dag_processor_job_runner.py @@ -47,6 +47,7 @@ def __init__( **kwargs, ): super().__init__(job) + self.job = job self.processor = processor self.processor.heartbeat = lambda: perform_heartbeat( job=self.job, diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index da821f2288d91..2fb4eaaba3ef6 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -88,6 +88,7 @@ def __init__( external_executor_id: str | None = None, ): super().__init__(job) + self.job = job LoggingMixin.__init__(self, context=task_instance) self.task_instance = task_instance self.ignore_all_deps = ignore_all_deps diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 337f317df845c..9847372eae14e 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -157,6 +157,7 @@ def __init__( processor_poll_interval: float | None = None, ): super().__init__(job) + self.job = job self.subdir = subdir self.num_runs = num_runs # In specific tests, we want to stop the parse loop after the _files_ have been parsed a certain diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index d26b52f6376e7..4651b6cf9210b 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ b/airflow/jobs/triggerer_job_runner.py @@ -253,6 +253,7 @@ def __init__( capacity=None, ): super().__init__(job) + self.job = job if capacity 
is None: self.capacity = conf.getint("triggerer", "default_capacity", fallback=1000) elif isinstance(capacity, int) and capacity > 0: