Skip to content

Commit

Permalink
feat: add coroutine::await_in_coroutine to await awaitables in coro…
Browse files Browse the repository at this point in the history
…utine context
  • Loading branch information
wyfo committed Apr 25, 2024
1 parent 08948df commit 18f59ea
Show file tree
Hide file tree
Showing 13 changed files with 730 additions and 217 deletions.
37 changes: 19 additions & 18 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,36 @@

- [Getting started](getting-started.md)
- [Using Rust from Python](rust-from-python.md)
- [Python modules](module.md)
- [Python functions](function.md)
- [Function signatures](function/signature.md)
- [Error handling](function/error-handling.md)
- [Python classes](class.md)
- [Class customizations](class/protocols.md)
- [Basic object customization](class/object.md)
- [Emulating numeric types](class/numeric.md)
- [Emulating callable objects](class/call.md)
- [Python modules](module.md)
- [Python functions](function.md)
- [Function signatures](function/signature.md)
- [Error handling](function/error-handling.md)
- [Python classes](class.md)
- [Class customizations](class/protocols.md)
- [Basic object customization](class/object.md)
- [Emulating numeric types](class/numeric.md)
- [Emulating callable objects](class/call.md)
- [Calling Python from Rust](python-from-rust.md)
- [Python object types](types.md)
- [Python exceptions](exception.md)
- [Calling Python functions](python-from-rust/function-calls.md)
- [Executing existing Python code](python-from-rust/calling-existing-code.md)
- [Python object types](types.md)
- [Python exceptions](exception.md)
- [Calling Python functions](python-from-rust/function-calls.md)
- [Executing existing Python code](python-from-rust/calling-existing-code.md)
- [Type conversions](conversions.md)
- [Mapping of Rust types to Python types](conversions/tables.md)
- [Conversion traits](conversions/traits.md)
- [Mapping of Rust types to Python types](conversions/tables.md)
- [Conversion traits](conversions/traits.md)
- [Using `async` and `await`](async-await.md)
- [Awaiting Python awaitables](async-await/awaiting_python_awaitables)
- [Parallelism](parallelism.md)
- [Debugging](debugging.md)
- [Features reference](features.md)
- [Memory management](memory.md)
- [Performance](performance.md)
- [Advanced topics](advanced.md)
- [Building and distribution](building-and-distribution.md)
- [Supporting multiple Python versions](building-and-distribution/multiple-python-versions.md)
- [Supporting multiple Python versions](building-and-distribution/multiple-python-versions.md)
- [Useful crates](ecosystem.md)
- [Logging](ecosystem/logging.md)
- [Using `async` and `await`](ecosystem/async-await.md)
- [Logging](ecosystem/logging.md)
- [Using `async` and `await`](ecosystem/async-await.md)
- [FAQ and troubleshooting](faq.md)

---
Expand Down
62 changes: 62 additions & 0 deletions guide/src/async-await/awaiting_python_awaitables.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Awaiting Python awaitables

