Skip to content

Commit

Permalink
Fix dag task scheduled and queued duration metrics (apache#37936)
Browse files Browse the repository at this point in the history
  • Loading branch information
htpawel authored and romsharon98 committed Jul 26, 2024
1 parent 9a0410b commit 4025b98
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
4 changes: 2 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2835,7 +2835,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
timing = timezone.utcnow() - self.queued_dttm
elif new_state == TaskInstanceState.QUEUED:
metric_name = "scheduled_duration"
if self.start_date is None:
Expand All @@ -2848,7 +2848,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
timing = (timezone.utcnow() - self.start_date).total_seconds()
timing = timezone.utcnow() - self.start_date
else:
raise NotImplementedError("no metric emission setup for state %s", new_state)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,21 +257,21 @@ Name Description
================================================================ ========================================================================
``dagrun.dependency-check.<dag_id>`` Milliseconds taken to check DAG dependencies
``dagrun.dependency-check`` Milliseconds taken to check DAG dependencies. Metric with dag_id tagging.
``dag.<dag_id>.<task_id>.duration`` Seconds taken to run a task
``task.duration`` Seconds taken to run a task. Metric with dag_id and task-id tagging.
``dag.<dag_id>.<task_id>.scheduled_duration`` Seconds a task spends in the Scheduled state, before being Queued
``task.scheduled_duration`` Seconds a task spends in the Scheduled state, before being Queued.
``dag.<dag_id>.<task_id>.duration`` Milliseconds taken to run a task
``task.duration`` Milliseconds taken to run a task. Metric with dag_id and task-id tagging.
``dag.<dag_id>.<task_id>.scheduled_duration`` Milliseconds a task spends in the Scheduled state, before being Queued
``task.scheduled_duration`` Milliseconds a task spends in the Scheduled state, before being Queued.
Metric with dag_id and task_id tagging.
``dag.<dag_id>.<task_id>.queued_duration`` Seconds a task spends in the Queued state, before being Running
``task.queued_duration`` Seconds a task spends in the Queued state, before being Running.
``dag.<dag_id>.<task_id>.queued_duration`` Milliseconds a task spends in the Queued state, before being Running
``task.queued_duration`` Milliseconds a task spends in the Queued state, before being Running.
Metric with dag_id and task_id tagging.
``dag_processing.last_duration.<dag_file>`` Seconds taken to load the given DAG file
``dag_processing.last_duration`` Seconds taken to load the given DAG file. Metric with file_name tagging.
``dagrun.duration.success.<dag_id>`` Seconds taken for a DagRun to reach success state
``dagrun.duration.success`` Seconds taken for a DagRun to reach success state.
``dag_processing.last_duration.<dag_file>`` Milliseconds taken to load the given DAG file
``dag_processing.last_duration`` Milliseconds taken to load the given DAG file. Metric with file_name tagging.
``dagrun.duration.success.<dag_id>`` Milliseconds taken for a DagRun to reach success state
``dagrun.duration.success`` Milliseconds taken for a DagRun to reach success state.
Metric with dag_id and run_type tagging.
``dagrun.duration.failed.<dag_id>`` Seconds taken for a DagRun to reach failed state
``dagrun.duration.failed`` Seconds taken for a DagRun to reach failed state.
``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state
``dagrun.duration.failed`` Milliseconds taken for a DagRun to reach failed state.
Metric with dag_id and run_type tagging.
``dagrun.schedule_delay.<dag_id>`` Milliseconds of delay between the scheduled DagRun
start date and the actual DagRun start date
Expand All @@ -281,8 +281,8 @@ Name Description
only a single scheduler can enter this loop at a time
``scheduler.critical_section_query_duration`` Milliseconds spent running the critical section task instance query
``scheduler.scheduler_loop_duration`` Milliseconds spent running one scheduler loop
``dagrun.<dag_id>.first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start
``dagrun.first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start.
``dagrun.<dag_id>.first_task_scheduling_delay`` Milliseconds elapsed between first task start_date and dagrun expected start
``dagrun.first_task_scheduling_delay`` Milliseconds elapsed between first task start_date and dagrun expected start.
Metric with dag_id and run_type tagging.
``collect_db_dags`` Milliseconds taken for fetching all Serialized Dags from DB
``kubernetes_executor.clear_not_launched_queued_tasks.duration`` Milliseconds taken for clearing not launched queued tasks in Kubernetes Executor
Expand Down
4 changes: 4 additions & 0 deletions newsfragments/37936.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Time unit for ``scheduled_duration`` and ``queued_duration`` changed.

``scheduled_duration`` and ``queued_duration`` metrics are now emitted in milliseconds instead of seconds.
By convention all statsd metrics should be emitted in milliseconds, this is later expected in e.g. prometheus' statsd-exporter.

0 comments on commit 4025b98

Please sign in to comment.