diff --git a/src/prefect/_internal/concurrency/timeouts.py b/src/prefect/_internal/concurrency/timeouts.py index 36b6d5b9cfb1..3554506b757e 100644 --- a/src/prefect/_internal/concurrency/timeouts.py +++ b/src/prefect/_internal/concurrency/timeouts.py @@ -25,7 +25,6 @@ class CancelContext: def __init__(self, timeout: Optional[float]) -> None: self._timeout = timeout self._cancelled: bool = False - self._lock = threading.Lock() @property def timeout(self) -> Optional[float]: @@ -33,13 +32,10 @@ def timeout(self) -> Optional[float]: @property def cancelled(self): - with self._lock: - return self._cancelled + return self._cancelled - @cancelled.setter - def cancelled(self, value: bool): - with self._lock: - self._cancelled = value + def mark_cancelled(self): + self._mark_cancelled() @contextlib.contextmanager @@ -194,7 +190,7 @@ def _alarm_based_timeout(timeout: float): ctx = CancelContext(timeout=timeout) def raise_alarm_as_timeout(signum, frame): - ctx.cancelled = True + ctx.mark_cancelled() logger.debug( "Cancel fired for alarm based timeout of thread %r", current_thread.name ) @@ -229,7 +225,7 @@ def timeout_enforcer(): "Cancel fired for watcher based timeout of thread %r", supervised_thread.name, ) - ctx.cancelled = True + ctx.mark_cancelled() _send_exception_to_thread(supervised_thread, TimeoutError) enforcer = threading.Thread(target=timeout_enforcer, daemon=True)