Skip to content

Commit

Permalink
Make sure that DAG processor job row has filed value in job_type co…
Browse files Browse the repository at this point in the history
…lumn (#31182)

* fix dag processor job_runner, add missing job_type

* remove uneeded spaces

* move job_type check to base_runner

* fix test mock job runner pass job argument to init

(cherry picked from commit 420a9b1)
  • Loading branch information
AmFlint authored and ephraimbuddy committed May 10, 2023
1 parent bdae7b1 commit 711d6f0
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 40 deletions.
9 changes: 1 addition & 8 deletions airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,7 @@ def __init__(
:param args:
:param kwargs:
"""
super().__init__()
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job = job
self.job.job_type = self.job_type
super().__init__(job)
self.dag = dag
self.dag_id = dag.dag_id
self.bf_start_date = start_date
Expand Down
9 changes: 9 additions & 0 deletions airflow/jobs/base_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ class BaseJobRunner:

job_type = "undefined"

def __init__(self, job):
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job = job
self.job.job_type = self.job_type

def _execute(self) -> int | None:
"""
Executes the logic connected to the runner. This method should be
Expand Down
8 changes: 1 addition & 7 deletions airflow/jobs/dag_processor_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,7 @@ def __init__(
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.job = job
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
super().__init__(job)
self.processor = processor
self.processor.heartbeat = lambda: perform_heartbeat(
job=self.job,
Expand Down
9 changes: 1 addition & 8 deletions airflow/jobs/local_task_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,8 @@ def __init__(
pool: str | None = None,
external_executor_id: str | None = None,
):
BaseJobRunner.__init__(self)
super().__init__(job)
LoggingMixin.__init__(self, context=task_instance)
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job = job
self.job.job_type = self.job_type
self.task_instance = task_instance
self.ignore_all_deps = ignore_all_deps
self.ignore_depends_on_past = ignore_depends_on_past
Expand Down
9 changes: 1 addition & 8 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,7 @@ def __init__(
log: logging.Logger | None = None,
processor_poll_interval: float | None = None,
):
super().__init__()
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job = job
self.job.job_type = self.job_type
super().__init__(job)
self.subdir = subdir
self.num_runs = num_runs
# In specific tests, we want to stop the parse loop after the _files_ have been parsed a certain
Expand Down
9 changes: 1 addition & 8 deletions airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,7 @@ def __init__(
job: Job | JobPydantic,
capacity=None,
):
super().__init__()
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job = job
self.job.job_type = self.job_type
super().__init__(job)
if capacity is None:
self.capacity = conf.getint("triggerer", "default_capacity", fallback=1000)
elif isinstance(capacity, int) and capacity > 0:
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ class MockJobRunner(BaseJobRunner):
job_type = "MockJob"

def __init__(self, job: Job | JobPydantic, func=None):
super().__init__()
super().__init__(job)
self.job = job
self.job.job_type = self.job_type
self.func = func
Expand Down

0 comments on commit 711d6f0

Please sign in to comment.