Python awaitable can be awaited on Rust side
using [`await_in_coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/function.await_in_coroutine).

```rust
# # ![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use pyo3::{prelude::*, coroutine::await_in_coroutine};

#[pyfunction]
async fn wrap_awaitable(awaitable: PyObject) -> PyResult<PyObject> {
Python::with_gil(|gil| await_in_coroutine(awaitable.bind(gil)))?.await
}
# }
```

Behind the scene, `await_in_coroutine` calls the `__await__` method of the Python awaitable (or `__iter__` for
generator-based coroutine).

## Restrictions

As the name suggests, `await_in_coroutine` resulting future can only be awaited in coroutine context. Otherwise, it
panics.

```rust
# # ![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use pyo3::{prelude::*, coroutine::await_in_coroutine};

#[pyfunction]
fn block_on(awaitable: PyObject) -> PyResult<PyObject> {
let future = Python::with_gil(|gil| await_in_coroutine(awaitable.bind(gil)))?;
futures::executor::block_on(future) // ERROR: PyFuture must be awaited in coroutine context
}
# }
```

The future must also be the only one to be awaited at a time; it means that it's forbidden to await it in a `select!`.
Otherwise, it panics.

```rust
# # ![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use futures::FutureExt;
use pyo3::{prelude::*, coroutine::await_in_coroutine};

#[pyfunction]
async fn select(awaitable: PyObject) -> PyResult<PyObject> {
let future = Python::with_gil(|gil| await_in_coroutine(awaitable.bind(gil)))?;
futures::select_biased! {
_ = std::future::pending::<()>().fuse() => unreachable!(),
res = future.fuse() => res, // ERROR: Python awaitable mixed with Rust future
}
}
# }
```

These restrictions exist because awaiting a `await_in_coroutine` future strongly binds it to the
enclosing coroutine. The coroutine will then delegate its `send`/`throw`/`close` methods to the
awaited future. If it was awaited in a `select!`, `Coroutine::send` would no able to know if
the value passed would have to be delegated or not.
1 change: 1 addition & 0 deletions newsfragments/3611.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `coroutine::await_in_coroutine` to await awaitables in coroutine context
6 changes: 5 additions & 1 deletion pyo3-ffi/src/abstract_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ extern "C" {
pub fn PyIter_Next(arg1: *mut PyObject) -> *mut PyObject;
#[cfg(all(not(PyPy), Py_3_10))]
#[cfg_attr(PyPy, link_name = "PyPyIter_Send")]
pub fn PyIter_Send(iter: *mut PyObject, arg: *mut PyObject, presult: *mut *mut PyObject);
pub fn PyIter_Send(
iter: *mut PyObject,
arg: *mut PyObject,
presult: *mut *mut PyObject,
) -> c_int;

#[cfg_attr(PyPy, link_name = "PyPyNumber_Check")]
pub fn PyNumber_Check(o: *mut PyObject) -> c_int;
Expand Down
112 changes: 60 additions & 52 deletions src/coroutine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,30 @@ use std::{
use pyo3_macros::{pyclass, pymethods};

use crate::{
coroutine::{cancel::ThrowCallback, waker::AsyncioWaker},
exceptions::{PyAttributeError, PyRuntimeError, PyStopIteration},
coroutine::waker::CoroutineWaker,
exceptions::{PyAttributeError, PyGeneratorExit, PyRuntimeError, PyStopIteration},
marker::Ungil,
panic::PanicException,
types::{string::PyStringMethods, PyIterator, PyString},
Bound, IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python,
types::{string::PyStringMethods, PyString},
IntoPy, Py, PyErr, PyObject, PyResult, Python,
};

pub(crate) mod cancel;
mod asyncio;
mod awaitable;
mod cancel;
mod waker;

use crate::marker::Ungil;
pub use cancel::CancelHandle;
pub use awaitable::await_in_coroutine;
pub use cancel::{CancelHandle, ThrowCallback};

const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine";

pub(crate) enum CoroOp {
Send(PyObject),
Throw(PyObject),
Close,
}

trait CoroutineFuture: Send {
fn poll(self: Pin<&mut Self>, py: Python<'_>, waker: &Waker) -> Poll<PyResult<PyObject>>;
}
Expand Down Expand Up @@ -69,7 +78,7 @@ pub struct Coroutine {
qualname_prefix: Option<&'static str>,
throw_callback: Option<ThrowCallback>,
future: Option<Pin<Box<dyn CoroutineFuture>>>,
waker: Option<Arc<AsyncioWaker>>,
waker: Option<Arc<CoroutineWaker>>,
}

impl Coroutine {
Expand Down Expand Up @@ -104,58 +113,55 @@ impl Coroutine {
}
}

fn poll(&mut self, py: Python<'_>, throw: Option<PyObject>) -> PyResult<PyObject> {
fn poll_inner(&mut self, py: Python<'_>, mut op: CoroOp) -> PyResult<PyObject> {
// raise if the coroutine has already been run to completion
let future_rs = match self.future {
Some(ref mut fut) => fut,
None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)),
};
// reraise thrown exception it
match (throw, &self.throw_callback) {
(Some(exc), Some(cb)) => cb.throw(exc),
(Some(exc), None) => {
self.close();
return Err(PyErr::from_value_bound(exc.into_bound(py)));
}
(None, _) => {}
// if the future is not pending on a Python awaitable,
// execute throw callback or complete on close
if !matches!(self.waker, Some(ref w) if w.is_delegated(py)) {
match op {
send @ CoroOp::Send(_) => op = send,
CoroOp::Throw(exc) => match &self.throw_callback {
Some(cb) => {
cb.throw(exc.clone_ref(py));
op = CoroOp::Send(py.None());
}
None => return Err(PyErr::from_value_bound(exc.into_bound(py))),
},
CoroOp::Close => return Err(PyGeneratorExit::new_err(py.None())),
};
}
// create a new waker, or try to reset it in place
if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) {
waker.reset();
waker.reset(op);
} else {
self.waker = Some(Arc::new(AsyncioWaker::new()));
self.waker = Some(Arc::new(CoroutineWaker::new(op)));
}
// poll the future and forward its results if ready
// poll the future and forward its results if ready; otherwise, yield from waker
// polling is UnwindSafe because the future is dropped in case of panic
let waker = Waker::from(self.waker.clone().unwrap());
let poll = || future_rs.as_mut().poll(py, &waker);
match panic::catch_unwind(panic::AssertUnwindSafe(poll)) {
Ok(Poll::Ready(res)) => {
self.close();
return Err(PyStopIteration::new_err(res?));
}
Err(err) => {
self.close();
return Err(PanicException::from_panic_payload(err));
}
_ => {}
Err(err) => Err(PanicException::from_panic_payload(err)),
Ok(Poll::Ready(res)) => Err(PyStopIteration::new_err(res?)),
Ok(Poll::Pending) => match self.waker.as_ref().unwrap().yield_(py) {
Ok(to_yield) => Ok(to_yield),
Err(err) => Err(err),
},
}
// otherwise, initialize the waker `asyncio.Future`
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? {
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__`
// and will yield itself if its result has not been set in polling above
if let Some(future) = PyIterator::from_bound_object(&future.as_borrowed())
.unwrap()
.next()
{
// future has not been leaked into Python for now, and Rust code can only call
// `set_result(None)` in `Wake` implementation, so it's safe to unwrap
return Ok(future.unwrap().into());
}
}

