From a272afefffcc0a5b319161a3e02f2c02570af81e Mon Sep 17 00:00:00 2001 From: Bill Franklin Date: Tue, 19 Apr 2022 11:57:54 +0100 Subject: [PATCH] Initialize finished counter at zero Sets initial count of task finished state to zero. This enables acquiring the rate from zero to one (particularly useful if you want to alert on any failures). We're using the Prometheus statsd-exporter. Since counters are usually used with a PromQL function like `rate`, it's important that counters are initialized at zero, otherwise when a task finishes the rate function will not have a previous value to compare the state count to. For example, what we'd like to do: ``` sum by (dag_id, task_id) (rate(airflow_ti_finish{state='failed'}[1h])) > 0 ``` This tells us the failure rate of tasks over time. What I've tried to do instead to ensure the metric captures the change from zero to one: ``` (sum by (dag_id, task_id) (rate(airflow_ti_finish{state='failed'}[1h])) > 0) or sum by (dag_id, task_id) (airflow_ti_finish{state='failed'} != 0 unless (airflow_ti_finish{state='failed'} offset 1m)) ``` Two useful posts on this subject: https://www.robustperception.io/why-predeclare-metrics https://www.section.io/blog/beware-prometheus-counters-that-do-not-begin-at-zero/ --- airflow/models/taskinstance.py | 3 +++ tests/models/test_taskinstance.py | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 73f38e1a9d4af..412b06cd295c7 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1431,6 +1431,9 @@ def _run_raw_task( session.commit() actual_start_date = timezone.utcnow() Stats.incr(f'ti.start.{self.task.dag_id}.{self.task.task_id}') + # Initialize final state counters at zero + for state in State.task_states: + Stats.incr(f'ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}', count=0) try: if not mark_success: self.task = self.task.prepare_for_execution() diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 2c52fd0eb6ab9..d104e798353d1 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2020,8 +2020,10 @@ def test_task_stats(self, stats_mock, create_task_instance): ti._run_raw_task() ti.refresh_from_db() stats_mock.assert_called_with(f'ti.finish.{ti.dag_id}.{ti.task_id}.{ti.state}') + for state in State.task_states: + assert call(f'ti.finish.{ti.dag_id}.{ti.task_id}.{state}', count=0) in stats_mock.mock_calls assert call(f'ti.start.{ti.dag_id}.{ti.task_id}') in stats_mock.mock_calls - assert stats_mock.call_count == 4 + assert stats_mock.call_count == 19 def test_command_as_list(self, create_task_instance): ti = create_task_instance()