diff --git a/airflow/config_templates/unit_tests.cfg b/airflow/config_templates/unit_tests.cfg index 29d422704e55..69c2d65bba0f 100644 --- a/airflow/config_templates/unit_tests.cfg +++ b/airflow/config_templates/unit_tests.cfg @@ -92,6 +92,13 @@ scheduler_heartbeat_sec = 5 parsing_processes = 2 dag_dir_list_interval = 0 +[triggerer] +# Those values are set so that during unit tests things run faster than usual. +# Triggerer heartbeat intentionally different from scheduler to catch bad assumptions in code +# that they are the same which by default they are in production but they can be configured differently +# by users. +job_heartbeat_sec = 2 + [example_section] # This section is used to test coercions of configuration values retrieval string_value = 21600 diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py index bf404fc8210e..264eed15aa9f 100644 --- a/airflow/jobs/job.py +++ b/airflow/jobs/job.py @@ -97,8 +97,6 @@ class Job(Base, LoggingMixin): Only makes sense for SchedulerJob and BackfillJob instances. """ - heartrate = conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC") - def __init__(self, executor=None, heartrate=None, **kwargs): # Save init parameters as DB fields self.hostname = get_hostname() @@ -117,6 +115,15 @@ def __init__(self, executor=None, heartrate=None, **kwargs): def executor(self): return ExecutorLoader.get_default_executor() + @cached_property + def heartrate(self): + if self.job_type == "TriggererJob": + return conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") + else: + # Heartrate used to be hardcoded to scheduler, so in all other + # cases continue to use that value for back compat + return conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC") + def is_alive(self, grace_multiplier=2.1): """ Is this job currently alive. diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py index ee8e8e0b4ffd..998cc3bebe0f 100644 --- a/tests/jobs/test_base_job.py +++ b/tests/jobs/test_base_job.py @@ -32,7 +32,7 @@ from airflow.utils.state import State from tests.listeners import lifecycle_listener from tests.test_utils.config import conf_vars -from tests.utils.test_helpers import MockJobRunner +from tests.utils.test_helpers import MockJobRunner, SchedulerJobRunner, TriggererJobRunner class TestJob: @@ -90,6 +90,37 @@ def abort(): assert job.state == State.FAILED assert job.end_date is not None + @pytest.mark.parametrize( + "job_runner, job_type,job_heartbeat_sec", + [(SchedulerJobRunner, "scheduler", "11"), (TriggererJobRunner, "triggerer", "9")], + ) + def test_heart_rate_after_fetched_from_db(self, job_runner, job_type, job_heartbeat_sec): + """Ensure heartrate is set correctly after jobs are queried from the DB""" + with create_session() as session, conf_vars( + {(job_type.lower(), "job_heartbeat_sec"): job_heartbeat_sec} + ): + job = Job() + job_runner(job=job) + session.add(job) + session.flush() + + most_recent = most_recent_job(job_runner.job_type, session=session) + assert most_recent.heartrate == float(job_heartbeat_sec) + + session.rollback() + + @pytest.mark.parametrize( + "job_runner, job_type,job_heartbeat_sec", + [(SchedulerJobRunner, "scheduler", "11"), (TriggererJobRunner, "triggerer", "9")], + ) + def test_heart_rate_via_constructor_persists(self, job_runner, job_type, job_heartbeat_sec): + """Ensure heartrate passed via constructor is set correctly""" + with conf_vars({(job_type.lower(), "job_heartbeat_sec"): job_heartbeat_sec}): + job = Job(heartrate=12) + job_runner(job) + # heartrate should be 12 since we passed that to the constructor directly + assert job.heartrate == 12 + def test_most_recent_job(self): with create_session() as session: old_job = Job(heartrate=10) diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py index 3b72768fc75f..9d6020874c86 100644 --- a/tests/utils/test_helpers.py +++ b/tests/utils/test_helpers.py @@ -343,3 +343,11 @@ def _execute(self): if self.func is not None: return self.func() return None + + +class SchedulerJobRunner(MockJobRunner): + job_type = "SchedulerJob" + + +class TriggererJobRunner(MockJobRunner): + job_type = "TriggererJob"