diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index 15a908e9663593..d02ffbae1113c0 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -6,7 +6,7 @@ # aliases: from _xxsubinterpreters import ( - InterpreterError, InterpreterNotFoundError, + InterpreterError, InterpreterNotFoundError, NotShareableError, is_shareable, ) @@ -14,7 +14,8 @@ __all__ = [ 'get_current', 'get_main', 'create', 'list_all', 'is_shareable', 'Interpreter', - 'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure', + 'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed', + 'NotShareableError', 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', ] @@ -42,7 +43,11 @@ def __getattr__(name): {formatted} """.strip() -class ExecFailure(RuntimeError): +class ExecutionFailed(RuntimeError): + """An unhandled exception happened during execution. + + This is raised from Interpreter.exec() and Interpreter.call(). + """ def __init__(self, excinfo): msg = excinfo.formatted @@ -157,7 +162,7 @@ def prepare_main(self, ns=None, /, **kwargs): ns = dict(ns, **kwargs) if ns is not None else kwargs _interpreters.set___main___attrs(self._id, ns) - def exec_sync(self, code, /): + def exec(self, code, /): """Run the given source code in the interpreter. This is essentially the same as calling the builtin "exec" @@ -166,10 +171,10 @@ def exec_sync(self, code, /): There is no return value. - If the code raises an unhandled exception then an ExecFailure - is raised, which summarizes the unhandled exception. The actual - exception is discarded because objects cannot be shared between - interpreters. + If the code raises an unhandled exception then an ExecutionFailed + exception is raised, which summarizes the unhandled exception. + The actual exception is discarded because objects cannot be + shared between interpreters. This blocks the current Python thread until done. During that time, the previous interpreter is allowed to run @@ -177,11 +182,35 @@ def exec_sync(self, code, /): """ excinfo = _interpreters.exec(self._id, code) if excinfo is not None: - raise ExecFailure(excinfo) + raise ExecutionFailed(excinfo) + + def call(self, callable, /): + """Call the object in the interpreter with given args/kwargs. + + Only functions that take no arguments and have no closure + are supported. - def run(self, code, /): + The return value is discarded. + + If the callable raises an exception then the error display + (including full traceback) is send back between the interpreters + and an ExecutionFailed exception is raised, much like what + happens with Interpreter.exec(). + """ + # XXX Support args and kwargs. + # XXX Support arbitrary callables. + # XXX Support returning the return value (e.g. via pickle). + excinfo = _interpreters.call(self._id, callable) + if excinfo is not None: + raise ExecutionFailed(excinfo) + + def call_in_thread(self, callable, /): + """Return a new thread that calls the object in the interpreter. + + The return value and any raised exception are discarded. + """ def task(): - self.exec_sync(code) + self.call(callable) t = threading.Thread(target=task) t.start() return t diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index aead0c40ca9667..2cc616be337a50 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -1,5 +1,6 @@ """Cross-interpreter Queues High Level Module.""" +import pickle import queue import time import weakref @@ -31,20 +32,26 @@ class QueueFull(_queues.QueueFull, queue.Full): """ -def create(maxsize=0): +_SHARED_ONLY = 0 +_PICKLED = 1 + +def create(maxsize=0, *, syncobj=False): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. + + "syncobj" sets the default for Queue.put() + and Queue.put_nowait(). """ - qid = _queues.create(maxsize) - return Queue(qid) + fmt = _SHARED_ONLY if syncobj else _PICKLED + qid = _queues.create(maxsize, fmt) + return Queue(qid, _fmt=fmt) def list_all(): """Return a list of all open queues.""" - return [Queue(qid) - for qid in _queues.list_all()] - + return [Queue(qid, _fmt=fmt) + for qid, fmt in _queues.list_all()] _known_queues = weakref.WeakValueDictionary() @@ -52,17 +59,20 @@ def list_all(): class Queue: """A cross-interpreter queue.""" - def __new__(cls, id, /): + def __new__(cls, id, /, *, _fmt=None): # There is only one instance for any given ID. if isinstance(id, int): id = int(id) else: raise TypeError(f'id must be an int, got {id!r}') + if _fmt is None: + _fmt = _queues.get_default_fmt(id) try: self = _known_queues[id] except KeyError: self = super().__new__(cls) self._id = id + self._fmt = _fmt _known_queues[id] = self _queues.bind(id) return self @@ -105,20 +115,50 @@ def qsize(self): return _queues.get_count(self._id) def put(self, obj, timeout=None, *, + syncobj=None, _delay=10 / 1000, # 10 milliseconds ): """Add the object to the queue. This blocks while the queue is full. + + If "syncobj" is None (the default) then it uses the + queue's default, set with create_queue().. + + If "syncobj" is false then all objects are supported, + at the expense of worse performance. + + If "syncobj" is true then the object must be "shareable". + Examples of "shareable" objects include the builtin singletons, + str, and memoryview. One benefit is that such objects are + passed through the queue efficiently. + + The key difference, though, is conceptual: the corresponding + object returned from Queue.get() will be strictly equivalent + to the given obj. In other words, the two objects will be + effectively indistinguishable from each other, even if the + object is mutable. The received object may actually be the + same object, or a copy (immutable values only), or a proxy. + Regardless, the received object should be treated as though + the original has been shared directly, whether or not it + actually is. That's a slightly different and stronger promise + than just (initial) equality, which is all "syncobj=False" + can promise. """ + if syncobj is None: + fmt = self._fmt + else: + fmt = _SHARED_ONLY if syncobj else _PICKLED if timeout is not None: timeout = int(timeout) if timeout < 0: raise ValueError(f'timeout value must be non-negative') end = time.time() + timeout + if fmt is _PICKLED: + obj = pickle.dumps(obj) while True: try: - _queues.put(self._id, obj) + _queues.put(self._id, obj, fmt) except _queues.QueueFull as exc: if timeout is not None and time.time() >= end: exc.__class__ = QueueFull @@ -127,9 +167,15 @@ def put(self, obj, timeout=None, *, else: break - def put_nowait(self, obj): + def put_nowait(self, obj, *, syncobj=None): + if syncobj is None: + fmt = self._fmt + else: + fmt = _SHARED_ONLY if syncobj else _PICKLED + if fmt is _PICKLED: + obj = pickle.dumps(obj) try: - return _queues.put(self._id, obj) + _queues.put(self._id, obj, fmt) except _queues.QueueFull as exc: exc.__class__ = QueueFull raise # re-raise @@ -148,12 +194,18 @@ def get(self, timeout=None, *, end = time.time() + timeout while True: try: - return _queues.get(self._id) + obj, fmt = _queues.get(self._id) except _queues.QueueEmpty as exc: if timeout is not None and time.time() >= end: exc.__class__ = QueueEmpty raise # re-raise time.sleep(_delay) + else: + break + if fmt == _PICKLED: + obj = pickle.loads(obj) + else: + assert fmt == _SHARED_ONLY return obj def get_nowait(self): diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index aefd326977095f..363143fa810f35 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -280,7 +280,7 @@ def test_subinterpreter(self): def test_finished(self): r, w = self.pipe() interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os os.write({w}, b'x') """) @@ -312,7 +312,7 @@ def test_with_only_background_threads(self): FINISHED = b'F' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading @@ -326,7 +326,7 @@ def task(): self.assertFalse(interp.is_running()) os.write(w_thread, DONE) - interp.exec_sync('t.join()') + interp.exec('t.join()') self.assertEqual(os.read(r_interp, 1), FINISHED) @@ -393,7 +393,7 @@ def test_from_sibling(self): interp2 = interpreters.create() self.assertEqual(set(interpreters.list_all()), {main, interp1, interp2}) - interp1.exec_sync(dedent(f""" + interp1.exec(dedent(f""" from test.support import interpreters interp2 = interpreters.Interpreter({interp2.id}) interp2.close() @@ -427,7 +427,7 @@ def test_subthreads_still_running(self): FINISHED = b'F' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading import time @@ -503,27 +503,27 @@ def test_not_shareable(self): interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'}) # Make sure neither was actually bound. - with self.assertRaises(interpreters.ExecFailure): - interp.exec_sync('print(foo)') - with self.assertRaises(interpreters.ExecFailure): - interp.exec_sync('print(spam)') + with self.assertRaises(interpreters.ExecutionFailed): + interp.exec('print(foo)') + with self.assertRaises(interpreters.ExecutionFailed): + interp.exec('print(spam)') -class TestInterpreterExecSync(TestBase): +class TestInterpreterExec(TestBase): def test_success(self): interp = interpreters.create() script, file = _captured_script('print("it worked!", end="")') with file: - interp.exec_sync(script) + interp.exec(script) out = file.read() self.assertEqual(out, 'it worked!') def test_failure(self): interp = interpreters.create() - with self.assertRaises(interpreters.ExecFailure): - interp.exec_sync('raise Exception') + with self.assertRaises(interpreters.ExecutionFailed): + interp.exec('raise Exception') def test_display_preserved_exception(self): tempdir = self.temp_dir() @@ -542,21 +542,21 @@ def script(): spam.eggs() interp = interpreters.create() - interp.exec_sync(script) + interp.exec(script) """) stdout, stderr = self.assert_python_failure(scriptfile) self.maxDiff = None - interpmod_line, = (l for l in stderr.splitlines() if ' exec_sync' in l) - # File "{interpreters.__file__}", line 179, in exec_sync + interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l) + # File "{interpreters.__file__}", line 179, in exec self.assertEqual(stderr, dedent(f"""\ Traceback (most recent call last): File "{scriptfile}", line 9, in - interp.exec_sync(script) - ~~~~~~~~~~~~~~~~^^^^^^^^ + interp.exec(script) + ~~~~~~~~~~~^^^^^^^^ {interpmod_line.strip()} - raise ExecFailure(excinfo) - test.support.interpreters.ExecFailure: RuntimeError: uh-oh! + raise ExecutionFailed(excinfo) + test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh! Uncaught in the interpreter: @@ -578,7 +578,7 @@ def test_in_thread(self): script, file = _captured_script('print("it worked!", end="")') with file: def f(): - interp.exec_sync(script) + interp.exec(script) t = threading.Thread(target=f) t.start() @@ -604,7 +604,7 @@ def test_fork(self): with open('{file.name}', 'w', encoding='utf-8') as out: out.write('{expected}') """) - interp.exec_sync(script) + interp.exec(script) file.seek(0) content = file.read() @@ -615,17 +615,17 @@ def test_already_running(self): interp = interpreters.create() with _running(interp): with self.assertRaises(RuntimeError): - interp.exec_sync('print("spam")') + interp.exec('print("spam")') def test_bad_script(self): interp = interpreters.create() with self.assertRaises(TypeError): - interp.exec_sync(10) + interp.exec(10) def test_bytes_for_script(self): interp = interpreters.create() with self.assertRaises(TypeError): - interp.exec_sync(b'print("spam")') + interp.exec(b'print("spam")') def test_with_background_threads_still_running(self): r_interp, w_interp = self.pipe() @@ -636,7 +636,7 @@ def test_with_background_threads_still_running(self): FINISHED = b'F' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading @@ -648,46 +648,229 @@ def task(): t.start() os.write({w_interp}, {RAN!r}) """) - interp.exec_sync(f"""if True: + interp.exec(f"""if True: os.write({w_interp}, {RAN!r}) """) os.write(w_thread, DONE) - interp.exec_sync('t.join()') + interp.exec('t.join()') self.assertEqual(os.read(r_interp, 1), RAN) self.assertEqual(os.read(r_interp, 1), RAN) self.assertEqual(os.read(r_interp, 1), FINISHED) # test_xxsubinterpreters covers the remaining - # Interpreter.exec_sync() behavior. + # Interpreter.exec() behavior. -class TestInterpreterRun(TestBase): - - def test_success(self): - interp = interpreters.create() - script, file = _captured_script('print("it worked!", end="")') - with file: - t = interp.run(script) +def call_func_noop(): + pass + + +def call_func_return_shareable(): + return (1, None) + + +def call_func_return_not_shareable(): + return [1, 2, 3] + + +def call_func_failure(): + raise Exception('spam!') + + +def call_func_ident(value): + return value + + +def get_call_func_closure(value): + def call_func_closure(): + return value + return call_func_closure + + +class Spam: + + @staticmethod + def noop(): + pass + + @classmethod + def from_values(cls, *values): + return cls(values) + + def __init__(self, value): + self.value = value + + def __call__(self, *args, **kwargs): + return (self.value, args, kwargs) + + def __eq__(self, other): + if not isinstance(other, Spam): + return NotImplemented + return self.value == other.value + + def run(self, *args, **kwargs): + return (self.value, args, kwargs) + + +def call_func_complex(op, /, value=None, *args, exc=None, **kwargs): + if exc is not None: + raise exc + if op == '': + raise ValueError('missing op') + elif op == 'ident': + if args or kwargs: + raise Exception((args, kwargs)) + return value + elif op == 'full-ident': + return (value, args, kwargs) + elif op == 'globals': + if value is not None or args or kwargs: + raise Exception((value, args, kwargs)) + return __name__ + elif op == 'interpid': + if value is not None or args or kwargs: + raise Exception((value, args, kwargs)) + return interpreters.get_current().id + elif op == 'closure': + if args or kwargs: + raise Exception((args, kwargs)) + return get_call_func_closure(value) + elif op == 'custom': + if args or kwargs: + raise Exception((args, kwargs)) + return Spam(value) + elif op == 'custom-inner': + if args or kwargs: + raise Exception((args, kwargs)) + class Eggs(Spam): + pass + return Eggs(value) + elif not isinstance(op, str): + raise TypeError(op) + else: + raise NotImplementedError(op) + + +class TestInterpreterCall(TestBase): + + # signature + # - blank + # - args + # - kwargs + # - args, kwargs + # return + # - nothing (None) + # - simple + # - closure + # - custom + # ops: + # - do nothing + # - fail + # - echo + # - do complex, relative to interpreter + # scope + # - global func + # - local closure + # - returned closure + # - callable type instance + # - type + # - classmethod + # - staticmethod + # - instance method + # exception + # - builtin + # - custom + # - preserves info (e.g. SyntaxError) + # - matching error display + + def test_call(self): + interp = interpreters.create() + + for i, (callable, args, kwargs) in enumerate([ + (call_func_noop, (), {}), + (call_func_return_shareable, (), {}), + (call_func_return_not_shareable, (), {}), + (Spam.noop, (), {}), + ]): + with self.subTest(f'success case #{i+1}'): + res = interp.call(callable) + self.assertIs(res, None) + + for i, (callable, args, kwargs) in enumerate([ + (call_func_ident, ('spamspamspam',), {}), + (get_call_func_closure, (42,), {}), + (get_call_func_closure(42), (), {}), + (Spam.from_values, (), {}), + (Spam.from_values, (1, 2, 3), {}), + (Spam, ('???'), {}), + (Spam(101), (), {}), + (Spam(10101).run, (), {}), + (call_func_complex, ('ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), + (call_func_complex, ('globals',), {}), + (call_func_complex, ('interpid',), {}), + (call_func_complex, ('closure',), {'value': '~~~'}), + (call_func_complex, ('custom', 'spam!'), {}), + (call_func_complex, ('custom-inner', 'eggs!'), {}), + (call_func_complex, ('???',), {'exc': ValueError('spam')}), + ]): + with self.subTest(f'invalid case #{i+1}'): + with self.assertRaises(Exception): + if args or kwargs: + raise Exception((args, kwargs)) + interp.call(callable) + + with self.assertRaises(interpreters.ExecutionFailed): + interp.call(call_func_failure) + + def test_call_in_thread(self): + interp = interpreters.create() + + for i, (callable, args, kwargs) in enumerate([ + (call_func_noop, (), {}), + (call_func_return_shareable, (), {}), + (call_func_return_not_shareable, (), {}), + (Spam.noop, (), {}), + ]): + with self.subTest(f'success case #{i+1}'): + with self.captured_thread_exception() as ctx: + t = interp.call_in_thread(callable) + t.join() + self.assertIsNone(ctx.caught) + + for i, (callable, args, kwargs) in enumerate([ + (call_func_ident, ('spamspamspam',), {}), + (get_call_func_closure, (42,), {}), + (get_call_func_closure(42), (), {}), + (Spam.from_values, (), {}), + (Spam.from_values, (1, 2, 3), {}), + (Spam, ('???'), {}), + (Spam(101), (), {}), + (Spam(10101).run, (), {}), + (call_func_complex, ('ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam'), {}), + (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), + (call_func_complex, ('globals',), {}), + (call_func_complex, ('interpid',), {}), + (call_func_complex, ('closure',), {'value': '~~~'}), + (call_func_complex, ('custom', 'spam!'), {}), + (call_func_complex, ('custom-inner', 'eggs!'), {}), + (call_func_complex, ('???',), {'exc': ValueError('spam')}), + ]): + with self.subTest(f'invalid case #{i+1}'): + if args or kwargs: + continue + with self.captured_thread_exception() as ctx: + t = interp.call_in_thread(callable) + t.join() + self.assertIsNotNone(ctx.caught) + + with self.captured_thread_exception() as ctx: + t = interp.call_in_thread(call_func_failure) t.join() - out = file.read() - - self.assertEqual(out, 'it worked!') - - def test_failure(self): - caught = False - def excepthook(args): - nonlocal caught - caught = True - threading.excepthook = excepthook - try: - interp = interpreters.create() - t = interp.run('raise Exception') - t.join() - - self.assertTrue(caught) - except BaseException: - threading.excepthook = threading.__excepthook__ + self.assertIsNotNone(ctx.caught) class TestIsShareable(TestBase): diff --git a/Lib/test/test_interpreters/test_channels.py b/Lib/test/test_interpreters/test_channels.py index 3c3e18832d4168..07e503837bcf75 100644 --- a/Lib/test/test_interpreters/test_channels.py +++ b/Lib/test/test_interpreters/test_channels.py @@ -120,7 +120,7 @@ def test_send_recv_main(self): def test_send_recv_same_interpreter(self): interp = interpreters.create() - interp.exec_sync(dedent(""" + interp.exec(dedent(""" from test.support.interpreters import channels r, s = channels.create() orig = b'spam' @@ -193,7 +193,7 @@ def test_send_recv_nowait_main_with_default(self): def test_send_recv_nowait_same_interpreter(self): interp = interpreters.create() - interp.exec_sync(dedent(""" + interp.exec(dedent(""" from test.support.interpreters import channels r, s = channels.create() orig = b'spam' diff --git a/Lib/test/test_interpreters/test_lifecycle.py b/Lib/test/test_interpreters/test_lifecycle.py index c2917d839904f9..67b6f439c3191f 100644 --- a/Lib/test/test_interpreters/test_lifecycle.py +++ b/Lib/test/test_interpreters/test_lifecycle.py @@ -124,7 +124,7 @@ def test_sys_path_0(self): orig = sys.path[0] interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import json import sys print(json.dumps({{ diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index 2a8ca99c1f6e3f..65b5435fb00b04 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -51,20 +51,20 @@ def test_shareable(self): queue1 = queues.create() interp = interpreters.create() - interp.exec_sync(dedent(f""" + interp.exec(dedent(f""" from test.support.interpreters import queues queue1 = queues.Queue({queue1.id}) """)); with self.subTest('same interpreter'): queue2 = queues.create() - queue1.put(queue2) + queue1.put(queue2, syncobj=True) queue3 = queue1.get() self.assertIs(queue3, queue2) with self.subTest('from current interpreter'): queue4 = queues.create() - queue1.put(queue4) + queue1.put(queue4, syncobj=True) out = _run_output(interp, dedent(""" queue4 = queue1.get() print(queue4.id) @@ -75,7 +75,7 @@ def test_shareable(self): with self.subTest('from subinterpreter'): out = _run_output(interp, dedent(""" queue5 = queues.create() - queue1.put(queue5) + queue1.put(queue5, syncobj=True) print(queue5.id) """)) qid = int(out) @@ -118,7 +118,7 @@ class TestQueueOps(TestBase): def test_empty(self): queue = queues.create() before = queue.empty() - queue.put(None) + queue.put(None, syncobj=True) during = queue.empty() queue.get() after = queue.empty() @@ -133,7 +133,7 @@ def test_full(self): queue = queues.create(3) for _ in range(3): actual.append(queue.full()) - queue.put(None) + queue.put(None, syncobj=True) actual.append(queue.full()) for _ in range(3): queue.get() @@ -147,16 +147,16 @@ def test_qsize(self): queue = queues.create() for _ in range(3): actual.append(queue.qsize()) - queue.put(None) + queue.put(None, syncobj=True) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) - queue.put(None) + queue.put(None, syncobj=True) actual.append(queue.qsize()) for _ in range(3): queue.get() actual.append(queue.qsize()) - queue.put(None) + queue.put(None, syncobj=True) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) @@ -165,30 +165,81 @@ def test_qsize(self): def test_put_get_main(self): expected = list(range(20)) - queue = queues.create() - for i in range(20): - queue.put(i) - actual = [queue.get() for _ in range(20)] + for syncobj in (True, False): + kwds = dict(syncobj=syncobj) + with self.subTest(f'syncobj={syncobj}'): + queue = queues.create() + for i in range(20): + queue.put(i, **kwds) + actual = [queue.get() for _ in range(20)] - self.assertEqual(actual, expected) + self.assertEqual(actual, expected) def test_put_timeout(self): - queue = queues.create(2) - queue.put(None) - queue.put(None) - with self.assertRaises(queues.QueueFull): - queue.put(None, timeout=0.1) - queue.get() - queue.put(None) + for syncobj in (True, False): + kwds = dict(syncobj=syncobj) + with self.subTest(f'syncobj={syncobj}'): + queue = queues.create(2) + queue.put(None, **kwds) + queue.put(None, **kwds) + with self.assertRaises(queues.QueueFull): + queue.put(None, timeout=0.1, **kwds) + queue.get() + queue.put(None, **kwds) def test_put_nowait(self): - queue = queues.create(2) - queue.put_nowait(None) - queue.put_nowait(None) - with self.assertRaises(queues.QueueFull): - queue.put_nowait(None) - queue.get() - queue.put_nowait(None) + for syncobj in (True, False): + kwds = dict(syncobj=syncobj) + with self.subTest(f'syncobj={syncobj}'): + queue = queues.create(2) + queue.put_nowait(None, **kwds) + queue.put_nowait(None, **kwds) + with self.assertRaises(queues.QueueFull): + queue.put_nowait(None, **kwds) + queue.get() + queue.put_nowait(None, **kwds) + + def test_put_syncobj(self): + for obj in [ + None, + True, + 10, + 'spam', + b'spam', + (0, 'a'), + ]: + with self.subTest(repr(obj)): + queue = queues.create() + queue.put(obj, syncobj=True) + obj2 = queue.get() + self.assertEqual(obj2, obj) + + for obj in [ + [1, 2, 3], + {'a': 13, 'b': 17}, + ]: + with self.subTest(repr(obj)): + queue = queues.create() + with self.assertRaises(interpreters.NotShareableError): + queue.put(obj, syncobj=True) + + def test_put_not_syncobj(self): + for obj in [ + None, + True, + 10, + 'spam', + b'spam', + (0, 'a'), + # not shareable + [1, 2, 3], + {'a': 13, 'b': 17}, + ]: + with self.subTest(repr(obj)): + queue = queues.create() + queue.put(obj, syncobj=False) + obj2 = queue.get() + self.assertEqual(obj2, obj) def test_get_timeout(self): queue = queues.create() @@ -200,13 +251,41 @@ def test_get_nowait(self): with self.assertRaises(queues.QueueEmpty): queue.get_nowait() + def test_put_get_default_syncobj(self): + expected = list(range(20)) + queue = queues.create(syncobj=True) + for i in range(20): + queue.put(i) + actual = [queue.get() for _ in range(20)] + + self.assertEqual(actual, expected) + + obj = [1, 2, 3] # lists are not shareable + with self.assertRaises(interpreters.NotShareableError): + queue.put(obj) + + def test_put_get_default_not_syncobj(self): + expected = list(range(20)) + queue = queues.create(syncobj=False) + for i in range(20): + queue.put(i) + actual = [queue.get() for _ in range(20)] + + self.assertEqual(actual, expected) + + obj = [1, 2, 3] # lists are not shareable + queue.put(obj) + obj2 = queue.get() + self.assertEqual(obj, obj2) + self.assertIsNot(obj, obj2) + def test_put_get_same_interpreter(self): interp = interpreters.create() - interp.exec_sync(dedent(""" + interp.exec(dedent(""" from test.support.interpreters import queues queue = queues.create() orig = b'spam' - queue.put(orig) + queue.put(orig, syncobj=True) obj = queue.get() assert obj == orig, 'expected: obj == orig' assert obj is not orig, 'expected: obj is not orig' @@ -219,7 +298,7 @@ def test_put_get_different_interpreters(self): self.assertEqual(len(queues.list_all()), 2) obj1 = b'spam' - queue1.put(obj1) + queue1.put(obj1, syncobj=True) out = _run_output( interp, @@ -236,7 +315,7 @@ def test_put_get_different_interpreters(self): obj2 = b'eggs' print(id(obj2)) assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0' - queue2.put(obj2) + queue2.put(obj2, syncobj=True) assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1' """)) self.assertEqual(len(queues.list_all()), 2) @@ -258,8 +337,8 @@ def test_put_cleared_with_subinterpreter(self): queue = queues.Queue({queue.id}) obj1 = b'spam' obj2 = b'eggs' - queue.put(obj1) - queue.put(obj2) + queue.put(obj1, syncobj=True) + queue.put(obj2, syncobj=True) """)) self.assertEqual(queue.qsize(), 2) @@ -281,12 +360,12 @@ def f(): break except queues.QueueEmpty: continue - queue2.put(obj) + queue2.put(obj, syncobj=True) t = threading.Thread(target=f) t.start() orig = b'spam' - queue1.put(orig) + queue1.put(orig, syncobj=True) obj = queue2.get() t.join() diff --git a/Lib/test/test_interpreters/utils.py b/Lib/test/test_interpreters/utils.py index 3a37ed09dd8943..973d05d4f96dcb 100644 --- a/Lib/test/test_interpreters/utils.py +++ b/Lib/test/test_interpreters/utils.py @@ -4,8 +4,9 @@ import subprocess import sys import tempfile -import threading from textwrap import dedent +import threading +import types import unittest from test import support @@ -41,7 +42,7 @@ def _run_output(interp, request, init=None): with rpipe: if init: interp.prepare_main(init) - interp.exec_sync(script) + interp.exec(script) return rpipe.read() @@ -49,7 +50,7 @@ def _run_output(interp, request, init=None): def _running(interp): r, w = os.pipe() def run(): - interp.exec_sync(dedent(f""" + interp.exec(dedent(f""" # wait for "signal" with open({r}) as rpipe: rpipe.read() @@ -84,6 +85,18 @@ def temp_dir(self): self.addCleanup(lambda: os_helper.rmtree(tempdir)) return tempdir + @contextlib.contextmanager + def captured_thread_exception(self): + ctx = types.SimpleNamespace(caught=None) + def excepthook(args): + ctx.caught = args + orig_excepthook = threading.excepthook + threading.excepthook = excepthook + try: + yield ctx + finally: + threading.excepthook = orig_excepthook + def make_script(self, filename, dirname=None, text=None): if text: text = dedent(text) diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 71671a5a984256..38dcabd84d8170 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -729,7 +729,7 @@ def test_subinterp_intern_dynamically_allocated(self): self.assertIs(t, s) interp = interpreters.create() - interp.exec_sync(textwrap.dedent(f''' + interp.exec(textwrap.dedent(f''' import sys t = sys.intern({s!r}) assert id(t) != {id(s)}, (id(t), {id(s)}) @@ -744,7 +744,7 @@ def test_subinterp_intern_statically_allocated(self): t = sys.intern(s) interp = interpreters.create() - interp.exec_sync(textwrap.dedent(f''' + interp.exec(textwrap.dedent(f''' import sys t = sys.intern({s!r}) assert id(t) == {id(t)}, (id(t), {id(t)}) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 1ab223b81e939e..3b5c37c948c8c3 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1478,7 +1478,7 @@ def test_threads_join_with_no_main(self): DONE = b'D' interp = interpreters.create() - interp.exec_sync(f"""if True: + interp.exec(f"""if True: import os import threading import time diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index 7d8c67f49fefb8..715bb766cac624 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -294,6 +294,8 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) case ERR_QUEUES_ALLOC: PyErr_NoMemory(); break; + case -1: + return -1; default: state = get_module_state(mod); assert(state->QueueError != NULL); @@ -320,14 +322,17 @@ struct _queueitem; typedef struct _queueitem { _PyCrossInterpreterData *data; + int fmt; struct _queueitem *next; } _queueitem; static void -_queueitem_init(_queueitem *item, _PyCrossInterpreterData *data) +_queueitem_init(_queueitem *item, + _PyCrossInterpreterData *data, int fmt) { *item = (_queueitem){ .data = data, + .fmt = fmt, }; } @@ -344,14 +349,14 @@ _queueitem_clear(_queueitem *item) } static _queueitem * -_queueitem_new(_PyCrossInterpreterData *data) +_queueitem_new(_PyCrossInterpreterData *data, int fmt) { _queueitem *item = GLOBAL_MALLOC(_queueitem); if (item == NULL) { PyErr_NoMemory(); return NULL; } - _queueitem_init(item, data); + _queueitem_init(item, data, fmt); return item; } @@ -373,9 +378,11 @@ _queueitem_free_all(_queueitem *item) } static void -_queueitem_popped(_queueitem *item, _PyCrossInterpreterData **p_data) +_queueitem_popped(_queueitem *item, + _PyCrossInterpreterData **p_data, int *p_fmt) { *p_data = item->data; + *p_fmt = item->fmt; // We clear them here, so they won't be released in _queueitem_clear(). item->data = NULL; _queueitem_free(item); @@ -393,10 +400,11 @@ typedef struct _queue { _queueitem *first; _queueitem *last; } items; + int fmt; } _queue; static int -_queue_init(_queue *queue, Py_ssize_t maxsize) +_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt) { PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { @@ -408,6 +416,7 @@ _queue_init(_queue *queue, Py_ssize_t maxsize) .items = { .maxsize = maxsize, }, + .fmt = fmt, }; return 0; } @@ -486,7 +495,7 @@ _queue_unlock(_queue *queue) } static int -_queue_add(_queue *queue, _PyCrossInterpreterData *data) +_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) { int err = _queue_lock(queue); if (err < 0) { @@ -502,7 +511,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data) return ERR_QUEUE_FULL; } - _queueitem *item = _queueitem_new(data); + _queueitem *item = _queueitem_new(data, fmt); if (item == NULL) { _queue_unlock(queue); return -1; @@ -522,7 +531,8 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data) } static int -_queue_next(_queue *queue, _PyCrossInterpreterData **p_data) +_queue_next(_queue *queue, + _PyCrossInterpreterData **p_data, int *p_fmt) { int err = _queue_lock(queue); if (err < 0) { @@ -541,7 +551,7 @@ _queue_next(_queue *queue, _PyCrossInterpreterData **p_data) } queue->items.count -= 1; - _queueitem_popped(item, p_data); + _queueitem_popped(item, p_data, p_fmt); _queue_unlock(queue); return 0; @@ -843,18 +853,26 @@ _queues_decref(_queues *queues, int64_t qid) PyThread_release_lock(queues->mutex); } -static int64_t * +struct queue_id_and_fmt { + int64_t id; + int fmt; +}; + +static struct queue_id_and_fmt * _queues_list_all(_queues *queues, int64_t *count) { - int64_t *qids = NULL; + struct queue_id_and_fmt *qids = NULL; PyThread_acquire_lock(queues->mutex, WAIT_LOCK); - int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(queues->count)); + struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt, + (Py_ssize_t)(queues->count)); if (ids == NULL) { goto done; } _queueref *ref = queues->head; for (int64_t i=0; ref != NULL; ref = ref->next, i++) { - ids[i] = ref->qid; + ids[i].id = ref->qid; + assert(ref->queue != NULL); + ids[i].fmt = ref->queue->fmt; } *count = queues->count; @@ -890,13 +908,13 @@ _queue_free(_queue *queue) // Create a new queue. static int64_t -queue_create(_queues *queues, Py_ssize_t maxsize) +queue_create(_queues *queues, Py_ssize_t maxsize, int fmt) { _queue *queue = GLOBAL_MALLOC(_queue); if (queue == NULL) { return ERR_QUEUE_ALLOC; } - int err = _queue_init(queue, maxsize); + int err = _queue_init(queue, maxsize, fmt); if (err < 0) { GLOBAL_FREE(queue); return (int64_t)err; @@ -925,7 +943,7 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int -queue_put(_queues *queues, int64_t qid, PyObject *obj) +queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) { // Look up the queue. _queue *queue = NULL; @@ -948,7 +966,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj) } // Add the data to the queue. - int res = _queue_add(queue, data); + int res = _queue_add(queue, data, fmt); _queue_unmark_waiter(queue, queues->mutex); if (res != 0) { // We may chain an exception here: @@ -963,7 +981,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj) // Pop the next object off the queue. Fail if empty. // XXX Support a "wait" mutex? static int -queue_get(_queues *queues, int64_t qid, PyObject **res) +queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt) { int err; *res = NULL; @@ -979,7 +997,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res) // Pop off the next item from the queue. _PyCrossInterpreterData *data = NULL; - err = _queue_next(queue, &data); + err = _queue_next(queue, &data, p_fmt); _queue_unmark_waiter(queue, queues->mutex); if (err != 0) { return err; @@ -1267,14 +1285,15 @@ qidarg_converter(PyObject *arg, void *ptr) static PyObject * queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"maxsize", NULL}; - Py_ssize_t maxsize = -1; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "|n:create", kwlist, - &maxsize)) { + static char *kwlist[] = {"maxsize", "fmt", NULL}; + Py_ssize_t maxsize; + int fmt; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist, + &maxsize, &fmt)) { return NULL; } - int64_t qid = queue_create(&_globals.queues, maxsize); + int64_t qid = queue_create(&_globals.queues, maxsize, fmt); if (qid < 0) { (void)handle_queue_error((int)qid, self, qid); return NULL; @@ -1329,7 +1348,7 @@ static PyObject * queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) { int64_t count = 0; - int64_t *qids = _queues_list_all(&_globals.queues, &count); + struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count); if (qids == NULL) { if (count == 0) { return PyList_New(0); @@ -1340,14 +1359,14 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) if (ids == NULL) { goto finally; } - int64_t *cur = qids; + struct queue_id_and_fmt *cur = qids; for (int64_t i=0; i < count; cur++, i++) { - PyObject *qidobj = PyLong_FromLongLong(*cur); - if (qidobj == NULL) { + PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt); + if (item == NULL) { Py_SETREF(ids, NULL); break; } - PyList_SET_ITEM(ids, (Py_ssize_t)i, qidobj); + PyList_SET_ITEM(ids, (Py_ssize_t)i, item); } finally: @@ -1363,17 +1382,18 @@ Return the list of IDs for all queues."); static PyObject * queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"qid", "obj", NULL}; + static char *kwlist[] = {"qid", "obj", "fmt", NULL}; qidarg_converter_data qidarg; PyObject *obj; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:put", kwlist, - qidarg_converter, &qidarg, &obj)) { + int fmt; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist, + qidarg_converter, &qidarg, &obj, &fmt)) { return NULL; } int64_t qid = qidarg.id; /* Queue up the object. */ - int err = queue_put(&_globals.queues, qid, obj); + int err = queue_put(&_globals.queues, qid, obj, fmt); if (handle_queue_error(err, self, qid)) { return NULL; } @@ -1382,7 +1402,7 @@ queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(queuesmod_put_doc, -"put(qid, obj)\n\ +"put(qid, obj, sharedonly=False)\n\ \n\ Add the object's data to the queue."); @@ -1399,7 +1419,8 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) int64_t qid = qidarg.id; PyObject *obj = NULL; - int err = queue_get(&_globals.queues, qid, &obj); + int fmt; + int err = queue_get(&_globals.queues, qid, &obj, &fmt); if (err == ERR_QUEUE_EMPTY && dflt != NULL) { assert(obj == NULL); obj = Py_NewRef(dflt); @@ -1407,7 +1428,10 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) else if (handle_queue_error(err, self, qid)) { return NULL; } - return obj; + + PyObject *res = Py_BuildValue("Oi", obj, fmt); + Py_DECREF(obj); + return res; } PyDoc_STRVAR(queuesmod_get_doc, @@ -1499,6 +1523,33 @@ PyDoc_STRVAR(queuesmod_get_maxsize_doc, \n\ Return the maximum number of items in the queue."); +static PyObject * +queuesmod_get_default_fmt(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"qid", NULL}; + qidarg_converter_data qidarg; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O&:get_default_fmt", kwlist, + qidarg_converter, &qidarg)) { + return NULL; + } + int64_t qid = qidarg.id; + + _queue *queue = NULL; + int err = _queues_lookup(&_globals.queues, qid, &queue); + if (handle_queue_error(err, self, qid)) { + return NULL; + } + int fmt = queue->fmt; + _queue_unmark_waiter(queue, _globals.queues.mutex); + return PyLong_FromLong(fmt); +} + +PyDoc_STRVAR(queuesmod_get_default_fmt_doc, +"get_default_fmt(qid)\n\ +\n\ +Return the default format to use for the queue."); + static PyObject * queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds) { @@ -1593,6 +1644,8 @@ static PyMethodDef module_functions[] = { METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc}, {"get_maxsize", _PyCFunction_CAST(queuesmod_get_maxsize), METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc}, + {"get_default_fmt", _PyCFunction_CAST(queuesmod_get_default_fmt), + METH_VARARGS | METH_KEYWORDS, queuesmod_get_default_fmt_doc}, {"is_full", _PyCFunction_CAST(queuesmod_is_full), METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc}, {"get_count", _PyCFunction_CAST(queuesmod_get_count), diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index b4004d165078f7..28c2f9c08bc0da 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -902,6 +902,56 @@ The code/function must not take any arguments or be a closure\n\ If a function is provided, its code object is used and all its state\n\ is ignored, including its __globals__ dict."); +static PyObject * +interp_call(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"id", "callable", "args", "kwargs", NULL}; + PyObject *id, *callable; + PyObject *args_obj = NULL; + PyObject *kwargs_obj = NULL; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "OO|OO:" MODULE_NAME_STR ".call", kwlist, + &id, &callable, &args_obj, &kwargs_obj)) { + return NULL; + } + + if (args_obj != NULL) { + PyErr_SetString(PyExc_ValueError, "got unexpected args"); + return NULL; + } + if (kwargs_obj != NULL) { + PyErr_SetString(PyExc_ValueError, "got unexpected kwargs"); + return NULL; + } + + PyObject *code = (PyObject *)convert_code_arg(callable, MODULE_NAME_STR ".call", + "argument 2", "a function"); + if (code == NULL) { + return NULL; + } + + PyObject *excinfo = NULL; + int res = _interp_exec(self, id, code, NULL, &excinfo); + Py_DECREF(code); + if (res < 0) { + assert((excinfo == NULL) != (PyErr_Occurred() == NULL)); + return excinfo; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(call_doc, +"call(id, callable, args=None, kwargs=None)\n\ +\n\ +Call the provided object in the identified interpreter.\n\ +Pass the given args and kwargs, if possible.\n\ +\n\ +\"callable\" may be a plain function with no free vars that takes\n\ +no arguments.\n\ +\n\ +The function's code object is used and all its state\n\ +is ignored, including its __globals__ dict."); + static PyObject * interp_run_string(PyObject *self, PyObject *args, PyObject *kwds) { @@ -1085,6 +1135,8 @@ static PyMethodDef module_functions[] = { METH_VARARGS | METH_KEYWORDS, is_running_doc}, {"exec", _PyCFunction_CAST(interp_exec), METH_VARARGS | METH_KEYWORDS, exec_doc}, + {"call", _PyCFunction_CAST(interp_call), + METH_VARARGS | METH_KEYWORDS, call_doc}, {"run_string", _PyCFunction_CAST(interp_run_string), METH_VARARGS | METH_KEYWORDS, run_string_doc}, {"run_func", _PyCFunction_CAST(interp_run_func), @@ -1113,6 +1165,7 @@ The 'interpreters' module provides a more convenient interface."); static int module_exec(PyObject *mod) { + PyInterpreterState *interp = PyInterpreterState_Get(); module_state *state = get_module_state(mod); // exceptions @@ -1122,6 +1175,11 @@ module_exec(PyObject *mod) if (PyModule_AddType(mod, (PyTypeObject *)PyExc_InterpreterNotFoundError) < 0) { goto error; } + PyObject *PyExc_NotShareableError = \ + _PyInterpreterState_GetXIState(interp)->PyExc_NotShareableError; + if (PyModule_AddType(mod, (PyTypeObject *)PyExc_NotShareableError) < 0) { + goto error; + } if (register_memoryview_xid(mod, &state->XIBufferViewType) < 0) { goto error;