diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index e4dbcb0e5eeb4..1b59f2dd2451d 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -105,7 +105,7 @@ from airflow.utils import timezone from airflow.utils.context import ConnectionAccessor, Context, VariableAccessor, context_merge from airflow.utils.email import send_email -from airflow.utils.helpers import render_template_to_string +from airflow.utils.helpers import prune_dict, render_template_to_string from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.module_loading import qualname from airflow.utils.net import get_hostname @@ -540,6 +540,10 @@ def __init__( # can be changed when calling 'run' self.test_mode = False + @property + def stats_tags(self) -> dict[str, str]: + return prune_dict({"dag_id": self.dag_id, "run_id": str(self.run_id), "task_id": self.task_id}) + @staticmethod def insert_mapping(run_id: str, task: Operator, map_index: int) -> dict[str, Any]: """:meta private:""" @@ -1259,12 +1263,7 @@ def check_and_change_state_before_execution( self.pid = None if not ignore_all_deps and not ignore_ti_state and self.state == State.SUCCESS: - Stats.incr( - "previously_succeeded", - 1, - 1, - tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id}, - ) + Stats.incr("previously_succeeded", tags=self.stat_tags) if not mark_success: # Firstly find non-runnable and non-requeueable tis. @@ -1409,10 +1408,14 @@ def _run_raw_task( session.merge(self) session.commit() actual_start_date = timezone.utcnow() - Stats.incr(f"ti.start.{self.task.dag_id}.{self.task.task_id}") + Stats.incr(f"ti.start.{self.task.dag_id}.{self.task.task_id}", tags=self.stats_tags) # Initialize final state counters at zero for state in State.task_states: - Stats.incr(f"ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}", count=0) + Stats.incr( + f"ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}", + count=0, + tags=self.stats_tags, + ) self.task = self.task.prepare_for_execution() context = self.get_template_context(ignore_param_exceptions=False) @@ -1482,7 +1485,7 @@ def _run_raw_task( session.commit() raise finally: - Stats.incr(f"ti.finish.{self.dag_id}.{self.task_id}.{self.state}") + Stats.incr(f"ti.finish.{self.dag_id}.{self.task_id}.{self.state}", tags=self.stats_tags) # Recording SKIPPED or SUCCESS self.clear_next_method_args() @@ -1544,7 +1547,7 @@ def signal_handler(signum, frame): if not self.next_method: self.clear_xcom_data() - with Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration"): + with Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration", tags=self.stats_tags): # Set the validated/merged params on the task object. self.task.params = context["params"] @@ -1579,10 +1582,8 @@ def signal_handler(signum, frame): # Run post_execute callback self.task.post_execute(context=context, result=result) - Stats.incr(f"operator_successes_{self.task.task_type}", 1, 1) - Stats.incr( - "ti_successes", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id} - ) + Stats.incr(f"operator_successes_{self.task.task_type}", tags=self.stats_tags) + Stats.incr("ti_successes", tags=self.stats_tags) def _run_finished_callback( self, @@ -1849,10 +1850,10 @@ def handle_failure( self.end_date = timezone.utcnow() self.set_duration() - Stats.incr(f"operator_failures_{self.operator}") - Stats.incr( - "ti_failures", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id} - ) + + Stats.incr(f"operator_failures_{self.operator}", tags=self.stats_tags) + Stats.incr("ti_failures", tags=self.stats_tags) + if not test_mode: session.add(Log(State.FAILED, self))