From 6511ce7f95935eda2f8b20624d9dee5bcd048361 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sun, 22 Aug 2021 13:23:40 +0100 Subject: [PATCH] Fix ``TimeSensorAsync`` (#17748) When using the following example dag, it currently fails with `You cannot pass naive datetimes` error. This happens because `TimeSensorAsync` passes a `datetime.time` object while `DateTimeTrigger` expects a `datetime.datetime` object. This PR fixes that. Example DAG: ```python from datetime import timedelta from airflow import DAG from airflow.sensors.time_sensor import TimeSensorAsync from airflow.utils import dates, timezone with DAG( dag_id='example_date_time_async_operator', schedule_interval='0 0 * * *', start_date=dates.days_ago(2), dagrun_timeout=timedelta(minutes=60), tags=['example', 'example2', 'async'], ) as dag: TimeSensorAsync(task_id="test-2", target_time=timezone.time(22, 43, 0)) ``` --- airflow/sensors/time_sensor.py | 9 ++++++++- airflow/triggers/base.py | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py index 2ac93894eda57..72416be9e3231 100644 --- a/airflow/sensors/time_sensor.py +++ b/airflow/sensors/time_sensor.py @@ -15,6 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import datetime from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger @@ -50,9 +51,15 @@ class TimeSensorAsync(BaseSensorOperator): def __init__(self, *, target_time, **kwargs): super().__init__(**kwargs) self.target_time = target_time + current_time = timezone.make_naive(timezone.utcnow(), self.dag.timezone) + todays_date = current_time.date() + self.target_datetime = datetime.datetime.combine(todays_date, self.target_time, current_time.tzinfo) def execute(self, context): - self.defer(trigger=DateTimeTrigger(moment=self.target_time), method_name="execute_complete") + self.defer( + trigger=DateTimeTrigger(moment=self.target_datetime), + method_name="execute_complete", + ) def execute_complete(self, context, event=None): # pylint: disable=unused-argument """Callback for when the trigger fires - returns immediately.""" diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py index d48be6dccce49..a7bb44c206212 100644 --- a/airflow/triggers/base.py +++ b/airflow/triggers/base.py @@ -30,7 +30,7 @@ class BaseTrigger(abc.ABC): We use the same class for both situations, and rely on all Trigger classes to be able to return the (Airflow-JSON-encodable) arguments that will - let them be reinsantiated elsewhere. + let them be re-instantiated elsewhere. """ def __init__(self):