From df4026a47aa6141142e186ec853ba286c2be0e95 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Wed, 28 Jun 2023 07:32:36 +0200 Subject: [PATCH] Remove executor_class from Job - fixing backfil for custom executors (#32219) The executor_class in job was used in the past to pass information about executor being used but it has been replaced by executor passed via JobRunner when needed. The executor_class has only been used in backfill job, but it has been used wrongly - because it had no fully qualified path there. The right way of using it is to check the executor passed via JobRunner, and since executor_class is not used any more, we can safely remove it. This fixes a bug when custom exceutors could not be used during backfill. Co-authored-by: Andrew Halpern <57367683+adh-wonolo@users.noreply.github.com> --- airflow/jobs/backfill_job_runner.py | 5 +---- airflow/jobs/job.py | 3 --- tests/jobs/test_base_job.py | 1 - 3 files changed, 1 insertion(+), 8 deletions(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 6ae6ae27cbd40..5b13490be7e62 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -540,10 +540,7 @@ def _per_task_process(key, ti: TaskInstance, session): cfg_path = None - executor_class, _ = ExecutorLoader.import_executor_cls( - self.job.executor_class, - ) - if executor_class.is_local: + if executor.is_local: cfg_path = tmp_configuration_copy() executor.queue_task_instance( diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py index 3c2caa3805958..99394cbc4553f 100644 --- a/airflow/jobs/job.py +++ b/airflow/jobs/job.py @@ -104,9 +104,6 @@ def __init__(self, executor=None, heartrate=None, **kwargs): self.hostname = get_hostname() if executor: self.executor = executor - self.executor_class = executor.__class__.__name__ - else: - self.executor_class = conf.get("core", "EXECUTOR") self.start_date = timezone.utcnow() self.latest_heartbeat = timezone.utcnow() if heartrate is not None: diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py index c646979237b7f..0e36bd45713a4 100644 --- a/tests/jobs/test_base_job.py +++ b/tests/jobs/test_base_job.py @@ -204,7 +204,6 @@ def test_essential_attr(self, mock_getuser, mock_hostname, mock_default_executor test_job = Job(heartrate=10, dag_id="example_dag", state=State.RUNNING) MockJobRunner(job=test_job) - assert test_job.executor_class == "SequentialExecutor" assert test_job.heartrate == 10 assert test_job.dag_id == "example_dag" assert test_job.hostname == "test_hostname"