Skip to content

Commit

Permalink
remove uneeded spaces
Browse files Browse the repository at this point in the history
  • Loading branch information
AmFlint committed May 10, 2023
1 parent 1cfdc41 commit 8dd17bc
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 15 deletions.
4 changes: 1 addition & 3 deletions airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,12 @@ def __init__(
:param args:
:param kwargs:
"""
super().__init__()
super().__init__(job)
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job = job
self.job.job_type = self.job_type
self.dag = dag
self.dag_id = dag.dag_id
self.bf_start_date = start_date
Expand Down
4 changes: 4 additions & 0 deletions airflow/jobs/base_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class BaseJobRunner:

job_type = "undefined"

def __init__(self, job):
self.job = job
self.job.job_type = self.job_type

def _execute(self) -> int | None:
"""
Executes the logic connected to the runner. This method should be
Expand Down
4 changes: 1 addition & 3 deletions airflow/jobs/dag_processor_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,12 @@ def __init__(
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.job = job
super().__init__(job)
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job.job_type = self.job_type
self.processor = processor
self.processor.heartbeat = lambda: perform_heartbeat(
job=self.job,
Expand Down
4 changes: 1 addition & 3 deletions airflow/jobs/local_task_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,13 @@ def __init__(
pool: str | None = None,
external_executor_id: str | None = None,
):
BaseJobRunner.__init__(self)
super().__init__(job)
LoggingMixin.__init__(self, context=task_instance)
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job = job
self.job.job_type = self.job_type
self.task_instance = task_instance
self.ignore_all_deps = ignore_all_deps
self.ignore_depends_on_past = ignore_depends_on_past
Expand Down
4 changes: 1 addition & 3 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,12 @@ def __init__(
log: logging.Logger | None = None,
processor_poll_interval: float | None = None,
):
super().__init__()
super().__init__(job)
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job = job
self.job.job_type = self.job_type
self.subdir = subdir
self.num_runs = num_runs
# In specific tests, we want to stop the parse loop after the _files_ have been parsed a certain
Expand Down
4 changes: 1 addition & 3 deletions airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,12 @@ def __init__(
job: Job | JobPydantic,
capacity=None,
):
super().__init__()
super().__init__(job)
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type: {job.job_type}."
f"This is a bug and should be reported."
)
self.job = job
self.job.job_type = self.job_type
if capacity is None:
self.capacity = conf.getint("triggerer", "default_capacity", fallback=1000)
elif isinstance(capacity, int) and capacity > 0:
Expand Down

0 comments on commit 8dd17bc

Please sign in to comment.