Skip to content

Commit

Permalink
Fix StatsD timing metric units (#21106)
Browse files Browse the repository at this point in the history
Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
(cherry picked from commit 1507ca4)
  • Loading branch information
viktorvia authored and ephraimbuddy committed Jul 5, 2022
1 parent 679bd6a commit 40f1661
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
6 changes: 3 additions & 3 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class DagFileStat(NamedTuple):
num_dags: int
import_errors: int
last_finish_time: Optional[datetime]
last_duration: Optional[float]
last_duration: Optional[timedelta]
run_count: int


Expand Down Expand Up @@ -834,7 +834,7 @@ def get_last_runtime(self, file_path):
:rtype: float
"""
stat = self._file_stats.get(file_path)
return stat.last_duration if stat else None
return stat.last_duration.total_seconds() if stat and stat.last_duration else None

def get_last_dag_count(self, file_path):
"""
Expand Down Expand Up @@ -927,7 +927,7 @@ def _collect_results_from_processor(self, processor) -> None:
count_import_errors = -1
num_dags = 0

last_duration = (last_finish_time - processor.start_time).total_seconds()
last_duration = last_finish_time - processor.start_time
stat = DagFileStat(
num_dags=num_dags,
import_errors=count_import_errors,
Expand Down
9 changes: 5 additions & 4 deletions airflow/sensors/smart_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,16 +738,17 @@ def execute(self, context: Context):
for sensor_work in self.sensor_works:
self._execute_sensor_work(sensor_work)

duration = (timezone.utcnow() - poke_start_time).total_seconds()
duration = timezone.utcnow() - poke_start_time
duration_seconds = duration.total_seconds()

self.log.info("Taking %s to execute %s tasks.", duration, len(self.sensor_works))
self.log.info("Taking %s seconds to execute %s tasks.", duration_seconds, len(self.sensor_works))

Stats.timing("smart_sensor_operator.loop_duration", duration)
Stats.gauge("smart_sensor_operator.executed_tasks", len(self.sensor_works))
self._emit_loop_stats()

if duration < self.poke_interval:
sleep(self.poke_interval - duration)
if duration_seconds < self.poke_interval:
sleep(self.poke_interval - duration_seconds)
if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
self.log.info("Time is out for smart sensor.")
return
Expand Down
6 changes: 4 additions & 2 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
# let's say the DAG was just parsed 10 seconds before the frozen time
last_finish_time = freezed_base_time - timedelta(seconds=10)
manager._file_stats = {
"file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1),
"file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1),
}
with freeze_time(freezed_base_time):
manager.set_file_paths(dag_files)
Expand Down Expand Up @@ -695,7 +695,9 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
child_pipe.close()
parent_pipe.close()

statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime)
statsd_timing_mock.assert_called_with(
'dag_processing.last_duration.temp_dag', timedelta(seconds=last_runtime)
)

def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
"""Test DagFileProcessorManager._refresh_dag_dir method"""
Expand Down
23 changes: 23 additions & 0 deletions tests/sensors/test_smart_sensor_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import time
import unittest
from unittest import mock
from unittest.mock import Mock

from freezegun import freeze_time
Expand Down Expand Up @@ -310,3 +311,25 @@ def test_register_in_sensor_service(self):
assert sensor_instance is not None
assert sensor_instance.state == State.SENSING
assert sensor_instance.operator == "DummySensor"

@mock.patch('airflow.sensors.smart_sensor.Stats.timing')
@mock.patch('airflow.sensors.smart_sensor.timezone.utcnow')
def test_send_sensor_timing(self, timezone_utcnow_mock, statsd_timing_mock):
    """Check that the loop duration is handed to ``Stats.timing`` as a timedelta.

    ``timezone.utcnow`` is patched so that exactly three seconds appear to
    elapse between the start of the poke loop and the duration measurement;
    the timing metric must then be emitted with that same ``timedelta``.
    """
    loop_start = timezone.datetime(2022, 1, 5, 0, 0, 0)
    elapsed = datetime.timedelta(seconds=3)
    loop_end = loop_start + elapsed

    # While the fixtures are being built every clock read yields the start time.
    timezone_utcnow_mock.return_value = loop_start
    self._make_sensor_dag_run()
    smart = self._make_smart_operator(0)
    smart.timeout = 0

    # From here on the clock reads are scripted, one entry per expected call.
    timezone_utcnow_mock.side_effect = [
        loop_start,  # started_at
        loop_start,  # poke_start_time
        loop_end,  # duration
        loop_end,  # timeout check
    ]

    smart.execute(None)

    statsd_timing_mock.assert_called_with('smart_sensor_operator.loop_duration', elapsed)

0 comments on commit 40f1661

Please sign in to comment.