Skip to content

Commit

Permalink
Allow C-Task object to be interrupted (in most cases) (#26)
Browse files Browse the repository at this point in the history
* work for ctask interrupt

* fixes.  cannot interrupt new ctask

* don't rely on the "task_is_runnable()" test until the interrupted task has
its interrupt delivered (has run)

* we cannot interrupt c tasks with __step scheduled.

* fix coverage for impossible code

* support timeouts for CTasks

* Update README

* remove accidental checkin

* fix typing

* Fix assertion for different name of internal object in 3.9

* typing, spelling
  • Loading branch information
kristjanvalur authored Oct 4, 2023
1 parent 6135d21 commit 576fd49
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 108 deletions.
41 changes: 26 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ of invocation in each such case.

1. It copies the current _context_
1. It initializes a `CoroStart()` object for the coroutine, starting it in the copied context.
2. If it subsequently is `done()` It returns `CoroStart.as_future()`, ortherwise
2. If it subsequently is `done()` It returns `CoroStart.as_future()`, otherwise
it creates and returns a `Task` (using `asyncio.create_task` by default.)

The result is an _awaitable_ which can be either directly awaited or passed
Expand Down Expand Up @@ -171,7 +171,7 @@ async code blocked. If the code tries to access the event loop, e.g. by creatin
The `syncfunction()` decorator can be used to automatically wrap an async function
so that it is executed using `await_sync()`:

```pycon
```python
>>> @asynkit.syncfunction
... async def sync_function():
... async def async_function():
Expand Down Expand Up @@ -217,15 +217,15 @@ application.

This class manages the state of a partially run coroutine and is what what powers the `coro_eager()` and `await_sync()` functions.
When initialized, it will _start_ the coroutine, running it until it either suspends, returns, or raises
an exception. It can subsequently be _awaited_ to retreive the result.
an exception. It can subsequently be _awaited_ to retrieve the result.

Similarly to a `Future`, it has these methods:

- `done()` - returns `True` if the coroutine finished without blocking. In this case, the following two methods may be called to get the result.
- `result()` - Returns the _return value_ of the coroutine or **raises** any _exception_ that it produced.
- `exception()` - Returns any _exception_ raised, or `None` otherwise.

But more importly it has these:
But more importantly it has these:

- `__await__()` - A magic method making it directly _awaitable_. If it has already finished, awaiting this coroutine is the same as calling `result()`, otherwise it awaits the original coroutine's continued execution
- `as_coroutine()` - A helper which returns a proper _coroutine_ object to await the `CoroStart`
Expand Down Expand Up @@ -281,7 +281,7 @@ implemented using [`CoroStart`](#corostart)

## `coro_iter()`

This helper function turns a coroutine function into an iterator. It is primarly
This helper function turns a coroutine function into an iterator. It is primarily
intended to be used by the [`awaitmethod()`](#awaitmethod) function decorator.

## `awaitmethod()`
Expand Down Expand Up @@ -378,7 +378,7 @@ it will become the _return value_ of the `Monitor.oob()` call in the coroutine.
Neither data nor an exception can be sent the first time the coroutine is awaited,
only as a response to a previous `OOBData` exception.

A `Monitor` can be used when a coroutine wants to suspend itself, maybe waiting for some extenal
A `Monitor` can be used when a coroutine wants to suspend itself, maybe waiting for some external
condition, without resorting to the relatively heavy mechanism of creating, managing and synchronizing
`Task` objects. This can be useful if the coroutine needs to maintain state. Additionally,
this kind of messaging does not require an _event loop_ to be present and can can be driven
Expand Down Expand Up @@ -455,7 +455,7 @@ For a more complete example, have a look at [example_resp.py](examples/example_r

A `GeneratorObject` builds on top of the `Monitor` to create an `AsyncGenerator`. It is in many ways
similar to an _asynchronous generator_ constructed using the _generator function_ syntax.
But wheras those return values using the `yield` _keyword_,
But whereas those return values using the `yield` _keyword_,
a GeneratorObject has an `ayield()` _method_, which means that data can be sent to the generator
by anyone, and not just by using `yield`, which makes composing such generators much simpler.

Expand Down Expand Up @@ -678,7 +678,7 @@ returning.

- `CoroStart` when used with `Task` objects, such as by using `EagerTaskGroup`,
does not work reliably with `trio`.
This is because the syncronization primitives
This is because the synchronization primitives
are not based on `Future` objects but rather perform `Task`-based actions both before going to sleep
and upon waking up. If a `CoroStart` initially blocks on a primitive such as `Event.wait()` or
`sleep(x)` it will be surprised and throw an error when it wakes up on in a different
Expand All @@ -692,7 +692,7 @@ in various ways. For `asyncio`, the event loop never sees the `Future` object

# Experimental features

Some features are currently availible experimentally. They may work only on some platforms or be experimetal in nature, not stable or mature enough to be officially part of the library
Some features are currently available experimentally. They may work only on some platforms or be experimental in nature, not stable or mature enough to be officially part of the library

## Task Interruption

Expand All @@ -705,14 +705,25 @@ Methods are provided to raise exceptions on a `Task`. This is somewhat similar
is _awaiting_ another task, the wait is interrupted, but that other task is not otherwise
affected.

A task which is blocked, waiting for a future, is immediatelly freed and scheduled to run.
A task which is blocked, waiting for a future, is immediately freed and scheduled to run.
If the task is already scheduled to run, i.e. it is _new_, or the future has triggered but
the task hasn't become active yet, it is still awoken with an exception.

- __Note:__ These functions currently are only supported on `Task` object implemented in python.
- __Note:__ These functions currently are only work **reliably** with `Task` object implemented in Python.
Modern implementation often have a native "C" implementation of `Task` objects and they contain inaccessible code which cannot be used by the library. In particular, the
`Task.__step` mehtod cannot be explicitly scheduled to the event loop. For that reason,
a special `create_pytask()` helper is provided to create a suitable `Task` instance.
`Task.__step` method cannot be explicitly scheduled to the event loop. For that reason,
a special `create_pytask()` helper is provided to create a suitable python `Task` instance.
- __However:__ This library does go through extra hoops to make it usable with C Tasks.
It almost works, but with two caveats:

- CTasks which have plain `TaskStepMethWrapper` callbacks scheduled cannot be interrupted.
These are typically tasks executing `await asyncio.sleep(0)` or freshly created
tasks that haven't started executing.
- The CTask's `_fut_waiting` member _cannot_ be cleared from our code, so there exists a time
where it can point to a valid, not-done, Future, even though the Task is about
to wake up. This will make methods such as `task_is_blocked()` return incorrect
values. It __will__ get cleared when the interrupted task starts executing, however. All the more reason to use `task_interrupt()` over `task_throw()` since
the former allows no space for code to see the task in such an intermediate state.

### `task_throw()`

Expand All @@ -726,7 +737,7 @@ pending. if `immediate` is `True`, it is placed at the head of the runnable que
to be next in line for execution.

- This method should probably not be used directly. Throwing exceptions into tasks should
be _synchronous_, i.e. we should deliver them immediatelly, because there is no way to
be _synchronous_, i.e. we should deliver them immediately, because there is no way to
queue pending exceptions and they do not add up in any meaningful way.
Prefer to use `task_interrupt()` below.

Expand All @@ -745,7 +756,7 @@ An `async` version of `task_throw()`. When awaited, `task_interrupt()` is invok
run. Once awaited, the exception **has been raised** on the target task.

By ensuring that the target task runs immediately, it is possible to reason about task
execution without having to rely on external syncronization primitives and the cooperation
execution without having to rely on external synchronization primitives and the cooperation
of the target task. An interrupt is never _pending_ on the task (as a _cancellation_ can
be) and therefore it cannot cause collisions with other interrupts.

Expand Down
202 changes: 160 additions & 42 deletions src/asynkit/experimental/interrupt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import asyncio.tasks
import contextlib
import sys
from asyncio import AbstractEventLoop
from typing import Any, AsyncGenerator, Coroutine, Optional

from asynkit.loop.types import TaskAny
from asynkit.scheduling import get_scheduling_loop
from asynkit.loop.extensions import AbstractSchedulingLoop, get_scheduling_loop
from asynkit.loop.types import FutureAny, TaskAny

__all__ = [
"create_pytask",
Expand All @@ -16,6 +17,8 @@
"PyTask",
]

_have_context = sys.version_info > (3, 8)

# Crate a python Task. We need access to the __step method and this is hidden
# in the C implementation from _asyncio module
if hasattr(asyncio.tasks, "_PyTask"):
Expand Down Expand Up @@ -58,19 +61,18 @@ def task_throw(
if task.done():
raise RuntimeError("cannot interrupt task which is done")

# this only works on python tasks, which have the exposed __step method
# because we need to insert it directly into the ready queue
# have to defeat the name mangling:
try:
step_method = task._Task__step # type: ignore[attr-defined]
except AttributeError as error:
raise TypeError("cannot interrupt task which is not a python task") from error
# For Python tasks, we can access the __step method to send an
# exception. For C implemented tasks, it becomes more complicated,
# since the actual __step and __wakeup methods are hidden. We need
# more complex and hacky code to re-use already scheduled callbacks
# in this case.
step_method = getattr(task, "_Task__step", None)

# get our scheduling loop, to perform the actual scheduling
task_loop = task.get_loop()
scheduling_loop = get_scheduling_loop(task_loop)

# tasks.py mentions that the only way to transion from waiting
# tasks.py mentions that the only way to transition from waiting
# for a future and being runnable is via the __wakeup() method.
# Well, we change that here. task_throw() makes a task stop waiting
# for a future and transitions it to the runnable state directly.
Expand All @@ -97,40 +99,60 @@ def task_throw(
# the _must_cancel flag is set on the task instead.

fut_waiter = task._fut_waiter # type: ignore[attr-defined]
if fut_waiter and not fut_waiter.done():
# it is blocked on a future.
# we remove ourselves from the future's callback list.
# this way, we can stop waiting for it, without cancelling it,
# which would would have side effects.
wakeup_method = task._Task__wakeup # type: ignore[attr-defined]
fut_waiter.remove_done_callback(wakeup_method)

if step_method is None:
# special super hack for C tasks
callback, arg, ctx = c_task_reschedule(
task_loop, scheduling_loop, task, fut_waiter, exception
)
else:
# it is not blocked but it could be cancelled
if task._must_cancel or ( # type: ignore[attr-defined]
fut_waiter and fut_waiter.cancelled()
):
raise RuntimeError("cannot interrupt a cancelled task")
# regular code for Python tasks. We can use the __step method directly.

# it is in the ready queue (has __step / __wakeup shceduled)
# or it is the running task..
handle = scheduling_loop.ready_remove(task)
if handle is None:
# it is the running task
assert task is asyncio.current_task()
raise RuntimeError("cannot interrupt self")
if fut_waiter and not fut_waiter.done():
# it is blocked on a future.
# we remove ourselves from the future's callback list.
# this way, we can stop waiting for it, without cancelling it,
# which would would have side effects.
wakeup_method = task._Task__wakeup # type: ignore[attr-defined]
fut_waiter.remove_done_callback(wakeup_method)
else:
# it is not blocked but it could be cancelled
if task._must_cancel or ( # type: ignore[attr-defined]
fut_waiter and fut_waiter.cancelled()
):
raise RuntimeError("cannot interrupt a cancelled task")

# now, we have to insert it
task._fut_waiter = None # type: ignore[attr-defined]
if sys.version_info > (3, 8):
task_loop.call_soon(
step_method,
exception,
context=task._context, # type: ignore[attr-defined]
# it is in the ready queue (has __step / __wakeup scheduled)
# or it is the running task..
handle = scheduling_loop.ready_remove(task)
if handle is None:
# it is the running task
assert task is asyncio.current_task()
raise RuntimeError("cannot interrupt self")

callback, arg = step_method, exception
ctx = task._context if _have_context else None # type: ignore[attr-defined]

# clear the future waiter, and re-insert it. fut_waiter is not necessarily
# done.
if step_method:
# only possible for Py-Tasks!
# for C tasks, we cannot clear it. but it is fine, it will be cleared
# later by the task's __step method, and we are no longer in its callback list
# however in the mean time, the invariant that _fut_waiter is None or done()
# while the task is blocked, no longer holds! so we should really make
# sure this task is switched to immediately!
task._fut_waiter = None # type: ignore[attr-defined]
if _have_context:
task_loop.call_soon( # type: ignore[call-arg]
callback,
arg,
context=ctx,
)
else: # pragma: no cover
task_loop.call_soon(
step_method,
exception,
callback,
arg,
)

if immediate:
Expand All @@ -144,6 +166,104 @@ def task_throw(
scheduling_loop.ready_insert(0, handle)


def c_task_reschedule(
task_loop: AbstractEventLoop,
scheduling_loop: AbstractSchedulingLoop,
task: TaskAny,
fut_waiter: FutureAny,
exception: BaseException,
) -> Any:
# because we don't have access to the __step method or __wakeup methods
# in c tasks, we need to fish out the already existing callbacks
# in the system and _reuse_ those.

# first, find the callback on the future, if any. Similar to
# Future.remove_done_callback()
# but filters for the task, and returns the single callback found.
if fut_waiter and not fut_waiter.done():
callback, ctx = future_find_task_callback(fut_waiter, task)
fut_waiter.remove_done_callback(callback)
handle = None

else:
# it is not blocked but it could be cancelled
if task._must_cancel or ( # type: ignore[attr-defined]
fut_waiter and fut_waiter.cancelled()
):
raise RuntimeError("cannot interrupt a cancelled task")

# it is in the ready queue (has __step / __wakeup scheduled)
# or it is the running task..
handle = scheduling_loop.ready_remove(task)
if handle is None:
# it is the running task
assert task is asyncio.current_task()
raise RuntimeError("cannot interrupt self")
callback = handle._callback # type: ignore[attr-defined]
ctx = handle._context if _have_context else None # type: ignore[attr-defined]

# we now have a callback, a bound method. We must re-use this method
# because we have no way to create a new bound method for the internal
# __step and __wakeup methods of C tasks. Find out which we have.
# for C tasks, this can either be the
# "TaskStepMethWrapper" for __step, or the "task_wakeup" for __wakeup.
# (for Py tasks, it is just a Task.__step or Task.__wakeup bound method.
# We check for both)
arg: Any
cbname = str(callback)
# CTasks have a TaskStepMethWrapper
if "TaskStep" in cbname or "__step" in cbname: # pragma: no cover
# we can re-use this directly
arg = exception

# BUT! TaskStepMethWrapper cannot take arguments when called. And we cannot
# cannot create one. So, CTasks which have a plain __step scheduled
# cannot be interrupted. So, we have to give up here. There is no way for us
# into the pesky C implementation, we cannot modify the wrapped args, nothing.
# bummer.
if "TaskStepMethWrapper" in cbname:
assert handle is not None
scheduling_loop.ready_insert(-1, handle) # re-insert it somewhere
raise RuntimeError(
"cannot interrupt a c-task with a plain __step scheduled"
)
else:
# this is a TaskWakeupMethWrapper in 3.9 and earlier, 'task_wakeup()' after.
assert "wakeup" in cbname or "TaskWakeup" in cbname
# we need to create a cancelled future and pass that as arg to this one.
f: FutureAny = task._loop.create_future() # type: ignore[attr-defined]
f.set_exception(exception)
arg = f

return callback, arg, ctx


def future_find_task_callback(fut_waiter: FutureAny, task: TaskAny) -> Any:
"""
Look for the correct callback on the future to remove, by finding the
one associated with a task.
"""
if _have_context:
found = [
(f, ctx)
for (f, ctx) in fut_waiter._callbacks # type: ignore[attr-defined]
if getattr(f, "__self__", None) is task
]
else: # pragma: no cover
found = [
f
for f in fut_waiter._callbacks # type: ignore[attr-defined]
if getattr(f, "__self__", None) is task
]
assert len(found) == 1
cb = found[0]
if _have_context:
callback, ctx = cb
else: # pragma: no cover
callback, ctx = cb, None # type: ignore[assignment]
return callback, ctx


# interrupt a task. We use much of the same mechanism used when cancelling a task,
# except that we don't actually cancel the task, we just raise an exception in it.
# Additionally, we need it to execute immediately, so we we switch to it.
Expand All @@ -168,17 +288,15 @@ async def task_timeout(timeout: float) -> AsyncGenerator[None, None]:
task = asyncio.current_task()
assert task is not None
loop = task.get_loop()
if not isinstance(task, PyTask):
raise TypeError("cannot interrupt task which is not a python task")

# create an interrupt instance, which we check for
my_interrupt = TimeoutInterrupt()

def trigger_timeout() -> None:
# we want to interrupt the task, but not from a
# loop callback (using task_throw()), because hypothetically many
# such calbacks could run, and they could then
# pre-empt each other. Instead, we interrupt from
# such callbacks could run, and they could then
# preempt each other. Instead, we interrupt from
# a task, so that only one interrupt can be active.
async def interruptor() -> None:
if is_active: # pragma: no branch
Expand Down
Loading

0 comments on commit 576fd49

Please sign in to comment.