diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 0f900c63f0491..cbbc2bfdaff23 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -74,7 +74,7 @@ class DagFileStat(NamedTuple): num_dags: int import_errors: int last_finish_time: Optional[datetime] - last_duration: Optional[float] + last_duration: Optional[timedelta] run_count: int @@ -834,7 +834,7 @@ def get_last_runtime(self, file_path): :rtype: float """ stat = self._file_stats.get(file_path) - return stat.last_duration if stat else None + return stat.last_duration.total_seconds() if stat and stat.last_duration else None def get_last_dag_count(self, file_path): """ @@ -927,7 +927,7 @@ def _collect_results_from_processor(self, processor) -> None: count_import_errors = -1 num_dags = 0 - last_duration = (last_finish_time - processor.start_time).total_seconds() + last_duration = last_finish_time - processor.start_time stat = DagFileStat( num_dags=num_dags, import_errors=count_import_errors, diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py index a45eb10d1cdba..bc22ab9c541eb 100644 --- a/airflow/sensors/smart_sensor.py +++ b/airflow/sensors/smart_sensor.py @@ -738,16 +738,17 @@ def execute(self, context: Context): for sensor_work in self.sensor_works: self._execute_sensor_work(sensor_work) - duration = (timezone.utcnow() - poke_start_time).total_seconds() + duration = timezone.utcnow() - poke_start_time + duration_seconds = duration.total_seconds() - self.log.info("Taking %s to execute %s tasks.", duration, len(self.sensor_works)) + self.log.info("Taking %s seconds to execute %s tasks.", duration_seconds, len(self.sensor_works)) Stats.timing("smart_sensor_operator.loop_duration", duration) Stats.gauge("smart_sensor_operator.executed_tasks", len(self.sensor_works)) self._emit_loop_stats() - if duration < self.poke_interval: - sleep(self.poke_interval - duration) + if duration_seconds < self.poke_interval: + sleep(self.poke_interval - duration_seconds) if (timezone.utcnow() - started_at).total_seconds() > self.timeout: self.log.info("Time is out for smart sensor.") return diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 6a65116d51f70..ed1b194a7b4cb 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -426,7 +426,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode( # let's say the DAG was just parsed 2 seconds before the Freezed time last_finish_time = freezed_base_time - timedelta(seconds=10) manager._file_stats = { - "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1), + "file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1), } with freeze_time(freezed_base_time): manager.set_file_paths(dag_files) @@ -695,7 +695,9 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir): child_pipe.close() parent_pipe.close() - statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime) + statsd_timing_mock.assert_called_with( + 'dag_processing.last_duration.temp_dag', timedelta(seconds=last_runtime) + ) def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir): """Test DagFileProcessorManager._refresh_dag_dir method""" diff --git a/tests/sensors/test_smart_sensor_operator.py b/tests/sensors/test_smart_sensor_operator.py index 7c875a06f4049..22c03918cab8a 100644 --- a/tests/sensors/test_smart_sensor_operator.py +++ b/tests/sensors/test_smart_sensor_operator.py @@ -20,6 +20,7 @@ import os import time import unittest +from unittest import mock from unittest.mock import Mock from freezegun import freeze_time @@ -310,3 +311,25 @@ def test_register_in_sensor_service(self): assert sensor_instance is not None assert sensor_instance.state == State.SENSING assert sensor_instance.operator == "DummySensor" + + @mock.patch('airflow.sensors.smart_sensor.Stats.timing') + @mock.patch('airflow.sensors.smart_sensor.timezone.utcnow') + def test_send_sensor_timing(self, timezone_utcnow_mock, statsd_timing_mock): + initial_time = timezone.datetime(2022, 1, 5, 0, 0, 0) + timezone_utcnow_mock.return_value = initial_time + self._make_sensor_dag_run() + smart = self._make_smart_operator(0) + smart.timeout = 0 + duration = datetime.timedelta(seconds=3) + timezone_utcnow_mock.side_effect = [ + # started_at + initial_time, + # poke_start_time + initial_time, + # duration + initial_time + duration, + # timeout check + initial_time + duration, + ] + smart.execute(None) + statsd_timing_mock.assert_called_with('smart_sensor_operator.loop_duration', duration)