fn poll(&mut self, py: Python<'_>, op: CoroOp) -> PyResult<PyObject> {
let result = self.poll_inner(py, op);
if result.is_err() {
// the Rust future is dropped, and the field set to `None`
// to indicate the coroutine has been run to completion
drop(self.future.take());
}
// if waker has been waken during future polling, this is roughly equivalent to
// `await asyncio.sleep(0)`, so just yield `None`.
Ok(py.None().into_py(py))
result
}
}

Expand All @@ -180,25 +186,27 @@ impl Coroutine {
}
}

fn send(&mut self, py: Python<'_>, _value: &Bound<'_, PyAny>) -> PyResult<PyObject> {
self.poll(py, None)
fn send(&mut self, py: Python<'_>, value: PyObject) -> PyResult<PyObject> {
self.poll(py, CoroOp::Send(value))
}

fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> {
self.poll(py, Some(exc))
self.poll(py, CoroOp::Throw(exc))
}

fn close(&mut self) {
// the Rust future is dropped, and the field set to `None`
// to indicate the coroutine has been run to completion
drop(self.future.take());
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
match self.poll(py, CoroOp::Close) {
Ok(_) => Ok(()),
Err(err) if err.is_instance_of::<PyGeneratorExit>(py) => Ok(()),
Err(err) => Err(err),
}
}

fn __await__(self_: Py<Self>) -> Py<Self> {
self_
}

fn __next__(&mut self, py: Python<'_>) -> PyResult<PyObject> {
self.poll(py, None)
self.poll(py, CoroOp::Send(py.None()))
}
}
Loading

0 comments on commit 18f59ea

Please sign in to comment.