diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index d28bbadff1154..2cd980363f49f 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -148,14 +148,12 @@ def __init__(
         :param args:
         :param kwargs:
         """
-        super().__init__()
+        super().__init__(job)
         if job.job_type and job.job_type != self.job_type:
             raise Exception(
                 f"The job is already assigned a different job_type: {job.job_type}."
                 f"This is a bug and should be reported."
             )
-        self.job = job
-        self.job.job_type = self.job_type
         self.dag = dag
         self.dag_id = dag.dag_id
         self.bf_start_date = start_date
diff --git a/airflow/jobs/base_job_runner.py b/airflow/jobs/base_job_runner.py
index da25da64e15e6..e5b2d4f05f94a 100644
--- a/airflow/jobs/base_job_runner.py
+++ b/airflow/jobs/base_job_runner.py
@@ -32,6 +32,10 @@ class BaseJobRunner:
 
     job_type = "undefined"
 
+    def __init__(self, job):
+        self.job = job
+        self.job.job_type = self.job_type
+
     def _execute(self) -> int | None:
         """
         Executes the logic connected to the runner. This method should be
diff --git a/airflow/jobs/dag_processor_job_runner.py b/airflow/jobs/dag_processor_job_runner.py
index 0485dbcfe74e7..ff01ddad25ac3 100644
--- a/airflow/jobs/dag_processor_job_runner.py
+++ b/airflow/jobs/dag_processor_job_runner.py
@@ -46,14 +46,12 @@ def __init__(
         *args,
         **kwargs,
     ):
-        super().__init__(*args, **kwargs)
-        self.job = job
+        super().__init__(job)
         if job.job_type and job.job_type != self.job_type:
             raise Exception(
                 f"The job is already assigned a different job_type: {job.job_type}."
                 f"This is a bug and should be reported."
             )
-        self.job.job_type = self.job_type
         self.processor = processor
         self.processor.heartbeat = lambda: perform_heartbeat(
             job=self.job,
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index 5d20826f46f29..614b529e51b63 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -87,15 +87,13 @@ def __init__(
         pool: str | None = None,
         external_executor_id: str | None = None,
     ):
-        BaseJobRunner.__init__(self)
+        super().__init__(job)
         LoggingMixin.__init__(self, context=task_instance)
         if job.job_type and job.job_type != self.job_type:
             raise Exception(
                 f"The job is already assigned a different job_type: {job.job_type}."
                 f"This is a bug and should be reported."
             )
-        self.job = job
-        self.job.job_type = self.job_type
         self.task_instance = task_instance
         self.ignore_all_deps = ignore_all_deps
         self.ignore_depends_on_past = ignore_depends_on_past
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index c7088412de220..564f41253ead1 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -156,14 +156,12 @@ def __init__(
         log: logging.Logger | None = None,
         processor_poll_interval: float | None = None,
     ):
-        super().__init__()
+        super().__init__(job)
         if job.job_type and job.job_type != self.job_type:
             raise Exception(
                 f"The job is already assigned a different job_type: {job.job_type}."
                 f"This is a bug and should be reported."
             )
-        self.job = job
-        self.job.job_type = self.job_type
         self.subdir = subdir
         self.num_runs = num_runs
         # In specific tests, we want to stop the parse loop after the _files_ have been parsed a certain
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index fc2800dcd198c..725903b2ac3bd 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -252,14 +252,12 @@ def __init__(
         job: Job | JobPydantic,
         capacity=None,
     ):
-        super().__init__()
+        super().__init__(job)
         if job.job_type and job.job_type != self.job_type:
             raise Exception(
                 f"The job is already assigned a different job_type: {job.job_type}."
                 f"This is a bug and should be reported."
             )
-        self.job = job
-        self.job.job_type = self.job_type
         if capacity is None:
             self.capacity = conf.getint("triggerer", "default_capacity", fallback=1000)
         elif isinstance(capacity, int) and capacity > 0: