Skip to content

Commit

Permalink
pythongh-76785: Update test.support.interpreters to Align With PEP 734 (
Browse files Browse the repository at this point in the history
pythongh-115566)

This brings the code under test.support.interpreters, and the corresponding extension modules, in line with recent updates to PEP 734.

(Note: PEP 734 has not been accepted at this time.  However, we are using an internal copy of the implementation in the test suite to exercise the existing subinterpreters feature.)
  • Loading branch information
ericsnowcurrently authored and woodruffw committed Mar 4, 2024
1 parent e8dfde0 commit e3e4bbe
Show file tree
Hide file tree
Showing 11 changed files with 624 additions and 157 deletions.
51 changes: 40 additions & 11 deletions Lib/test/support/interpreters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@

# aliases:
from _xxsubinterpreters import (
InterpreterError, InterpreterNotFoundError,
InterpreterError, InterpreterNotFoundError, NotShareableError,
is_shareable,
)


__all__ = [
'get_current', 'get_main', 'create', 'list_all', 'is_shareable',
'Interpreter',
'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure',
'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed',
'NotShareableError',
'create_queue', 'Queue', 'QueueEmpty', 'QueueFull',
]

Expand Down Expand Up @@ -42,7 +43,11 @@ def __getattr__(name):
{formatted}
""".strip()

class ExecFailure(RuntimeError):
class ExecutionFailed(RuntimeError):
"""An unhandled exception happened during execution.
This is raised from Interpreter.exec() and Interpreter.call().
"""

def __init__(self, excinfo):
msg = excinfo.formatted
Expand Down Expand Up @@ -157,7 +162,7 @@ def prepare_main(self, ns=None, /, **kwargs):
ns = dict(ns, **kwargs) if ns is not None else kwargs
_interpreters.set___main___attrs(self._id, ns)

def exec_sync(self, code, /):
def exec(self, code, /):
"""Run the given source code in the interpreter.
This is essentially the same as calling the builtin "exec"
Expand All @@ -166,22 +171,46 @@ def exec_sync(self, code, /):
There is no return value.
If the code raises an unhandled exception then an ExecFailure
is raised, which summarizes the unhandled exception. The actual
exception is discarded because objects cannot be shared between
interpreters.
If the code raises an unhandled exception then an ExecutionFailed
exception is raised, which summarizes the unhandled exception.
The actual exception is discarded because objects cannot be
shared between interpreters.
This blocks the current Python thread until done. During
that time, the previous interpreter is allowed to run
in other threads.
"""
excinfo = _interpreters.exec(self._id, code)
if excinfo is not None:
raise ExecFailure(excinfo)
raise ExecutionFailed(excinfo)

def call(self, callable, /):
"""Call the object in the interpreter with given args/kwargs.
Only functions that take no arguments and have no closure
are supported.
def run(self, code, /):
The return value is discarded.
If the callable raises an exception then the error display
(including full traceback) is send back between the interpreters
and an ExecutionFailed exception is raised, much like what
happens with Interpreter.exec().
"""
# XXX Support args and kwargs.
# XXX Support arbitrary callables.
# XXX Support returning the return value (e.g. via pickle).
excinfo = _interpreters.call(self._id, callable)
if excinfo is not None:
raise ExecutionFailed(excinfo)

def call_in_thread(self, callable, /):
"""Return a new thread that calls the object in the interpreter.
The return value and any raised exception are discarded.
"""
def task():
self.exec_sync(code)
self.call(callable)
t = threading.Thread(target=task)
t.start()
return t
74 changes: 63 additions & 11 deletions Lib/test/support/interpreters/queues.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Cross-interpreter Queues High Level Module."""

