Skip to content

Commit

Permalink
feat(jobs/triggerer_job_runner): add triggerer canceled log (#31757)
Browse files Browse the repository at this point in the history
Emit log message when trigger is cancelled

Co-authored-by: Daniel Standish <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
  • Loading branch information
3 people authored Jun 16, 2023
1 parent 6becb70 commit a60429e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
8 changes: 8 additions & 0 deletions airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from airflow.stats import Stats
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.typing_compat import TypedDict
from airflow.utils import timezone
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.trigger_handler import (
Expand Down Expand Up @@ -608,6 +609,13 @@ async def run_trigger(self, trigger_id, trigger):
self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event)
self.triggers[trigger_id]["events"] += 1
self.events.append((trigger_id, event))
except asyncio.CancelledError as err:
if timeout := trigger.task_instance.trigger_timeout:
timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout
if timeout < timezone.utcnow():
self.log.error("Trigger cancelled due to timeout")
self.log.error("Trigger cancelled; message=%s", err)
raise
finally:
# CancelledError will get injected when we're stopped - which is
# fine, the cleanup process will understand that, but we want to
Expand Down
29 changes: 28 additions & 1 deletion tests/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import importlib
import time
from threading import Thread
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pendulum
import pytest
Expand Down Expand Up @@ -259,6 +259,33 @@ def test_trigger_lifecycle(session):
job_runner.trigger_runner.stop = True


class TestTriggerRunner:
@pytest.mark.asyncio
@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging")
async def test_run_trigger_canceled(self, session) -> None:
trigger_runner = TriggerRunner()
trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}}
mock_trigger = MagicMock()
mock_trigger.task_instance.trigger_timeout = None
mock_trigger.run.side_effect = asyncio.CancelledError()

with pytest.raises(asyncio.CancelledError):
await trigger_runner.run_trigger(1, mock_trigger)

@pytest.mark.asyncio
@patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging")
async def test_run_trigger_timeout(self, session, caplog) -> None:
trigger_runner = TriggerRunner()
trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}}
mock_trigger = MagicMock()
mock_trigger.task_instance.trigger_timeout = timezone.utcnow() - datetime.timedelta(hours=1)
mock_trigger.run.side_effect = asyncio.CancelledError()

with pytest.raises(asyncio.CancelledError):
await trigger_runner.run_trigger(1, mock_trigger)
assert "Trigger cancelled due to timeout" in caplog.text


def test_trigger_create_race_condition_18392(session, tmp_path):
"""
This verifies the resolution of race condition documented in github issue #18392.
Expand Down

0 comments on commit a60429e

Please sign in to comment.