Skip to content

Commit

Permalink
add stat tagging to models/taskinstance
Browse files Browse the repository at this point in the history
  • Loading branch information
ferruzzi committed Apr 5, 2023
1 parent 4afc9a4 commit 64c780f
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
from airflow.utils import timezone
from airflow.utils.context import ConnectionAccessor, Context, VariableAccessor, context_merge
from airflow.utils.email import send_email
from airflow.utils.helpers import render_template_to_string
from airflow.utils.helpers import prune_dict, render_template_to_string
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import qualname
from airflow.utils.net import get_hostname
Expand Down Expand Up @@ -540,6 +540,10 @@ def __init__(
# can be changed when calling 'run'
self.test_mode = False

@property
def stats_tags(self) -> dict[str, str]:
return prune_dict({"dag_id": self.dag_id, "run_id": str(self.run_id), "task_id": self.task_id})

@staticmethod
def insert_mapping(run_id: str, task: Operator, map_index: int) -> dict[str, Any]:
""":meta private:"""
Expand Down Expand Up @@ -1259,12 +1263,7 @@ def check_and_change_state_before_execution(
self.pid = None

if not ignore_all_deps and not ignore_ti_state and self.state == State.SUCCESS:
Stats.incr(
"previously_succeeded",
1,
1,
tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id},
)
Stats.incr("previously_succeeded", tags=self.stat_tags)

if not mark_success:
# Firstly find non-runnable and non-requeueable tis.
Expand Down Expand Up @@ -1409,10 +1408,14 @@ def _run_raw_task(
session.merge(self)
session.commit()
actual_start_date = timezone.utcnow()
Stats.incr(f"ti.start.{self.task.dag_id}.{self.task.task_id}")
Stats.incr(f"ti.start.{self.task.dag_id}.{self.task.task_id}", tags=self.stats_tags)
# Initialize final state counters at zero
for state in State.task_states:
Stats.incr(f"ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}", count=0)
Stats.incr(
f"ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}",
count=0,
tags=self.stats_tags,
)

self.task = self.task.prepare_for_execution()
context = self.get_template_context(ignore_param_exceptions=False)
Expand Down Expand Up @@ -1482,7 +1485,7 @@ def _run_raw_task(
session.commit()
raise
finally:
Stats.incr(f"ti.finish.{self.dag_id}.{self.task_id}.{self.state}")
Stats.incr(f"ti.finish.{self.dag_id}.{self.task_id}.{self.state}", tags=self.stats_tags)

# Recording SKIPPED or SUCCESS
self.clear_next_method_args()
Expand Down Expand Up @@ -1544,7 +1547,7 @@ def signal_handler(signum, frame):
if not self.next_method:
self.clear_xcom_data()

with Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration"):
with Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration", tags=self.stats_tags):
# Set the validated/merged params on the task object.
self.task.params = context["params"]

Expand Down Expand Up @@ -1579,10 +1582,8 @@ def signal_handler(signum, frame):
# Run post_execute callback
self.task.post_execute(context=context, result=result)

Stats.incr(f"operator_successes_{self.task.task_type}", 1, 1)
Stats.incr(
"ti_successes", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id}
)
Stats.incr(f"operator_successes_{self.task.task_type}", tags=self.stats_tags)
Stats.incr("ti_successes", tags=self.stats_tags)

def _run_finished_callback(
self,
Expand Down Expand Up @@ -1849,10 +1850,10 @@ def handle_failure(

self.end_date = timezone.utcnow()
self.set_duration()
Stats.incr(f"operator_failures_{self.operator}")
Stats.incr(
"ti_failures", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id}
)

Stats.incr(f"operator_failures_{self.operator}", tags=self.stats_tags)
Stats.incr("ti_failures", tags=self.stats_tags)

if not test_mode:
session.add(Log(State.FAILED, self))

Expand Down

0 comments on commit 64c780f

Please sign in to comment.