Skip to content

Commit

Permalink
feat: expose coroutine constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Apr 25, 2024
1 parent 7885cce commit 48a4e0c
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 121 deletions.
69 changes: 55 additions & 14 deletions guide/src/async-await.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
`#[pyfunction]` and `#[pymethods]` attributes also support `async fn`.

```rust
# #![allow(dead_code)]
# # ![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use std::{thread, time::Duration};
use futures::channel::oneshot;
Expand All @@ -26,21 +26,33 @@ async fn sleep(seconds: f64, result: Option<PyObject>) -> Option<PyObject> {

## `Send + 'static` constraint

Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object.
Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python
object.

As a consequence, `async fn` parameters and return types must also be `Send + 'static`, so it is not possible to have a signature like `async fn does_not_compile<'py>(arg: Bound<'py, PyAny>) -> Bound<'py, PyAny>`.
As a consequence, `async fn` parameters and return types must also be `Send + 'static`, so it is not possible to have a
signature like `async fn does_not_compile<'py>(arg: Bound<'py, PyAny>) -> Bound<'py, PyAny>`.

However, there is an exception for method receivers, so async methods can accept `&self`/`&mut self`. Note that this means that the class instance is borrowed for as long as the returned future is not completed, even across yield points and while waiting for I/O operations to complete. Hence, other methods cannot obtain exclusive borrows while the future is still being polled. This is the same as how async methods in Rust generally work but it is more problematic for Rust code interfacing with Python code due to pervasive shared mutability. This strongly suggests to prefer shared borrows `&self` over exclusive ones `&mut self` to avoid racy borrow check failures at runtime.
However, there is an exception for method receivers, so async methods can accept `&self`/`&mut self`. Note that this
means that the class instance is borrowed for as long as the returned future is not completed, even across yield points
and while waiting for I/O operations to complete. Hence, other methods cannot obtain exclusive borrows while the future
is still being polled. This is the same as how async methods in Rust generally work but it is more problematic for Rust
code interfacing with Python code due to pervasive shared mutability. This strongly suggests to prefer shared
borrows `&self` over exclusive ones `&mut self` to avoid racy borrow check failures at runtime.

## Implicit GIL holding

Even if it is not possible to pass a `py: Python<'py>` parameter to `async fn`, the GIL is still held during the execution of the future – it's also the case for regular `fn` without `Python<'py>`/`Bound<'py, PyAny>` parameter, yet the GIL is held.
Even if it is not possible to pass a `py: Python<'py>` parameter to `async fn`, the GIL is still held during the
execution of the future – it's also the case for regular `fn` without `Python<'py>`/`Bound<'py, PyAny>` parameter, yet
the GIL is held.

