Skip to content

Commit

Permalink
Fix TimeSensorAsync (#17748)
Browse files Browse the repository at this point in the history
When using the following example dag, it currently fails with `You cannot pass naive datetimes` error.
This happens because `TimeSensorAsync` passes a `datetime.time` object while `DateTimeTrigger` expects
a `datetime.datetime` object. This PR fixes that.

Example DAG:

```python
from datetime import timedelta

from airflow import DAG
from airflow.sensors.time_sensor import TimeSensorAsync
from airflow.utils import dates, timezone

with DAG(
    dag_id='example_date_time_async_operator',
    schedule_interval='0 0 * * *',
    start_date=dates.days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2', 'async'],
) as dag:

    TimeSensorAsync(task_id="test-2", target_time=timezone.time(22, 43, 0))
```
  • Loading branch information
kaxil authored Aug 22, 2021
1 parent 11c5ce3 commit 6511ce7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
9 changes: 8 additions & 1 deletion airflow/sensors/time_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime

from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import DateTimeTrigger
Expand Down Expand Up @@ -50,9 +51,15 @@ class TimeSensorAsync(BaseSensorOperator):
def __init__(self, *, target_time, **kwargs):
super().__init__(**kwargs)
self.target_time = target_time
current_time = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
todays_date = current_time.date()
self.target_datetime = datetime.datetime.combine(todays_date, self.target_time, current_time.tzinfo)

def execute(self, context):
self.defer(trigger=DateTimeTrigger(moment=self.target_time), method_name="execute_complete")
self.defer(
trigger=DateTimeTrigger(moment=self.target_datetime),
method_name="execute_complete",
)

def execute_complete(self, context, event=None): # pylint: disable=unused-argument
"""Callback for when the trigger fires - returns immediately."""
Expand Down
2 changes: 1 addition & 1 deletion airflow/triggers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class BaseTrigger(abc.ABC):
We use the same class for both situations, and rely on all Trigger classes
to be able to return the (Airflow-JSON-encodable) arguments that will
let them be reinsantiated elsewhere.
let them be re-instantiated elsewhere.
"""

def __init__(self):
Expand Down

0 comments on commit 6511ce7

Please sign in to comment.