Skip to content

Commit

Permalink
Added support for subinterpreter workers (#850)
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm authored Jan 5, 2025
1 parent 6d612a9 commit 264a6f9
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 1 deletion.
8 changes: 8 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ Running code in worker threads
.. autofunction:: anyio.to_thread.run_sync
.. autofunction:: anyio.to_thread.current_default_thread_limiter

Running code in subinterpreters
-------------------------------

.. autofunction:: anyio.to_interpreter.run_sync
.. autofunction:: anyio.to_interpreter.current_default_interpreter_limiter

Running code in worker processes
--------------------------------

Expand Down Expand Up @@ -189,6 +195,8 @@ Exceptions
----------

.. autoexception:: anyio.BrokenResourceError
.. autoexception:: anyio.BrokenWorkerIntepreter
.. autoexception:: anyio.BrokenWorkerProcess
.. autoexception:: anyio.BusyResourceError
.. autoexception:: anyio.ClosedResourceError
.. autoexception:: anyio.DelimiterNotFound
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The manual
networking
threads
subprocesses
subinterpreters
fileio
signals
testing
Expand Down
51 changes: 51 additions & 0 deletions docs/subinterpreters.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
Working with subinterpreters
============================

.. py:currentmodule:: anyio
Subinterpreters offer a middle ground between worker threads and worker processes. They
allow you to utilize multiple CPU cores to run Python code while avoiding the overhead
and complexities of spawning subprocesses.

.. warning:: Subinterpreter support is considered **experimental**. The underlying
Python API for managing subinterpreters has not been finalized yet, and has had
little real-world testing. As such, it is not recommended to use this feature for
anything important yet.

Running a function in a worker interpreter
------------------------------------------

Running functions in a worker interpreter makes sense when:

* The code you want to run in parallel is CPU intensive
* The code is either pure Python code, or extension code that does not release the
Global Interpreter Lock (GIL)

If the code you're trying to run only does blocking network I/O, or file I/O, then
you're better off using :doc:`worker thread <threads>` instead.

This is done by using :func:`.interpreter.run_sync`::

import time

from anyio import run, to_interpreter

from yourothermodule import cpu_intensive_function

async def main():
result = await to_interpreter.run_sync(
cpu_intensive_function, 'Hello, ', 'world!'
)
print(result)

run(main)

Limitations
-----------

* Subinterpreters are only supported on Python 3.13 or later
* Code in the ``__main__`` module cannot be run with this (as a consequence, this
applies to any functions defined in the REPL)
* The target functions cannot react to cancellation
* Unlike with threads, the code running in the subinterpreter cannot share mutable data
with other interpreters/threads (however, sharing _immutable_ data is fine)
5 changes: 4 additions & 1 deletion docs/subprocesses.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,16 @@ Running functions in worker processes
-------------------------------------

When you need to run CPU intensive code, worker processes are better than threads
because current implementations of Python cannot run Python code in multiple threads at
because, with the exception of the experimental free-threaded builds of Python 3.13 and
later, current implementations of Python cannot run Python code in multiple threads at
once.

Exceptions to this rule are:

#. Blocking I/O operations
#. C extension code that explicitly releases the Global Interpreter Lock
#. :doc:`Subinterpreter workers <subinterpreters>`
(experimental; available on Python 3.13 and later)

If the code you wish to run does not belong in this category, it's best to use worker
processes instead in order to take advantage of multiple CPU cores.
Expand Down
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.

**UNRELEASED**

- Added **experimental** support for running functions in subinterpreters on Python
3.13 and later
- Added support for the ``copy()``, ``copy_into()``, ``move()`` and ``move_into()``
methods in ``anyio.Path``, available in Python 3.14
- Changed ``TaskGroup`` on asyncio to always spawn tasks non-eagerly, even if using a
Expand Down
1 change: 1 addition & 0 deletions src/anyio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ._core._eventloop import sleep_forever as sleep_forever
from ._core._eventloop import sleep_until as sleep_until
from ._core._exceptions import BrokenResourceError as BrokenResourceError
from ._core._exceptions import BrokenWorkerIntepreter as BrokenWorkerIntepreter
from ._core._exceptions import BrokenWorkerProcess as BrokenWorkerProcess
from ._core._exceptions import BusyResourceError as BusyResourceError
from ._core._exceptions import ClosedResourceError as ClosedResourceError
Expand Down
37 changes: 37 additions & 0 deletions src/anyio/_core/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import sys
from collections.abc import Generator
from textwrap import dedent
from typing import Any

if sys.version_info < (3, 11):
from exceptiongroup import BaseExceptionGroup
Expand All @@ -21,6 +23,41 @@ class BrokenWorkerProcess(Exception):
"""


class BrokenWorkerIntepreter(Exception):
"""
Raised by :meth:`~anyio.to_interpreter.run_sync` if an unexpected exception is
raised in the subinterpreter.
"""

def __init__(self, excinfo: Any):
# This was adapted from concurrent.futures.interpreter.ExecutionFailed
msg = excinfo.formatted
if not msg:
if excinfo.type and excinfo.msg:
msg = f"{excinfo.type.__name__}: {excinfo.msg}"
else:
msg = excinfo.type.__name__ or excinfo.msg

super().__init__(msg)
self.excinfo = excinfo

def __str__(self) -> str:
try:
formatted = self.excinfo.errdisplay
except Exception:
return super().__str__()
else:
return dedent(
f"""
{super().__str__()}
Uncaught in the interpreter:
{formatted}
""".strip()
)


class BusyResourceError(Exception):
"""
Raised when two tasks are trying to read from or write to the same resource
Expand Down
218 changes: 218 additions & 0 deletions src/anyio/to_interpreter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
from __future__ import annotations

import atexit
import os
import pickle
import sys
from collections import deque
from collections.abc import Callable
from textwrap import dedent
from typing import Any, Final, TypeVar

from . import current_time, to_thread
from ._core._exceptions import BrokenWorkerIntepreter
from ._core._synchronization import CapacityLimiter
from .lowlevel import RunVar

if sys.version_info >= (3, 11):
from typing import TypeVarTuple, Unpack
else:
from typing_extensions import TypeVarTuple, Unpack

UNBOUND: Final = 2 # I have no clue how this works, but it was used in the stdlib
FMT_UNPICKLED: Final = 0
FMT_PICKLED: Final = 1
DEFAULT_CPU_COUNT: Final = 8 # this is just an arbitrarily selected value
MAX_WORKER_IDLE_TIME = (
30 # seconds a subinterpreter can be idle before becoming eligible for pruning
)

T_Retval = TypeVar("T_Retval")
PosArgsT = TypeVarTuple("PosArgsT")

_idle_workers = RunVar[deque["Worker"]]("_available_workers")
_default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter")


class Worker:
_run_func = compile(
dedent("""
import _interpqueues as queues
import _interpreters as interpreters
from pickle import loads, dumps, HIGHEST_PROTOCOL
item = queues.get(queue_id)[0]
try:
func, args = loads(item)
retval = func(*args)
except BaseException as exc:
is_exception = True
retval = exc
else:
is_exception = False
try:
queues.put(queue_id, (retval, is_exception), FMT_UNPICKLED, UNBOUND)
except interpreters.NotShareableError:
retval = dumps(retval, HIGHEST_PROTOCOL)
queues.put(queue_id, (retval, is_exception), FMT_PICKLED, UNBOUND)
"""),
"<string>",
"exec",
)

last_used: float = 0

_initialized: bool = False
_interpreter_id: int
_queue_id: int

def initialize(self) -> None:
import _interpqueues as queues
import _interpreters as interpreters

self._interpreter_id = interpreters.create()
self._queue_id = queues.create(2, FMT_UNPICKLED, UNBOUND) # type: ignore[call-arg]
self._initialized = True
interpreters.set___main___attrs(
self._interpreter_id,
{
"queue_id": self._queue_id,
"FMT_PICKLED": FMT_PICKLED,
"FMT_UNPICKLED": FMT_UNPICKLED,
"UNBOUND": UNBOUND,
},
)

def destroy(self) -> None:
import _interpqueues as queues
import _interpreters as interpreters

if self._initialized:
interpreters.destroy(self._interpreter_id)
queues.destroy(self._queue_id)

def _call(
self,
func: Callable[..., T_Retval],
args: tuple[Any],
) -> tuple[Any, bool]:
import _interpqueues as queues
import _interpreters as interpreters

if not self._initialized:
self.initialize()

payload = pickle.dumps((func, args), pickle.HIGHEST_PROTOCOL)
queues.put(self._queue_id, payload, FMT_PICKLED, UNBOUND) # type: ignore[call-arg]

res: Any
is_exception: bool
if exc_info := interpreters.exec(self._interpreter_id, self._run_func): # type: ignore[func-returns-value,arg-type]
raise BrokenWorkerIntepreter(exc_info)

(res, is_exception), fmt = queues.get(self._queue_id)[:2]
if fmt == FMT_PICKLED:
res = pickle.loads(res)

return res, is_exception

async def call(
self,
func: Callable[..., T_Retval],
args: tuple[Any],
limiter: CapacityLimiter,
) -> T_Retval:
result, is_exception = await to_thread.run_sync(
self._call,
func,
args,
limiter=limiter,
)
if is_exception:
raise result

return result


def _stop_workers(workers: deque[Worker]) -> None:
for worker in workers:
worker.destroy()

workers.clear()


async def run_sync(
func: Callable[[Unpack[PosArgsT]], T_Retval],
*args: Unpack[PosArgsT],
limiter: CapacityLimiter | None = None,
) -> T_Retval:
"""
Call the given function with the given arguments in a subinterpreter.
If the ``cancellable`` option is enabled and the task waiting for its completion is
cancelled, the call will still run its course but its return value (or any raised
exception) will be ignored.
.. warning:: This feature is **experimental**. The upstream interpreter API has not
yet been finalized or thoroughly tested, so don't rely on this for anything
mission critical.
:param func: a callable
:param args: positional arguments for the callable
:param limiter: capacity limiter to use to limit the total amount of subinterpreters
running (if omitted, the default limiter is used)
:return: the result of the call
:raises BrokenWorkerIntepreter: if there's an internal error in a subinterpreter
"""
if sys.version_info <= (3, 13):
raise RuntimeError("subinterpreters require at least Python 3.13")

if limiter is None:
limiter = current_default_interpreter_limiter()

try:
idle_workers = _idle_workers.get()
except LookupError:
idle_workers = deque()
_idle_workers.set(idle_workers)
atexit.register(_stop_workers, idle_workers)

async with limiter:
try:
worker = idle_workers.pop()
except IndexError:
worker = Worker()

try:
return await worker.call(func, args, limiter)
finally:
# Prune workers that have been idle for too long
now = current_time()
while idle_workers:
if now - idle_workers[0].last_used <= MAX_WORKER_IDLE_TIME:
break

await to_thread.run_sync(idle_workers.popleft().destroy, limiter=limiter)

worker.last_used = current_time()
idle_workers.append(worker)


def current_default_interpreter_limiter() -> CapacityLimiter:
"""
Return the capacity limiter that is used by default to limit the number of
concurrently running subinterpreters.
Defaults to the number of CPU cores.
:return: a capacity limiter object
"""
try:
return _default_interpreter_limiter.get()
except LookupError:
limiter = CapacityLimiter(os.cpu_count() or DEFAULT_CPU_COUNT)
_default_interpreter_limiter.set(limiter)
return limiter
Loading

0 comments on commit 264a6f9

Please sign in to comment.