It is still possible to get a `Python` marker using [`Python::with_gil`]({{#PYO3_DOCS_URL}}/pyo3/marker/struct.Python.html#method.with_gil); because `with_gil` is reentrant and optimized, the cost will be negligible.
It is still possible to get a `Python` marker
using [`Python::with_gil`]({{#PYO3_DOCS_URL}}/pyo3/marker/struct.Python.html#method.with_gil); because `with_gil` is
reentrant and optimized, the cost will be negligible.

## Release the GIL across `.await`

There is currently no simple way to release the GIL when awaiting a future, *but solutions are currently in development*.
There is currently no simple way to release the GIL when awaiting a future, *but solutions are currently in
development*.

Here is the advised workaround for now:

Expand Down Expand Up @@ -72,10 +84,12 @@ where

## Cancellation

Cancellation on the Python side can be caught using [`CancelHandle`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html) type, by annotating a function parameter with `#[pyo3(cancel_handle)]`.
Cancellation on the Python side can be caught
using [`CancelHandle`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html) type, by annotating a function
parameter with `#[pyo3(cancel_handle)]`.

```rust
# #![allow(dead_code)]
# # ![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use futures::FutureExt;
use pyo3::prelude::*;
Expand All @@ -93,15 +107,42 @@ async fn cancellable(#[pyo3(cancel_handle)] mut cancel: CancelHandle) {

## *asyncio* vs. *anyio*

By default, Python awaitables instantiated with `async fn` can only be awaited in *asyncio* context.
By default, Python awaitables instantiated with `async fn` can only be awaited in *asyncio* context.

PyO3 can also target [*anyio*](https://github.com/agronholm/anyio) with the dedicated `anyio` Cargo feature. With it enabled, `async fn` become awaitable both in *asyncio* or [*trio*](https://github.com/python-trio/trio) context.
PyO3 can also target [*anyio*](https://github.com/agronholm/anyio) with the dedicated `anyio` Cargo feature. With it
enabled, `async fn` become awaitable both in *asyncio* or [*trio*](https://github.com/python-trio/trio) context.
However, it requires to have the [*sniffio*](https://github.com/python-trio/sniffio) (or *anyio*) library installed.

## The `Coroutine` type

To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine).
To make a Rust future awaitable in Python, PyO3 defines
a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the
Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine).

Each `coroutine.send` call is translated to a `Future::poll` call. If a [`CancelHandle`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html) parameter is declared, the exception passed to `coroutine.throw` call is stored in it and can be retrieved with [`CancelHandle::cancelled`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html#method.cancelled); otherwise, it cancels the Rust future, and the exception is reraised;
Each `coroutine.send` call is translated to a `Future::poll` call. If
a [`CancelHandle`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html) parameter is declared, the exception
passed to `coroutine.throw` call is stored in it and can be retrieved
with [`CancelHandle::cancelled`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html#method.cancelled);
otherwise, it cancels the Rust future, and the exception is reraised;

*The type does not yet have a public constructor until the design is finalized.*
Coroutine can also be instantiated directly

```rust
# # ![allow(dead_code)]
use pyo3::prelude::*;
use pyo3::coroutine::{CancelHandle, Coroutine};

#[pyfunction]
fn new_coroutine(py: Python<'_>) -> Coroutine {
let mut cancel = CancelHandle::new();
let throw_callback = cancel.throw_callback();
let future = async move {
cancel.cancelled().await;
PyResult::Ok(())
};
Coroutine::new("my_coro", future)
.with_qualname_prefix("MyClass")
.with_throw_callback(throw_callback)
.with_allow_threads(true)
}
```
1 change: 1 addition & 0 deletions newsfragments/3613.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Expose `Coroutine` constructor
10 changes: 5 additions & 5 deletions pyo3-macros-backend/src/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,13 +571,13 @@ impl<'a> FnSpec<'a> {
};
let mut call = quote! {{
let future = #future;
#pyo3_path::impl_::coroutine::new_coroutine(
#pyo3_path::intern!(py, stringify!(#python_name)),
#qualname_prefix,
#throw_callback,
#allow_threads,
#pyo3_path::coroutine::Coroutine::new(
stringify!(#python_name),
async move { #pyo3_path::impl_::wrap::OkWrap::wrap(future.await) },
)
.with_qualname_prefix(#qualname_prefix)
.with_throw_callback(#throw_callback)
.with_allow_threads(#allow_threads)
}};
if cancel_handle.is_some() {
call = quote! {{
Expand Down
121 changes: 62 additions & 59 deletions src/coroutine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Python coroutine implementation, used notably when wrapping `async fn`
//! with `#[pyfunction]`/`#[pymethods]`.
use std::borrow::Cow;
use std::{
future::Future,
panic,
Expand All @@ -12,11 +13,11 @@ use pyo3_macros::{pyclass, pymethods};

use crate::{
coroutine::waker::CoroutineWaker,
exceptions::{PyAttributeError, PyGeneratorExit, PyRuntimeError, PyStopIteration},
exceptions::{PyGeneratorExit, PyRuntimeError, PyStopIteration},
marker::Ungil,
panic::PanicException,
types::{string::PyStringMethods, PyString},
IntoPy, Py, PyErr, PyObject, PyResult, Python,
types::PyString,
Bound, IntoPy, Py, PyErr, PyObject, PyResult, Python,
};

#[cfg(feature = "anyio")]
Expand All @@ -40,48 +41,44 @@ pub(crate) enum CoroOp {
}

trait CoroutineFuture: Send {
fn poll(self: Pin<&mut Self>, py: Python<'_>, waker: &Waker) -> Poll<PyResult<PyObject>>;
fn poll(
self: Pin<&mut Self>,
py: Python<'_>,
waker: &Waker,
allow_threads: bool,
) -> Poll<PyResult<PyObject>>;
}

impl<F, T, E> CoroutineFuture for F
where
F: Future<Output = Result<T, E>> + Send,
T: IntoPy<PyObject> + Send,
E: Into<PyErr> + Send,
{
fn poll(self: Pin<&mut Self>, py: Python<'_>, waker: &Waker) -> Poll<PyResult<PyObject>> {
self.poll(&mut Context::from_waker(waker))
.map_ok(|obj| obj.into_py(py))
.map_err(Into::into)
}
}

struct AllowThreads<F> {
future: F,
}

impl<F, T, E> CoroutineFuture for AllowThreads<F>
where
F: Future<Output = Result<T, E>> + Send + Ungil,
T: IntoPy<PyObject> + Send + Ungil,
E: Into<PyErr> + Send + Ungil,
{
fn poll(self: Pin<&mut Self>, py: Python<'_>, waker: &Waker) -> Poll<PyResult<PyObject>> {
// SAFETY: future field is pinned when self is
let future = unsafe { self.map_unchecked_mut(|a| &mut a.future) };
py.allow_threads(|| future.poll(&mut Context::from_waker(waker)))
.map_ok(|obj| obj.into_py(py))
.map_err(Into::into)
fn poll(
self: Pin<&mut Self>,
py: Python<'_>,
waker: &Waker,
allow_threads: bool,
) -> Poll<PyResult<PyObject>> {
if allow_threads {
py.allow_threads(|| self.poll(&mut Context::from_waker(waker)))
} else {
self.poll(&mut Context::from_waker(waker))
}
.map_ok(|obj| obj.into_py(py))
.map_err(Into::into)
}
}

/// Python coroutine wrapping a [`Future`].
#[pyclass(crate = "crate")]
pub struct Coroutine {
name: Option<Py<PyString>>,
future: Option<Pin<Box<dyn CoroutineFuture>>>,
name: Cow<'static, str>,
qualname_prefix: Option<&'static str>,
throw_callback: Option<ThrowCallback>,
future: Option<Pin<Box<dyn CoroutineFuture>>>,
allow_threads: bool,
waker: Option<Arc<CoroutineWaker>>,
}

Expand All @@ -91,32 +88,44 @@ impl Coroutine {
/// Coroutine `send` polls the wrapped future, ignoring the value passed
/// (should always be `None` anyway).
///
/// `Coroutine `throw` drop the wrapped future and reraise the exception passed
pub(crate) fn new<F, T, E>(
name: Option<Py<PyString>>,
qualname_prefix: Option<&'static str>,
throw_callback: Option<ThrowCallback>,
allow_threads: bool,
future: F,
) -> Self
/// `Coroutine `throw` drop the wrapped future and reraise the exception passed.
pub fn new<F, T, E>(name: impl Into<Cow<'static, str>>, future: F) -> Self
where
F: Future<Output = Result<T, E>> + Send + Ungil + 'static,
T: IntoPy<PyObject> + Send + Ungil,
E: Into<PyErr> + Send + Ungil,
{
Self {
name,
qualname_prefix,
throw_callback,
future: Some(if allow_threads {
Box::pin(AllowThreads { future })
} else {
Box::pin(future)
}),
future: Some(Box::pin(future)),
name: name.into(),
qualname_prefix: None,
throw_callback: None,
allow_threads: false,
waker: None,
}
}

/// Set a prefix for `__qualname__`, which will be joined with a "."
pub fn with_qualname_prefix(mut self, prefix: impl Into<Option<&'static str>>) -> Self {
self.qualname_prefix = prefix.into();
self
}

/// Register a callback for coroutine `throw` method.
///
/// The exception passed to `throw` is then redirected to this callback, notifying the
/// associated [`CancelHandle`], without being reraised.
pub fn with_throw_callback(mut self, callback: impl Into<Option<ThrowCallback>>) -> Self {
self.throw_callback = callback.into();
self
}

/// Release the GIL while polling the future wrapped.
pub fn with_allow_threads(mut self, allow_threads: bool) -> Self {
self.allow_threads = allow_threads;
self
}

fn poll_inner(&mut self, py: Python<'_>, mut op: CoroOp) -> PyResult<PyObject> {
// raise if the coroutine has already been run to completion
let future_rs = match self.future {
Expand Down Expand Up @@ -147,7 +156,7 @@ impl Coroutine {
// poll the future and forward its results if ready; otherwise, yield from waker
// polling is UnwindSafe because the future is dropped in case of panic
let waker = Waker::from(self.waker.clone().unwrap());
let poll = || future_rs.as_mut().poll(py, &waker);
let poll = || future_rs.as_mut().poll(py, &waker, self.allow_threads);
match panic::catch_unwind(panic::AssertUnwindSafe(poll)) {
Err(err) => Err(PanicException::from_panic_payload(err)),
Ok(Poll::Ready(res)) => Err(PyStopIteration::new_err(res?)),
Expand All @@ -172,22 +181,16 @@ impl Coroutine {
#[pymethods(crate = "crate")]
impl Coroutine {
#[getter]
fn __name__(&self, py: Python<'_>) -> PyResult<Py<PyString>> {
match &self.name {
Some(name) => Ok(name.clone_ref(py)),
None => Err(PyAttributeError::new_err("__name__")),
}
fn __name__<'py>(&self, py: Python<'py>) -> Bound<'py, PyString> {
PyString::new_bound(py, &self.name)
}

#[getter]
fn __qualname__(&self, py: Python<'_>) -> PyResult<Py<PyString>> {
match (&self.name, &self.qualname_prefix) {
(Some(name), Some(prefix)) => Ok(format!("{}.{}", prefix, name.bind(py).to_cow()?)
.as_str()
.into_py(py)),
(Some(name), None) => Ok(name.clone_ref(py)),
(None, _) => Err(PyAttributeError::new_err("__qualname__")),
}
fn __qualname__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyString>> {
Ok(match &self.qualname_prefix {
Some(prefix) => PyString::new_bound(py, &format!("{}.{}", prefix, self.name)),
None => self.__name__(py),
})
}

fn send(&mut self, py: Python<'_>, value: PyObject) -> PyResult<PyObject> {
Expand Down
4 changes: 2 additions & 2 deletions src/coroutine/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ impl CancelHandle {
Cancelled(self).await
}

#[doc(hidden)]
/// Instantiate a [`ThrowCallback`] associated to this cancel handle.
pub fn throw_callback(&self) -> ThrowCallback {
ThrowCallback(self.0.clone())
}
}

#[doc(hidden)]
/// Callback for coroutine `throw` method, notifying the associated [`CancelHandle`]
pub struct ThrowCallback(Arc<Mutex<Inner>>);

impl ThrowCallback {
Expand Down
35 changes: 3 additions & 32 deletions src/impl_/coroutine.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,10 @@
use std::{
future::Future,
ops::{Deref, DerefMut},
};
use std::ops::{Deref, DerefMut};

use crate::{
coroutine::{Coroutine, ThrowCallback},
instance::Bound,
marker::Ungil,
pycell::impl_::PyClassBorrowChecker,
pyclass::boolean_struct::False,
types::{PyAnyMethods, PyString},
IntoPy, Py, PyAny, PyClass, PyErr, PyObject, PyResult, Python,
instance::Bound, pycell::impl_::PyClassBorrowChecker, pyclass::boolean_struct::False,
types::PyAnyMethods, Py, PyAny, PyClass, PyResult, Python,
};

pub fn new_coroutine<F, T, E>(
name: &Bound<'_, PyString>,
qualname_prefix: Option<&'static str>,
throw_callback: Option<ThrowCallback>,
allow_threads: bool,
future: F,
) -> Coroutine
where
F: Future<Output = Result<T, E>> + Send + Ungil + 'static,
T: IntoPy<PyObject> + Send + Ungil,
E: Into<PyErr> + Send + Ungil,
{
Coroutine::new(
Some(name.clone().into()),
qualname_prefix,
throw_callback,
allow_threads,
future,
)
}

fn get_ptr<T: PyClass>(obj: &Py<T>) -> *mut T {
obj.get_class_object().get_ptr()
}
Expand Down
Loading

0 comments on commit 48a4e0c

Please sign in to comment.