Skip to content

Commit

Permalink
Don't just default to scheduler heartbeat in jobs (#33084)
Browse files Browse the repository at this point in the history
(cherry picked from commit c39359e)
  • Loading branch information
o-nikolas authored and ephraimbuddy committed Aug 9, 2023
1 parent 39e590c commit 6378943
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 3 deletions.
7 changes: 7 additions & 0 deletions airflow/config_templates/unit_tests.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ scheduler_heartbeat_sec = 5
parsing_processes = 2
dag_dir_list_interval = 0

[triggerer]
# These values are set so that unit tests run faster than usual.
# The triggerer heartbeat is intentionally different from the scheduler heartbeat, to catch code
# that wrongly assumes the two are the same. They are equal by default in production, but users
# can configure them differently.
job_heartbeat_sec = 2

[example_section]
# This section is used to test coercions of configuration values retrieval
string_value = 21600
Expand Down
11 changes: 9 additions & 2 deletions airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ class Job(Base, LoggingMixin):
Only makes sense for SchedulerJob and BackfillJob instances.
"""

heartrate = conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC")

def __init__(self, executor=None, heartrate=None, **kwargs):
# Save init parameters as DB fields
self.hostname = get_hostname()
Expand All @@ -117,6 +115,15 @@ def __init__(self, executor=None, heartrate=None, **kwargs):
def executor(self):
return ExecutorLoader.get_default_executor()

@cached_property
def heartrate(self):
    """
    Return the ``JOB_HEARTBEAT_SEC`` setting that applies to this job, as a float.

    Jobs whose ``job_type`` is ``"TriggererJob"`` read the ``[triggerer]`` section;
    every other job type reads the ``[scheduler]`` section. The value is looked up
    on first access and then cached on the instance.
    """
    # The heartrate used to be hardcoded to the scheduler section, so anything
    # that is not a triggerer keeps reading it for backwards compatibility.
    section = "triggerer" if self.job_type == "TriggererJob" else "scheduler"
    return conf.getfloat(section, "JOB_HEARTBEAT_SEC")

def is_alive(self, grace_multiplier=2.1):
"""
Is this job currently alive.
Expand Down
33 changes: 32 additions & 1 deletion tests/jobs/test_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from airflow.utils.state import State
from tests.listeners import lifecycle_listener
from tests.test_utils.config import conf_vars
from tests.utils.test_helpers import MockJobRunner
from tests.utils.test_helpers import MockJobRunner, SchedulerJobRunner, TriggererJobRunner


class TestJob:
Expand Down Expand Up @@ -90,6 +90,37 @@ def abort():
assert job.state == State.FAILED
assert job.end_date is not None

# Two cases: a scheduler-type runner and a triggerer-type runner, each paired with
# the config section it is expected to read and a distinct heartbeat value.
@pytest.mark.parametrize(
"job_runner, job_type,job_heartbeat_sec",
[(SchedulerJobRunner, "scheduler", "11"), (TriggererJobRunner, "triggerer", "9")],
)
def test_heart_rate_after_fetched_from_db(self, job_runner, job_type, job_heartbeat_sec):
"""
Ensure heartrate is set correctly after jobs are queried from the DB.

The job is created without an explicit heartrate, added to the session, and then
fetched again through ``most_recent_job``; the fetched job's ``heartrate`` must
equal the ``job_heartbeat_sec`` value configured for the section under test.
"""
# NOTE(review): conf_vars presumably overrides the given config option only for
# the duration of the ``with`` block -- confirm in tests.test_utils.config.
with create_session() as session, conf_vars(
{(job_type.lower(), "job_heartbeat_sec"): job_heartbeat_sec}
):
job = Job()
# The runner instance itself is discarded; it is constructed only for its effect
# on ``job`` (presumably setting the job's job_type -- confirm in MockJobRunner).
job_runner(job=job)
session.add(job)
session.flush()

# Look the job up again by the runner's job_type and check the heartrate it reports.
most_recent = most_recent_job(job_runner.job_type, session=session)
assert most_recent.heartrate == float(job_heartbeat_sec)

session.rollback()

@pytest.mark.parametrize(
    "job_runner, job_type,job_heartbeat_sec",
    [(SchedulerJobRunner, "scheduler", "11"), (TriggererJobRunner, "triggerer", "9")],
)
def test_heart_rate_via_constructor_persists(self, job_runner, job_type, job_heartbeat_sec):
    """
    Ensure a heartrate passed to the ``Job`` constructor is the one the job reports.

    A different ``job_heartbeat_sec`` is configured for the section under test, so
    the assertion only holds if the constructor argument is what ``heartrate`` returns.
    """
    config_override = {(job_type.lower(), "job_heartbeat_sec"): job_heartbeat_sec}
    with conf_vars(config_override):
        job = Job(heartrate=12)
        job_runner(job)
        # 12 was passed to the constructor directly, so it must be reported here
        # rather than the configured 11 / 9.
        assert job.heartrate == 12

def test_most_recent_job(self):
with create_session() as session:
old_job = Job(heartrate=10)
Expand Down
8 changes: 8 additions & 0 deletions tests/utils/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,11 @@ def _execute(self):
if self.func is not None:
return self.func()
return None


class SchedulerJobRunner(MockJobRunner):
    """Mock job runner whose ``job_type`` is ``"SchedulerJob"``."""

    job_type = "SchedulerJob"


class TriggererJobRunner(MockJobRunner):
    """Mock job runner whose ``job_type`` is ``"TriggererJob"``."""

    job_type = "TriggererJob"

0 comments on commit 6378943

Please sign in to comment.