Skip to content

Commit

Permalink
Remove executor_class from Job - fixing backfil for custom executors (#…
Browse files Browse the repository at this point in the history
…32219)

The executor_class in job was used in the past to pass information
about executor being used but it has been replaced by executor
passed via JobRunner when needed. The executor_class has only
been used in backfill job, but it has been used wrongly - because
it had no fully qualified path there.

The right way of using it is to check the executor passed via
JobRunner, and since executor_class is not used any more, we
can safely remove it.

This fixes a bug when custom exceutors could not be used
during backfill.

Co-authored-by: Andrew Halpern <[email protected]>
  • Loading branch information
potiuk and adh-wonolo authored Jun 28, 2023
1 parent be057d8 commit df4026a
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 8 deletions.
5 changes: 1 addition & 4 deletions airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,7 @@ def _per_task_process(key, ti: TaskInstance, session):

cfg_path = None

executor_class, _ = ExecutorLoader.import_executor_cls(
self.job.executor_class,
)
if executor_class.is_local:
if executor.is_local:
cfg_path = tmp_configuration_copy()

executor.queue_task_instance(
Expand Down
3 changes: 0 additions & 3 deletions airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ def __init__(self, executor=None, heartrate=None, **kwargs):
self.hostname = get_hostname()
if executor:
self.executor = executor
self.executor_class = executor.__class__.__name__
else:
self.executor_class = conf.get("core", "EXECUTOR")
self.start_date = timezone.utcnow()
self.latest_heartbeat = timezone.utcnow()
if heartrate is not None:
Expand Down
1 change: 0 additions & 1 deletion tests/jobs/test_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ def test_essential_attr(self, mock_getuser, mock_hostname, mock_default_executor

test_job = Job(heartrate=10, dag_id="example_dag", state=State.RUNNING)
MockJobRunner(job=test_job)
assert test_job.executor_class == "SequentialExecutor"
assert test_job.heartrate == 10
assert test_job.dag_id == "example_dag"
assert test_job.hostname == "test_hostname"
Expand Down

0 comments on commit df4026a

Please sign in to comment.