From a184c0ba428a0e560cc96bee6bb847f04c7b9bb8 Mon Sep 17 00:00:00 2001 From: Viktor Ivanov Date: Tue, 25 Jan 2022 20:12:54 +0200 Subject: [PATCH 1/2] Fix StatsD timing metric units Two StatsD timing metrics were calculated in seconds and given directly to StatsD, which accepts either a number of milliseconds or a timedelta. Switched the two to timedelta and updated unit tests. --- airflow/dag_processing/manager.py | 6 +++--- airflow/sensors/smart_sensor.py | 9 ++++---- tests/dag_processing/test_manager.py | 6 ++++-- tests/sensors/test_smart_sensor_operator.py | 23 +++++++++++++++++++++ 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index dcfb6bedafb90..d5f896d64be4f 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -70,7 +70,7 @@ class DagFileStat(NamedTuple): num_dags: int import_errors: int last_finish_time: Optional[datetime] - last_duration: Optional[float] + last_duration: Optional[timedelta] run_count: int @@ -839,7 +839,7 @@ def get_last_runtime(self, file_path): :rtype: float """ stat = self._file_stats.get(file_path) - return stat.last_duration if stat else None + return stat.last_duration.total_seconds() if stat and stat.last_duration else None def get_last_dag_count(self, file_path): """ @@ -932,7 +932,7 @@ def _collect_results_from_processor(self, processor) -> None: count_import_errors = -1 num_dags = 0 - last_duration = (last_finish_time - processor.start_time).total_seconds() + last_duration = last_finish_time - processor.start_time stat = DagFileStat( num_dags=num_dags, import_errors=count_import_errors, diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py index a45eb10d1cdba..fbf04bf8a65a7 100644 --- a/airflow/sensors/smart_sensor.py +++ b/airflow/sensors/smart_sensor.py @@ -738,16 +738,17 @@ def execute(self, context: Context): for sensor_work in self.sensor_works: 
self._execute_sensor_work(sensor_work) - duration = (timezone.utcnow() - poke_start_time).total_seconds() + duration = timezone.utcnow() - poke_start_time + duration_seconds = duration.total_seconds() - self.log.info("Taking %s to execute %s tasks.", duration, len(self.sensor_works)) + self.log.info("Taking %s to execute %s tasks.", duration_seconds, len(self.sensor_works)) Stats.timing("smart_sensor_operator.loop_duration", duration) Stats.gauge("smart_sensor_operator.executed_tasks", len(self.sensor_works)) self._emit_loop_stats() - if duration < self.poke_interval: - sleep(self.poke_interval - duration) + if duration_seconds < self.poke_interval: + sleep(self.poke_interval - duration_seconds) if (timezone.utcnow() - started_at).total_seconds() > self.timeout: self.log.info("Time is out for smart sensor.") return diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index ad613d877df0c..891df4ec641b3 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -427,7 +427,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode( # let's say the DAG was just parsed 2 seconds before the Freezed time last_finish_time = freezed_base_time - timedelta(seconds=10) manager._file_stats = { - "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1), + "file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1), } with freeze_time(freezed_base_time): manager.set_file_paths(dag_files) @@ -715,7 +715,9 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir): child_pipe.close() parent_pipe.close() - statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime) + statsd_timing_mock.assert_called_with( + 'dag_processing.last_duration.temp_dag', timedelta(seconds=last_runtime) + ) def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir): """Test DagFileProcessorManager._refresh_dag_dir method""" diff --git 
a/tests/sensors/test_smart_sensor_operator.py b/tests/sensors/test_smart_sensor_operator.py index 7c875a06f4049..22c03918cab8a 100644 --- a/tests/sensors/test_smart_sensor_operator.py +++ b/tests/sensors/test_smart_sensor_operator.py @@ -20,6 +20,7 @@ import os import time import unittest +from unittest import mock from unittest.mock import Mock from freezegun import freeze_time @@ -310,3 +311,25 @@ def test_register_in_sensor_service(self): assert sensor_instance is not None assert sensor_instance.state == State.SENSING assert sensor_instance.operator == "DummySensor" + + @mock.patch('airflow.sensors.smart_sensor.Stats.timing') + @mock.patch('airflow.sensors.smart_sensor.timezone.utcnow') + def test_send_sensor_timing(self, timezone_utcnow_mock, statsd_timing_mock): + initial_time = timezone.datetime(2022, 1, 5, 0, 0, 0) + timezone_utcnow_mock.return_value = initial_time + self._make_sensor_dag_run() + smart = self._make_smart_operator(0) + smart.timeout = 0 + duration = datetime.timedelta(seconds=3) + timezone_utcnow_mock.side_effect = [ + # started_at + initial_time, + # poke_start_time + initial_time, + # duration + initial_time + duration, + # timeout check + initial_time + duration, + ] + smart.execute(None) + statsd_timing_mock.assert_called_with('smart_sensor_operator.loop_duration', duration) From d6352b8f0de00ea0a0330aec2ab57fe62a8f717e Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 7 Apr 2022 02:05:42 +0800 Subject: [PATCH 2/2] Clarify in log the unit is seconds --- airflow/sensors/smart_sensor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py index fbf04bf8a65a7..bc22ab9c541eb 100644 --- a/airflow/sensors/smart_sensor.py +++ b/airflow/sensors/smart_sensor.py @@ -741,7 +741,7 @@ def execute(self, context: Context): duration = timezone.utcnow() - poke_start_time duration_seconds = duration.total_seconds() - self.log.info("Taking %s to execute %s tasks.", 
duration_seconds, len(self.sensor_works)) + self.log.info("Taking %s seconds to execute %s tasks.", duration_seconds, len(self.sensor_works)) Stats.timing("smart_sensor_operator.loop_duration", duration) Stats.gauge("smart_sensor_operator.executed_tasks", len(self.sensor_works))