Skip to content

Commit

Permalink
Initialize finished counter at zero (#23080)
Browse files Browse the repository at this point in the history
Sets initial count of task finished state to zero.
This enables acquiring the rate from zero to one
(particularly useful if you want to alert on any failures).

We're using the Prometheus statsd-exporter. Since counters
are usually used with a PromQL function like `rate`, it's important
that counters are initialized at zero, otherwise when a task
finishes the rate function will not have a previous value to compare
the state count to.

For example, what we'd like to do:

```
sum by (dag_id, task_id) (rate(airflow_ti_finish{state='failed'}[1h])) >
0
```

This tells us the failure rate of tasks over time.

What I've tried to do instead to ensure the metric captures the change
from zero to one:

```
(sum by (dag_id, task_id) (rate(airflow_ti_finish{state='failed'}[1h])) > 0) or sum by (dag_id, task_id) (airflow_ti_finish{state='failed'} != 0 unless (airflow_ti_finish{state='failed'} offset 1m))
```

Two useful posts on this subject:
https://www.robustperception.io/why-predeclare-metrics
https://www.section.io/blog/beware-prometheus-counters-that-do-not-begin-at-zero/

Co-authored-by: Bill Franklin <[email protected]>
  • Loading branch information
bilbof and Bill Franklin authored Apr 22, 2022
1 parent 28dc17c commit 3b2ef88
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
3 changes: 3 additions & 0 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,9 @@ def _run_raw_task(
session.commit()
actual_start_date = timezone.utcnow()
Stats.incr(f'ti.start.{self.task.dag_id}.{self.task.task_id}')
# Initialize final state counters at zero
for state in State.task_states:
Stats.incr(f'ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}', count=0)
try:
if not mark_success:
self.task = self.task.prepare_for_execution()
Expand Down
4 changes: 3 additions & 1 deletion tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2020,8 +2020,10 @@ def test_task_stats(self, stats_mock, create_task_instance):
ti._run_raw_task()
ti.refresh_from_db()
stats_mock.assert_called_with(f'ti.finish.{ti.dag_id}.{ti.task_id}.{ti.state}')
for state in State.task_states:
assert call(f'ti.finish.{ti.dag_id}.{ti.task_id}.{state}', count=0) in stats_mock.mock_calls
assert call(f'ti.start.{ti.dag_id}.{ti.task_id}') in stats_mock.mock_calls
assert stats_mock.call_count == 4
assert stats_mock.call_count == 19

def test_command_as_list(self, create_task_instance):
ti = create_task_instance()
Expand Down

0 comments on commit 3b2ef88

Please sign in to comment.