import pickle
import queue
import time
import weakref
Expand Down Expand Up @@ -31,38 +32,47 @@ class QueueFull(_queues.QueueFull, queue.Full):
"""


def create(maxsize=0):
_SHARED_ONLY = 0
_PICKLED = 1

def create(maxsize=0, *, syncobj=False):
"""Return a new cross-interpreter queue.
The queue may be used to pass data safely between interpreters.
"syncobj" sets the default for Queue.put()
and Queue.put_nowait().
"""
qid = _queues.create(maxsize)
return Queue(qid)
fmt = _SHARED_ONLY if syncobj else _PICKLED
qid = _queues.create(maxsize, fmt)
return Queue(qid, _fmt=fmt)


def list_all():
"""Return a list of all open queues."""
return [Queue(qid)
for qid in _queues.list_all()]

return [Queue(qid, _fmt=fmt)
for qid, fmt in _queues.list_all()]


_known_queues = weakref.WeakValueDictionary()

class Queue:
"""A cross-interpreter queue."""

def __new__(cls, id, /):
def __new__(cls, id, /, *, _fmt=None):
# There is only one instance for any given ID.
if isinstance(id, int):
id = int(id)
else:
raise TypeError(f'id must be an int, got {id!r}')
if _fmt is None:
_fmt = _queues.get_default_fmt(id)
try:
self = _known_queues[id]
except KeyError:
self = super().__new__(cls)
self._id = id
self._fmt = _fmt
_known_queues[id] = self
_queues.bind(id)
return self
Expand Down Expand Up @@ -105,20 +115,50 @@ def qsize(self):
return _queues.get_count(self._id)

def put(self, obj, timeout=None, *,
syncobj=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.
This blocks while the queue is full.
If "syncobj" is None (the default) then it uses the
queue's default, set with create_queue()..
If "syncobj" is false then all objects are supported,
at the expense of worse performance.
If "syncobj" is true then the object must be "shareable".
Examples of "shareable" objects include the builtin singletons,
str, and memoryview. One benefit is that such objects are
passed through the queue efficiently.
The key difference, though, is conceptual: the corresponding
object returned from Queue.get() will be strictly equivalent
to the given obj. In other words, the two objects will be
effectively indistinguishable from each other, even if the
object is mutable. The received object may actually be the
same object, or a copy (immutable values only), or a proxy.
Regardless, the received object should be treated as though
the original has been shared directly, whether or not it
actually is. That's a slightly different and stronger promise
than just (initial) equality, which is all "syncobj=False"
can promise.
"""
if syncobj is None:
fmt = self._fmt
else:
fmt = _SHARED_ONLY if syncobj else _PICKLED
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
if fmt is _PICKLED:
obj = pickle.dumps(obj)
while True:
try:
_queues.put(self._id, obj)
_queues.put(self._id, obj, fmt)
except _queues.QueueFull as exc:
if timeout is not None and time.time() >= end:
exc.__class__ = QueueFull
Expand All @@ -127,9 +167,15 @@ def put(self, obj, timeout=None, *,
else:
break

def put_nowait(self, obj):
def put_nowait(self, obj, *, syncobj=None):
if syncobj is None:
fmt = self._fmt
else:
fmt = _SHARED_ONLY if syncobj else _PICKLED
if fmt is _PICKLED:
obj = pickle.dumps(obj)
try:
return _queues.put(self._id, obj)
_queues.put(self._id, obj, fmt)
except _queues.QueueFull as exc:
exc.__class__ = QueueFull
raise # re-raise
Expand All @@ -148,12 +194,18 @@ def get(self, timeout=None, *,
end = time.time() + timeout
while True:
try:
return _queues.get(self._id)
obj, fmt = _queues.get(self._id)
except _queues.QueueEmpty as exc:
if timeout is not None and time.time() >= end:
exc.__class__ = QueueEmpty
raise # re-raise
time.sleep(_delay)
else:
break
if fmt == _PICKLED:
obj = pickle.loads(obj)
else:
assert fmt == _SHARED_ONLY
return obj

def get_nowait(self):
Expand Down
Loading

0 comments on commit e3e4bbe

Please sign in to comment.