Skip to content

Commit

Permalink
Remove lock from CancelContext
Browse files Browse the repository at this point in the history
  • Loading branch information
zanieb committed Feb 25, 2023
1 parent dd46a28 commit 4f4dc6b
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions src/prefect/_internal/concurrency/timeouts.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,17 @@ class CancelContext:
def __init__(self, timeout: Optional[float]) -> None:
self._timeout = timeout
self._cancelled: bool = False
self._lock = threading.Lock()

@property
def timeout(self) -> Optional[float]:
return self._timeout

@property
def cancelled(self):
with self._lock:
return self._cancelled
return self._cancelled

@cancelled.setter
def cancelled(self, value: bool):
with self._lock:
self._cancelled = value
def mark_cancelled(self):
self._cancelled = True


@contextlib.contextmanager
Expand All @@ -64,7 +60,8 @@ def cancel_async_after(timeout: Optional[float]):
)
yield ctx
finally:
ctx.cancelled = cancel_scope.cancel_called
if cancel_scope.cancel_called:
ctx.mark_cancelled()


def get_deadline(timeout: Optional[float]):
Expand Down Expand Up @@ -101,7 +98,8 @@ def cancel_async_at(deadline: Optional[float]):
with cancel_async_after(timeout) as inner_ctx:
yield ctx
finally:
ctx.cancelled = inner_ctx.cancelled
if inner_ctx.cancelled:
ctx.mark_cancelled()


@contextlib.contextmanager
Expand All @@ -127,7 +125,8 @@ def cancel_sync_at(deadline: Optional[float]):
with cancel_sync_after(timeout) as inner_ctx:
yield ctx
finally:
ctx.cancelled = inner_ctx.cancelled
if inner_ctx.cancelled:
ctx.mark_cancelled()


@contextlib.contextmanager
Expand Down Expand Up @@ -171,7 +170,8 @@ def cancel_sync_after(timeout: Optional[float]):
)
yield ctx
finally:
ctx.cancelled = inner_ctx.cancelled
if inner_ctx.cancelled:
ctx.mark_cancelled()


@contextlib.contextmanager
Expand All @@ -194,7 +194,7 @@ def _alarm_based_timeout(timeout: float):
ctx = CancelContext(timeout=timeout)

def raise_alarm_as_timeout(signum, frame):
ctx.cancelled = True
ctx.mark_cancelled()
logger.debug(
"Cancel fired for alarm based timeout of thread %r", current_thread.name
)
Expand Down Expand Up @@ -229,7 +229,7 @@ def timeout_enforcer():
"Cancel fired for watcher based timeout of thread %r",
supervised_thread.name,
)
ctx.cancelled = True
ctx.mark_cancelled()
_send_exception_to_thread(supervised_thread, TimeoutError)

enforcer = threading.Thread(target=timeout_enforcer, daemon=True)
Expand Down

0 comments on commit 4f4dc6b

Please sign in to comment.