From d76d982717fb648c1bd75fc6ba8a98180bc8d97c Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Mon, 28 Oct 2019 18:07:27 -0400 Subject: [PATCH] Fix KeyboardInterrupt handling logic. When uvloop is run in the main thread we *always* want to set up a self-pipe and a signal wakeup FD. That's the only way how libuv can be notified that a ^C happened and break away from selecting on sockets. asyncio does not need to do that, as the 'selectors' module it uses is already aware of the way Python implements ^C handling. This translates to a slightly different behavior between asyncio & uvloop: 1. uvloop needs to always call signal.set_wakeup_fd() when run in the main thread; 2. asyncio only needs to call signal.set_wakeup_fd() when a user registers a signal handler. (2) means that if the user had not set up any signals, the signal wakeup FD stays the same between different asyncio runs. This commit fixes uvloop signal implementation to make sure that uvloop behaves the same way as asyncio in regards to signal wakeup FD between the loop runs. It also ensures that uvloop always have a proper self-pipe set up so that ^C is always supported when it is run in the main thread. Issue #295. --- tests/test_dealloc.py | 2 +- tests/test_regr1.py | 57 +++++++++--------- tests/test_signals.py | 39 ++++++++++++ uvloop/loop.pxd | 8 ++- uvloop/loop.pyx | 135 +++++++++++++++++++++++------------------- 5 files changed, 146 insertions(+), 95 deletions(-) diff --git a/tests/test_dealloc.py b/tests/test_dealloc.py index 3ae436e8..0b9b2f6e 100644 --- a/tests/test_dealloc.py +++ b/tests/test_dealloc.py @@ -36,7 +36,7 @@ async def foo(): return 42 def main(): - asyncio.set_event_loop(uvloop.new_event_loop()) + uvloop.install() loop = asyncio.get_event_loop() loop.set_debug(True) loop.run_until_complete(foo()) diff --git a/tests/test_regr1.py b/tests/test_regr1.py index 8c8d5572..c502457e 100644 --- a/tests/test_regr1.py +++ b/tests/test_regr1.py @@ -74,36 +74,33 @@ def on_alarm(self, sig, fr): raise FailedTestError def run_test(self): - try: - for i in range(10): - for threaded in [True, False]: - if threaded: - qin, qout = queue.Queue(), queue.Queue() - threading.Thread( - target=run_server, - args=(qin, qout), - daemon=True).start() - else: - qin = multiprocessing.Queue() - qout = multiprocessing.Queue() - multiprocessing.Process( - target=run_server, - args=(qin, qout), - daemon=True).start() - - addr = qout.get() - loop = self.new_loop() - asyncio.set_event_loop(loop) - loop.create_task( - loop.create_connection( - lambda: EchoClientProtocol(loop), - host=addr[0], port=addr[1])) - loop.run_forever() - loop.close() - qin.put('stop') - qout.get() - finally: - loop.close() + for i in range(10): + for threaded in [True, False]: + if threaded: + qin, qout = queue.Queue(), queue.Queue() + threading.Thread( + target=run_server, + args=(qin, qout), + daemon=True).start() + else: + qin = multiprocessing.Queue() + qout = multiprocessing.Queue() + multiprocessing.Process( + target=run_server, + args=(qin, qout), + daemon=True).start() + + addr = qout.get() + loop = self.new_loop() + asyncio.set_event_loop(loop) + loop.create_task( + loop.create_connection( + lambda: EchoClientProtocol(loop), + host=addr[0], port=addr[1])) + loop.run_forever() + loop.close() + qin.put('stop') + qout.get() @unittest.skipIf( multiprocessing.get_start_method(False) == 'spawn', diff --git a/tests/test_signals.py b/tests/test_signals.py index 96879cb8..e51f5691 100644 --- a/tests/test_signals.py +++ b/tests/test_signals.py @@ -117,6 +117,45 @@ async def worker(): loop = """ + self.NEW_LOOP + """ asyncio.set_event_loop(loop) loop.create_task(worker()) +try: + loop.run_forever() +finally: + srv.close() + loop.run_until_complete(srv.wait_closed()) + loop.close() +""" + + proc = await asyncio.create_subprocess_exec( + sys.executable, b'-W', b'ignore', b'-c', PROG, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + await proc.stdout.readline() + time.sleep(DELAY) + proc.send_signal(signal.SIGINT) + out, err = await proc.communicate() + self.assertIn(b'KeyboardInterrupt', err) + + self.loop.run_until_complete(runner()) + + @tb.silence_long_exec_warning() + def test_signals_sigint_uvcode_two_loop_runs(self): + async def runner(): + PROG = R"""\ +import asyncio +import uvloop + +srv = None + +async def worker(): + global srv + cb = lambda *args: None + srv = await asyncio.start_server(cb, '127.0.0.1', 0) + +loop = """ + self.NEW_LOOP + """ +asyncio.set_event_loop(loop) +loop.run_until_complete(worker()) +print('READY', flush=True) try: loop.run_forever() finally: diff --git a/uvloop/loop.pxd b/uvloop/loop.pxd index 765e48ef..f36d1e2f 100644 --- a/uvloop/loop.pxd +++ b/uvloop/loop.pxd @@ -65,6 +65,7 @@ cdef class Loop: object _ssock object _csock bint _listening_signals + int _old_signal_wakeup_id set _timers dict _polls @@ -149,6 +150,8 @@ cdef class Loop: cdef void _handle_exception(self, object ex) + cdef inline _is_main_thread(self) + cdef inline _new_future(self) cdef inline _check_signal(self, sig) cdef inline _check_closed(self) @@ -186,10 +189,9 @@ cdef class Loop: cdef _sock_set_reuseport(self, int fd) - cdef _setup_signals(self) + cdef _setup_or_resume_signals(self) cdef _shutdown_signals(self) - cdef _recv_signals_start(self) - cdef _recv_signals_stop(self) + cdef _pause_signals(self) cdef _handle_signal(self, sig) cdef _read_from_self(self) diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index fe79165c..c865fedf 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -167,6 +167,7 @@ cdef class Loop: self._ssock = self._csock = None self._signal_handlers = {} self._listening_signals = False + self._old_signal_wakeup_id = -1 self._coroutine_debug_set = False @@ -183,6 +184,9 @@ cdef class Loop: self._servers = set() + cdef inline _is_main_thread(self): + return MAIN_THREAD_ID == PyThread_get_thread_ident() + def __init__(self): self.set_debug((not sys_ignore_environment and bool(os_environ.get('PYTHONASYNCIODEBUG')))) @@ -241,34 +245,40 @@ cdef class Loop: self._debug_exception_handler_cnt = 0 - cdef _setup_signals(self): - cdef int old_wakeup_fd + cdef _setup_or_resume_signals(self): + if not self._is_main_thread(): + return if self._listening_signals: - return + raise RuntimeError('signals handling has been already setup') + + if self._ssock is not None: + raise RuntimeError('self-pipe exists before loop run') + + # Create a self-pipe and call set_signal_wakeup_fd() with one + # of its ends. This is needed so that libuv knows that it needs + # to wakeup on ^C (no matter if the SIGINT handler is still the + # standard Python's one or or user set their own.) self._ssock, self._csock = socket_socketpair() - self._ssock.setblocking(False) - self._csock.setblocking(False) try: - old_wakeup_fd = _set_signal_wakeup_fd(self._csock.fileno()) - except (OSError, ValueError): - # Not the main thread + self._ssock.setblocking(False) + self._csock.setblocking(False) + + fileno = self._csock.fileno() + + self._old_signal_wakeup_id = _set_signal_wakeup_fd(fileno) + except Exception: + # Out of all statements in the try block, only the + # "_set_signal_wakeup_fd()" call can fail, but it shouldn't, + # as we ensure that the current thread is the main thread. + # Still, if something goes horribly wrong we want to clean up + # the socket pair. self._ssock.close() self._csock.close() - self._ssock = self._csock = None - return - - self._listening_signals = True - return old_wakeup_fd - - cdef _recv_signals_start(self): - cdef object old_wakeup_fd = None - if self._ssock is None: - old_wakeup_fd = self._setup_signals() - if self._ssock is None: - # Not the main thread. - return + self._ssock = None + self._csock = None + raise self._add_reader( self._ssock, @@ -277,30 +287,24 @@ cdef class Loop: "Loop._read_from_self", self._read_from_self, self)) - return old_wakeup_fd - cdef _recv_signals_stop(self): - if self._ssock is None: - return + self._listening_signals = True - self._remove_reader(self._ssock) + cdef _pause_signals(self): + if not self._is_main_thread(): + if self._listening_signals: + raise RuntimeError( + 'cannot pause signals handling; no longer running in ' + 'the main thread') + else: + return - cdef _shutdown_signals(self): if not self._listening_signals: - return + raise RuntimeError('signals handling has not been setup') - for sig in list(self._signal_handlers): - self.remove_signal_handler(sig) - - if not self._listening_signals: - # `remove_signal_handler` will call `_shutdown_signals` when - # removing last signal handler. - return + self._listening_signals = False - try: - signal_set_wakeup_fd(-1) - except (ValueError, OSError) as exc: - aio_logger.info('set_wakeup_fd(-1) failed: %s', exc) + _set_signal_wakeup_fd(self._old_signal_wakeup_id) self._remove_reader(self._ssock) self._ssock.close() @@ -308,7 +312,24 @@ cdef class Loop: self._ssock = None self._csock = None - self._listening_signals = False + cdef _shutdown_signals(self): + if not self._is_main_thread(): + if self._signal_handlers: + aio_logger.warning( + 'cannot cleanup signal handlers: closing the event loop ' + 'in a non-main OS thread') + return + + if self._listening_signals: + raise RuntimeError( + 'cannot shutdown signals handling as it has not been paused') + + if self._ssock: + raise RuntimeError( + 'self-pipe was not cleaned up after loop was run') + + for sig in list(self._signal_handlers): + self.remove_signal_handler(sig) def __sighandler(self, signum, frame): self._signals.add(signum) @@ -451,7 +472,6 @@ cdef class Loop: cdef _run(self, uv.uv_run_mode mode): cdef int err - cdef object old_wakeup_fd if self._closed == 1: raise RuntimeError('unable to start the loop; it was closed') @@ -474,7 +494,7 @@ cdef class Loop: self.handler_check__exec_writes.start() self.handler_idle.start() - old_wakeup_fd = self._recv_signals_start() + self._setup_or_resume_signals() if aio_set_running_loop is not None: aio_set_running_loop(self) @@ -484,13 +504,11 @@ cdef class Loop: if aio_set_running_loop is not None: aio_set_running_loop(None) - self._recv_signals_stop() - if old_wakeup_fd is not None: - signal_set_wakeup_fd(old_wakeup_fd) - self.handler_check__exec_writes.stop() self.handler_idle.stop() + self._pause_signals() + self._thread_is_main = 0 self._thread_id = 0 self._running = 0 @@ -2794,10 +2812,10 @@ cdef class Loop: cdef: Handle h - if not self._listening_signals: - self._setup_signals() - if not self._listening_signals: - raise ValueError('set_wakeup_fd only works in main thread') + if not self._is_main_thread(): + raise ValueError( + 'add_signal_handler() can only be called from ' + 'the main thread') if (aio_iscoroutine(callback) or aio_iscoroutinefunction(callback)): @@ -2829,14 +2847,6 @@ cdef class Loop: self._check_signal(sig) self._check_closed() - try: - # set_wakeup_fd() raises ValueError if this is not the - # main thread. By calling it early we ensure that an - # event loop running in another thread cannot add a signal - # handler. - _set_signal_wakeup_fd(self._csock.fileno()) - except (ValueError, OSError) as exc: - raise RuntimeError(str(exc)) h = new_Handle(self, callback, args or None, None) self._signal_handlers[sig] = h @@ -2866,6 +2876,12 @@ cdef class Loop: Return True if a signal handler was removed, False if not. """ + + if not self._is_main_thread(): + raise ValueError( + 'remove_signal_handler() can only be called from ' + 'the main thread') + self._check_signal(sig) if not self._listening_signals: @@ -2889,9 +2905,6 @@ cdef class Loop: else: raise - if not self._signal_handlers: - self._shutdown_signals() - return True @cython.iterable_coroutine