diff --git a/Cargo.toml b/Cargo.toml index c7b25ff82b9..8b1c2851c61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,9 @@ pyo3-build-config = { path = "pyo3-build-config", version = "0.21.0-dev", featur [features] default = ["macros"] +# Switch coroutine implementation to anyio instead of asyncio +anyio = ["macros"] + # Enables pyo3::inspect module and additional type information on FromPyObject # and IntoPy traits experimental-inspect = [] diff --git a/guide/src/async-await.md b/guide/src/async-await.md index af20547ef1f..3995524971d 100644 --- a/guide/src/async-await.md +++ b/guide/src/async-await.md @@ -22,8 +22,6 @@ async fn sleep(seconds: f64, result: Option) -> Option { } ``` -*Python awaitables instantiated with this method can only be awaited in *asyncio* context. Other Python async runtime may be supported in the future.* - ## `Send + 'static` constraint Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object. @@ -85,6 +83,13 @@ async fn cancellable(#[pyo3(cancel_handle)]mut cancel: CancelHandle) { } ``` +## *asyncio* vs. *anyio* + +By default, Python awaitables instantiated with `async fn` can only be awaited in *asyncio* context. + +PyO3 can also target [*anyio*](https://github.com/agronholm/anyio) with the dedicated `anyio` Cargo feature. With it enabled, `async fn` become awaitable both in *asyncio* or [*trio*](https://github.com/python-trio/trio) context. +However, it requires to have the [*sniffio*](https://github.com/python-trio/sniffio) (or *anyio*) library installed. + ## The `Coroutine` type To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine). diff --git a/guide/src/building_and_distribution.md b/guide/src/building_and_distribution.md index 8cb675f4303..7c6cf30cce9 100644 --- a/guide/src/building_and_distribution.md +++ b/guide/src/building_and_distribution.md @@ -62,6 +62,7 @@ There are many ways to go about this: it is possible to use `cargo` to build the PyO3 has some Cargo features to configure projects for building Python extension modules: - The `extension-module` feature, which must be enabled when building Python extension modules. - The `abi3` feature and its version-specific `abi3-pyXY` companions, which are used to opt-in to the limited Python API in order to support multiple Python versions in a single wheel. +- The `anyio` feature, making PyO3 coroutines target [*anyio*](https://github.com/agronholm/anyio) instead of *asyncio*; either [*sniffio*](https://github.com/python-trio/sniffio) or *anyio* should be added as dependency of the Python extension. This section describes each of these packaging tools before describing how to build manually without them. It then proceeds with an explanation of the `extension-module` feature. Finally, there is a section describing PyO3's `abi3` features. diff --git a/newsfragments/3612.added.md b/newsfragments/3612.added.md new file mode 100644 index 00000000000..4f5f2f24014 --- /dev/null +++ b/newsfragments/3612.added.md @@ -0,0 +1 @@ +Support anyio with a Cargo feature \ No newline at end of file diff --git a/pytests/Cargo.toml b/pytests/Cargo.toml index 255094a6c40..d5865573dcd 100644 --- a/pytests/Cargo.toml +++ b/pytests/Cargo.toml @@ -7,7 +7,8 @@ edition = "2021" publish = false [dependencies] -pyo3 = { path = "../", features = ["extension-module"] } +futures = "0.3.29" +pyo3 = { path = "../", features = ["extension-module", "anyio"] } [build-dependencies] pyo3-build-config = { path = "../pyo3-build-config" } diff --git a/pytests/requirements-dev.txt b/pytests/requirements-dev.txt index aa0c703a8cd..38081ce2b31 100644 --- a/pytests/requirements-dev.txt +++ b/pytests/requirements-dev.txt @@ -1,3 +1,4 @@ +anyio[trio]>=4.0 hypothesis>=3.55 pytest>=6.0 pytest-asyncio>=0.21 diff --git a/pytests/src/anyio.rs b/pytests/src/anyio.rs new file mode 100644 index 00000000000..7423f74ba99 --- /dev/null +++ b/pytests/src/anyio.rs @@ -0,0 +1,21 @@ +use futures::channel::oneshot; +use pyo3::prelude::*; +use std::thread; +use std::time::Duration; + +#[pyfunction] +async fn sleep(seconds: f64, result: Option) -> Option { + let (tx, rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_secs_f64(seconds)); + tx.send(()).unwrap(); + }); + rx.await.unwrap(); + result +} + +#[pymodule] +pub fn anyio(_py: Python<'_>, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(sleep, m)?)?; + Ok(()) +} diff --git a/pytests/src/lib.rs b/pytests/src/lib.rs index dbcd3ca4113..c7bd00eefe1 100644 --- a/pytests/src/lib.rs +++ b/pytests/src/lib.rs @@ -2,6 +2,7 @@ use pyo3::prelude::*; use pyo3::types::PyDict; use pyo3::wrap_pymodule; +pub mod anyio; pub mod awaitable; pub mod buf_and_str; pub mod comparisons; @@ -18,6 +19,7 @@ pub mod subclassing; #[pymodule] fn pyo3_pytests(py: Python<'_>, m: &PyModule) -> PyResult<()> { + m.add_wrapped(wrap_pymodule!(anyio::anyio))?; m.add_wrapped(wrap_pymodule!(awaitable::awaitable))?; #[cfg(not(Py_LIMITED_API))] m.add_wrapped(wrap_pymodule!(buf_and_str::buf_and_str))?; @@ -39,6 +41,7 @@ fn pyo3_pytests(py: Python<'_>, m: &PyModule) -> PyResult<()> { let sys = PyModule::import(py, "sys")?; let sys_modules: &PyDict = sys.getattr("modules")?.downcast()?; + sys_modules.set_item("pyo3_pytests.anyio", m.getattr("anyio")?)?; sys_modules.set_item("pyo3_pytests.awaitable", m.getattr("awaitable")?)?; sys_modules.set_item("pyo3_pytests.buf_and_str", m.getattr("buf_and_str")?)?; sys_modules.set_item("pyo3_pytests.comparisons", m.getattr("comparisons")?)?; diff --git a/pytests/tests/test_anyio.py b/pytests/tests/test_anyio.py new file mode 100644 index 00000000000..c48435bd2fc --- /dev/null +++ b/pytests/tests/test_anyio.py @@ -0,0 +1,14 @@ +import asyncio + +from pyo3_pytests.anyio import sleep +import trio + + +def test_asyncio(): + assert asyncio.run(sleep(0)) is None + assert asyncio.run(sleep(0.1, 42)) == 42 + + +def test_trio(): + assert trio.run(sleep, 0) is None + assert trio.run(sleep, 0.1, 42) == 42 diff --git a/src/coroutine.rs b/src/coroutine.rs index 1e7c2922209..7b7738037a5 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -18,8 +18,12 @@ use crate::{ IntoPy, Py, PyErr, PyObject, PyResult, Python, }; +#[cfg(feature = "anyio")] +mod anyio; mod asyncio; pub(crate) mod cancel; +#[cfg(feature = "anyio")] +mod trio; pub(crate) mod waker; use crate::coroutine::cancel::ThrowCallback; diff --git a/src/coroutine/anyio.rs b/src/coroutine/anyio.rs new file mode 100644 index 00000000000..72b9444c7c8 --- /dev/null +++ b/src/coroutine/anyio.rs @@ -0,0 +1,67 @@ +//! Coroutine implementation using sniffio to select the appropriate implementation, +//! compatible with anyio. +use crate::coroutine::{asyncio::AsyncioWaker, trio::TrioWaker}; +use crate::exceptions::PyRuntimeError; +use crate::sync::GILOnceCell; +use crate::{PyAny, PyErr, PyObject, PyResult, Python}; + +fn current_async_library(py: Python<'_>) -> PyResult<&PyAny> { + static CURRENT_ASYNC_LIBRARY: GILOnceCell = GILOnceCell::new(); + let import = || -> PyResult<_> { + let module = py.import("sniffio")?; + Ok(module.getattr("current_async_library")?.into()) + }; + CURRENT_ASYNC_LIBRARY + .get_or_try_init(py, import)? + .as_ref(py) + .call0() +} + +fn unsupported(runtime: &str) -> PyErr { + PyRuntimeError::new_err(format!("unsupported runtime {runtime}")) +} + +/// Sniffio/anyio-compatible coroutine waker. +/// +/// Polling a Rust future calls `sniffio.current_async_library` to select the appropriate +/// implementation, either asyncio or trio. +pub enum AnyioWaker { + /// [`AsyncioWaker`] + Asyncio(AsyncioWaker), + /// [`TrioWaker`] + Trio(TrioWaker), +} + +impl AnyioWaker { + pub(crate) fn new(py: Python<'_>) -> PyResult { + let sniffed = current_async_library(py)?; + match sniffed.extract()? { + "asyncio" => Ok(Self::Asyncio(AsyncioWaker::new(py)?)), + "trio" => Ok(Self::Trio(TrioWaker::new(py)?)), + rt => Err(unsupported(rt)), + } + } + + pub(crate) fn yield_(&self, py: Python<'_>) -> PyResult { + match self { + AnyioWaker::Asyncio(w) => w.yield_(py), + AnyioWaker::Trio(w) => w.yield_(py), + } + } + + pub(crate) fn yield_waken(py: Python<'_>) -> PyResult { + let sniffed = current_async_library(py)?; + match sniffed.extract()? { + "asyncio" => AsyncioWaker::yield_waken(py), + "trio" => TrioWaker::yield_waken(py), + rt => Err(unsupported(rt)), + } + } + + pub(crate) fn wake(&self, py: Python<'_>) -> PyResult<()> { + match self { + AnyioWaker::Asyncio(w) => w.wake(py), + AnyioWaker::Trio(w) => w.wake(py), + } + } +} diff --git a/src/coroutine/trio.rs b/src/coroutine/trio.rs new file mode 100644 index 00000000000..893c18652cc --- /dev/null +++ b/src/coroutine/trio.rs @@ -0,0 +1,84 @@ +//! Coroutine implementation compatible with trio. +use crate::sync::GILOnceCell; +use crate::types::{PyCFunction, PyIterator}; +use crate::{intern, wrap_pyfunction, Py, PyAny, PyObject, PyResult, Python}; +use pyo3_macros::pyfunction; + +struct Trio { + cancel_shielded_checkpoint: PyObject, + current_task: PyObject, + current_trio_token: PyObject, + reschedule: PyObject, + succeeded: PyObject, + wait_task_rescheduled: PyObject, +} +impl Trio { + fn get(py: Python<'_>) -> PyResult<&Self> { + static TRIO: GILOnceCell = GILOnceCell::new(); + TRIO.get_or_try_init(py, || { + let module = py.import("trio.lowlevel")?; + Ok(Self { + cancel_shielded_checkpoint: module.getattr("cancel_shielded_checkpoint")?.into(), + current_task: module.getattr("current_task")?.into(), + current_trio_token: module.getattr("current_trio_token")?.into(), + reschedule: module.getattr("reschedule")?.into(), + succeeded: module.getattr("Abort")?.getattr("SUCCEEDED")?.into(), + wait_task_rescheduled: module.getattr("wait_task_rescheduled")?.into(), + }) + }) + } +} + +fn yield_from(coro_func: &PyAny) -> PyResult { + PyIterator::from_object(coro_func.call_method0("__await__")?)? + .next() + .expect("cancel_shielded_checkpoint didn't yield") + .map(Into::into) +} + +/// Asyncio-compatible coroutine waker. +/// +/// Polling a Rust future yields `trio.lowlevel.wait_task_rescheduled()`, while `Waker::wake` +/// reschedule the current task. +pub struct TrioWaker { + task: PyObject, + token: PyObject, +} + +impl TrioWaker { + pub(super) fn new(py: Python<'_>) -> PyResult { + let trio = Trio::get(py)?; + let task = trio.current_task.call0(py)?; + let token = trio.current_trio_token.call0(py)?; + Ok(Self { task, token }) + } + + pub(super) fn yield_(&self, py: Python<'_>) -> PyResult { + static ABORT_FUNC: GILOnceCell> = GILOnceCell::new(); + let abort_func = + ABORT_FUNC.get_or_try_init(py, || wrap_pyfunction!(abort_func, py).map(Into::into))?; + let wait_task_rescheduled = Trio::get(py)? + .wait_task_rescheduled + .call1(py, (abort_func,))?; + yield_from(wait_task_rescheduled.as_ref(py)) + } + + pub(super) fn yield_waken(py: Python<'_>) -> PyResult { + let checkpoint = Trio::get(py)?.cancel_shielded_checkpoint.call0(py)?; + yield_from(checkpoint.as_ref(py)) + } + + pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> { + self.token.call_method1( + py, + intern!(py, "run_sync_soon"), + (&Trio::get(py)?.reschedule, &self.task), + )?; + Ok(()) + } +} + +#[pyfunction(crate = "crate")] +fn abort_func(py: Python<'_>, _arg: PyObject) -> PyResult { + Ok(Trio::get(py)?.succeeded.clone()) +} diff --git a/src/coroutine/waker.rs b/src/coroutine/waker.rs index 0fb8c56eae9..5d58b062f89 100644 --- a/src/coroutine/waker.rs +++ b/src/coroutine/waker.rs @@ -1,4 +1,3 @@ -use crate::coroutine::asyncio::AsyncioWaker; use crate::exceptions::PyStopIteration; use crate::pyclass::IterNextOutput; use crate::sync::GILOnceCell; @@ -8,6 +7,14 @@ use std::cell::Cell; use std::sync::Arc; use std::task::{Poll, Wake}; +cfg_if::cfg_if! { + if #[cfg(feature = "anyio")] { + type WakerImpl = crate::coroutine::anyio::AnyioWaker; + } else { + type WakerImpl = crate::coroutine::asyncio::AsyncioWaker; + } +} + const MIXED_AWAITABLE_AND_FUTURE_ERROR: &str = "Python awaitable mixed with Rust future"; pub(crate) enum FutureOrPoll { @@ -22,7 +29,7 @@ thread_local! { enum State { Waken, Yielded(PyObject), - Pending(AsyncioWaker), + Pending(WakerImpl), } pub(super) struct CoroutineWaker { @@ -48,10 +55,10 @@ impl CoroutineWaker { } pub(super) fn yield_(&self, py: Python<'_>) -> PyResult { - let init = || PyResult::Ok(State::Pending(AsyncioWaker::new(py)?)); + let init = || PyResult::Ok(State::Pending(WakerImpl::new(py)?)); let state = self.state.get_or_try_init(py, init)?; match state { - State::Waken => AsyncioWaker::yield_waken(py), + State::Waken => WakerImpl::yield_waken(py), State::Yielded(obj) => Ok(obj.clone()), State::Pending(waker) => waker.yield_(py), }