From 6f241d27be8eae23f4d18af506cbca39a6c4b377 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 7 May 2021 11:01:19 +0200 Subject: [PATCH 1/3] flush control queue prior to handling shell messages preserves control channel priority over shell channel - when control thread is running, resolves message-processing races - when control thread is not running, ensures control messages are processed before shell requests --- ipykernel/kernelbase.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 6be2a9d6f..3ae03b458 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -4,6 +4,7 @@ # Distributed under the terms of the Modified BSD License. import asyncio +import concurrent.futures from datetime import datetime from functools import partial import itertools @@ -213,8 +214,34 @@ def dispatch_control(self, msg): async def poll_control_queue(self): while True: msg = await self.control_queue.get() + # handle tracers from _flush_control_queue + if isinstance(msg, (concurrent.futures.Future, asyncio.Future)): + msg.set_result(None) + continue await self.process_control(msg) + async def _flush_control_queue(self): + """Flush the control queue, wait for processing of any pending messages""" + if self.control_thread: + control_loop = self.control_thread.io_loop + # concurrent.futures.Futures are threadsafe + # and can be used to await across threads + tracer_future = concurrent.futures.Future() + awaitable_future = asyncio.wrap_future(tracer_future) + else: + control_loop = self.io_loop + tracer_future = awaitable_future = asyncio.Future() + + def _flush(): + # control_stream.flush puts messages on the queue + self.control_stream.flush() + # put Future on the queue after all of those, + # so we can wait for all queued messages to be processed + self.control_queue.put(tracer_future) + + control_loop.add_callback(_flush) + return awaitable_future + async def process_control(self, msg): """dispatch control requests""" idents, msg = self.session.feed_identities(msg, copy=False) @@ -265,6 +292,10 @@ def should_handle(self, stream, msg, idents): async def dispatch_shell(self, msg): """dispatch shell requests""" + + # flush control queue before handling shell requests + await self._flush_control_queue() + idents, msg = self.session.feed_identities(msg, copy=False) try: msg = self.session.deserialize(msg, content=True, copy=False) @@ -630,7 +661,7 @@ async def inspect_request(self, stream, ident, parent): content.get('detail_level', 0), ) if inspect.isawaitable(reply_content): - reply_content = await reply_content + reply_content = await reply_content # Before we send this object over, we scrub it for JSON usage reply_content = json_clean(reply_content) @@ -944,7 +975,7 @@ def _input_request(self, prompt, ident, parent, password=False): raise KeyboardInterrupt("Interrupted by user") from None except Exception as e: self.log.warning("Invalid Message:", exc_info=True) - + try: value = reply["content"]["value"] except Exception: From 749ed915d585fcbff6e878f1fb083588a1154611 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 7 May 2021 12:06:40 +0200 Subject: [PATCH 2/3] test control channel priority --- ipykernel/tests/test_kernel.py | 45 ++++++++++++++++++++++++++++++++++ ipykernel/tests/utils.py | 8 +++--- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/ipykernel/tests/test_kernel.py b/ipykernel/tests/test_kernel.py index 9fe09f43b..13afdcb9f 100644 --- a/ipykernel/tests/test_kernel.py +++ b/ipykernel/tests/test_kernel.py @@ -417,3 +417,48 @@ def test_interrupt_during_pdb_set_trace(): # If we failed to interrupt interrupt, this will timeout: reply = get_reply(kc, msg_id2, TIMEOUT) validate_message(reply, 'execute_reply', msg_id2) + + +def test_control_thread_priority(): + + N = 5 + with new_kernel() as kc: + msg_id = kc.execute("pass") + get_reply(kc, msg_id) + + sleep_msg_id = kc.execute("import asyncio; await asyncio.sleep(2)") + + # submit N shell messages + shell_msg_ids = [] + for i in range(N): + shell_msg_ids.append(kc.execute(f"i = {i}")) + + # ensure all shell messages have arrived at the kernel before any control messages + time.sleep(0.5) + # at this point, shell messages should be waiting in msg_queue, + # rather than zmq while the kernel is still in the middle of processing + # the first execution + + # now send N control messages + control_msg_ids = [] + for i in range(N): + msg = kc.session.msg("kernel_info_request", {}) + kc.control_channel.send(msg) + control_msg_ids.append(msg["header"]["msg_id"]) + + # finally, collect the replies on both channels for comparison + sleep_reply = get_reply(kc, sleep_msg_id) + shell_replies = [] + for msg_id in shell_msg_ids: + shell_replies.append(get_reply(kc, msg_id)) + + control_replies = [] + for msg_id in control_msg_ids: + control_replies.append(get_reply(kc, msg_id, channel="control")) + + # verify that all control messages were handled before all shell messages + shell_dates = [msg["header"]["date"] for msg in shell_replies] + control_dates = [msg["header"]["date"] for msg in control_replies] + # comparing first to last ought to be enough, since queues preserve order + # use <= in case of very-fast handling and/or low resolution timers + assert control_dates[-1] <= shell_dates[0] diff --git a/ipykernel/tests/utils.py b/ipykernel/tests/utils.py index 920261e84..386a7a40e 100644 --- a/ipykernel/tests/utils.py +++ b/ipykernel/tests/utils.py @@ -53,13 +53,15 @@ def flush_channels(kc=None): validate_message(msg) -def get_reply(kc, msg_id, timeout): - timeout = TIMEOUT +def get_reply(kc, msg_id, timeout=TIMEOUT, channel='shell'): t0 = time() while True: - reply = kc.get_shell_msg(timeout=timeout) + get_msg = getattr(kc, f'get_{channel}_msg') + reply = get_msg(timeout=timeout) if reply['parent_header']['msg_id'] == msg_id: break + # Allow debugging ignored replies + print(f"Ignoring reply not to {msg_id}: {reply}") t1 = time() timeout -= t1 - t0 t0 = t1 From f1748ae70a85f5bd752e44244f2b13e4bd263fa6 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 7 May 2021 12:59:04 +0200 Subject: [PATCH 3/3] inprocess kernel has nothing to flush --- ipykernel/inprocess/ipkernel.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ipykernel/inprocess/ipkernel.py b/ipykernel/inprocess/ipkernel.py index 942907226..bfa715acd 100644 --- a/ipykernel/inprocess/ipkernel.py +++ b/ipykernel/inprocess/ipkernel.py @@ -87,6 +87,10 @@ async def _abort_queues(self): """ The in-process kernel doesn't abort requests. """ pass + async def _flush_control_queue(self): + """No need to flush control queues for in-process""" + pass + def _input_request(self, prompt, ident, parent, password=False): # Flush output before making the request. self.raw_input_str = None