Skip to content

Commit

Permalink
Better typing for Job and JobRunners
Browse files Browse the repository at this point in the history
By not setting the job attribute in BaseJobRunner, the typing for the
runners, Job, and JobPydantic is now more complete and accurate.

The Scheduler and Backfill runners limit their typing to Job and can
therefore use everything that the ORM Job provides.

Other runners are limited to the union of Job and JobPydantic, so that
they can run on the client side of the internal API without having
access to all the Job features.

This is a follow-up to apache#31182, which fixed the missing job_type for
the DagProcessor job and nicely extracted the job into BaseJobRunner,
but broke the MyPy/typing guards implemented in the runners that should
aid the AIP-44 implementation.
  • Loading branch information
potiuk committed May 14, 2023
1 parent 3193857 commit a577260
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 3 deletions.
1 change: 1 addition & 0 deletions airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(
:param kwargs:
"""
super().__init__(job)
self.job = job
self.dag = dag
self.dag_id = dag.dag_id
self.bf_start_date = start_date
Expand Down
6 changes: 3 additions & 3 deletions airflow/jobs/base_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@
from sqlalchemy.orm import Session

from airflow.jobs.job import Job
from airflow.serialization.pydantic.job import JobPydantic


class BaseJobRunner:
"""Abstract class for job runners to derive from."""

job_type = "undefined"

def __init__(self, job):
def __init__(self, job: Job | JobPydantic) -> None:
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job = job
self.job.job_type = self.job_type
job.job_type = self.job_type

def _execute(self) -> int | None:
"""
Expand Down
1 change: 1 addition & 0 deletions airflow/jobs/dag_processor_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(
**kwargs,
):
super().__init__(job)
self.job = job
self.processor = processor
self.processor.heartbeat = lambda: perform_heartbeat(
job=self.job,
Expand Down
1 change: 1 addition & 0 deletions airflow/jobs/local_task_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(
external_executor_id: str | None = None,
):
super().__init__(job)
self.job = job
LoggingMixin.__init__(self, context=task_instance)
self.task_instance = task_instance
self.ignore_all_deps = ignore_all_deps
Expand Down
1 change: 1 addition & 0 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def __init__(
processor_poll_interval: float | None = None,
):
super().__init__(job)
self.job = job
self.subdir = subdir
self.num_runs = num_runs
# In specific tests, we want to stop the parse loop after the _files_ have been parsed a certain
Expand Down
1 change: 1 addition & 0 deletions airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ def __init__(
capacity=None,
):
super().__init__(job)
self.job = job
if capacity is None:
self.capacity = conf.getint("triggerer", "default_capacity", fallback=1000)
elif isinstance(capacity, int) and capacity > 0:
Expand Down

0 comments on commit a577260

Please sign in to comment.