Skip to content

Commit

Permalink
Run control channel in separate thread
Browse files Browse the repository at this point in the history
  • Loading branch information
SylvainCorlay committed Jan 29, 2021
1 parent aba2179 commit bf48c69
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 88 deletions.
21 changes: 21 additions & 0 deletions ipykernel/control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

from threading import Thread
import zmq
if zmq.pyzmq_version_info() >= (17, 0):
from tornado.ioloop import IOLoop
else:
# deprecated since pyzmq 17
from zmq.eventloop.ioloop import IOLoop


class ControlThread(Thread):

def __init__(self, context):
Thread.__init__(self)
self.context = context
self.io_loop = IOLoop(make_current=False)

def run(self):
self.io_loop.make_current()
self.io_loop.start()
self.io_loop.close(all_fds=True)
41 changes: 16 additions & 25 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,7 @@ def loop_qt4(kernel):

kernel.app = get_app_qt4([" "])
kernel.app.setQuitOnLastWindowClosed(False)

# Only register the eventloop for the shell stream because doing
# it for the control stream is generating a bunch of unnecessary
# warnings on Windows.
_notify_stream_qt(kernel, kernel.shell_streams[0])
_notify_stream_qt(kernel, kernel.shell_stream)

_loop_qt(kernel.app)

Expand Down Expand Up @@ -160,10 +156,9 @@ def loop_wx(kernel):

def wake():
"""wake from wx"""
for stream in kernel.shell_streams:
if stream.flush(limit=1):
kernel.app.ExitMainLoop()
return
if kernel.shell_stream.flush(limit=1):
kernel.app.ExitMainLoop()
return

# We have to put the wx.Timer in a wx.Frame for it to fire properly.
# We make the Frame hidden when we create it in the main app below.
Expand Down Expand Up @@ -237,13 +232,12 @@ def process_stream_events(stream, *a, **kw):
# For Tkinter, we create a Tk object and call its withdraw method.
kernel.app_wrapper = BasicAppWrapper(app)

for stream in kernel.shell_streams:
notifier = partial(process_stream_events, stream)
# seems to be needed for tk
notifier.__name__ = "notifier"
app.tk.createfilehandler(stream.getsockopt(zmq.FD), READABLE, notifier)
# schedule initial call after start
app.after(0, notifier)
notifier = partial(process_stream_events, shell_stream)
# seems to be needed for tk
notifier.__name__ = "notifier"
app.tk.createfilehandler(shell_stream.getsockopt(zmq.FD), READABLE, notifier)
# schedule initial call after start
app.after(0, notifier)

app.mainloop()

Expand Down Expand Up @@ -330,10 +324,9 @@ def handle_int(etype, value, tb):
# don't let interrupts during mainloop invoke crash_handler:
sys.excepthook = handle_int
mainloop(kernel._poll_interval)
for stream in kernel.shell_streams:
if stream.flush(limit=1):
# events to process, return control to kernel
return
if kernel_shell_stream.flush(limit=1):
# events to process, return control to kernel
return
except:
raise
except KeyboardInterrupt:
Expand Down Expand Up @@ -371,11 +364,9 @@ def process_stream_events(stream):
if stream.flush(limit=1):
loop.stop()

for stream in kernel.shell_streams:
fd = stream.getsockopt(zmq.FD)
notifier = partial(process_stream_events, stream)
loop.add_reader(fd, notifier)
loop.call_soon(notifier)
notifier = partial(process_stream_events, shell_stream)
loop.add_reader(shell_stream.getsockopt(zmq.FD), notifier)
loop.call_soon(notifier)

while True:
error = None
Expand Down
5 changes: 2 additions & 3 deletions ipykernel/inprocess/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#-----------------------------------------------------------------------------

# IPython imports
from ipykernel.inprocess.socket import DummySocket
from traitlets import Type, Instance, default
from jupyter_client.clientabc import KernelClientABC
from jupyter_client.client import KernelClient
Expand Down Expand Up @@ -171,10 +170,10 @@ def _dispatch_to_kernel(self, msg):
if kernel is None:
raise RuntimeError('Cannot send request. No kernel exists.')

stream = DummySocket()
stream = kernel.shell_stream
self.session.send(stream, msg)
msg_parts = stream.recv_multipart()
kernel.dispatch_shell(stream, msg_parts)
kernel.dispatch_shell(msg_parts)

idents, reply_msg = self.session.recv(stream, copy=False)
self.shell_channel.call_handlers_later(reply_msg)
Expand Down
4 changes: 2 additions & 2 deletions ipykernel/inprocess/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ class InProcessKernel(IPythonKernel):
#-------------------------------------------------------------------------

shell_class = Type(allow_none=True)
shell_streams = List()
control_stream = Any()
_underlying_iopub_socket = Instance(DummySocket, ())
iopub_thread = Instance(IOPubThread)

shell_stream = Instance(DummySocket, ())

@default('iopub_thread')
def _default_iopub_thread(self):
thread = IOPubThread(self._underlying_iopub_socket)
Expand Down
24 changes: 18 additions & 6 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

# local imports
from .iostream import IOPubThread
from .control import ControlThread
from .heartbeat import Heartbeat
from .ipkernel import IPythonKernel
from .parentpoller import ParentPollerUnix, ParentPollerWindows
Expand Down Expand Up @@ -124,6 +125,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
stdin_socket = Any()
iopub_socket = Any()
iopub_thread = Any()
control_thread = Any()

ports = Dict()

Expand Down Expand Up @@ -276,6 +278,17 @@ def init_sockets(self):
self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port)

if hasattr(zmq, 'ROUTER_HANDOVER'):
# set router-handover to workaround zeromq reconnect problems
# in certain rare circumstances
# see ipython/ipykernel#270 and zeromq/libzmq#2892
self.shell_socket.router_handover = \
self.stdin_socket.router_handover = 1

self.init_control(context)
self.init_iopub(context)

def init_control(self, context):
self.control_socket = context.socket(zmq.ROUTER)
self.control_socket.linger = 1000
self.control_port = self._bind_socket(self.control_socket, self.control_port)
Expand All @@ -285,11 +298,10 @@ def init_sockets(self):
# set router-handover to workaround zeromq reconnect problems
# in certain rare circumstances
# see ipython/ipykernel#270 and zeromq/libzmq#2892
self.shell_socket.router_handover = \
self.control_socket.router_handover = \
self.stdin_socket.router_handover = 1
self.control_socket.router_handover = 1

self.init_iopub(context)
self.control_thread = ControlThread(self.control_socket)
self.control_thread.start()

def init_iopub(self, context):
self.iopub_socket = context.socket(zmq.PUB)
Expand Down Expand Up @@ -437,13 +449,13 @@ def init_signal(self):
def init_kernel(self):
"""Create the Kernel object itself"""
shell_stream = ZMQStream(self.shell_socket)
control_stream = ZMQStream(self.control_socket)
control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop)

kernel_factory = self.kernel_class.instance

kernel = kernel_factory(parent=self, session=self.session,
control_stream=control_stream,
shell_streams=[shell_stream, control_stream],
shell_stream=shell_stream,
iopub_thread=self.iopub_thread,
iopub_socket=self.iopub_socket,
stdin_socket=self.stdin_socket,
Expand Down
84 changes: 32 additions & 52 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from tornado import ioloop
from tornado import gen
from tornado.queues import PriorityQueue, QueueEmpty
from tornado.queues import Queue, QueueEmpty
import zmq
from zmq.eventloop.zmqstream import ZMQStream

Expand All @@ -38,9 +38,6 @@

from ._version import kernel_protocol_version

CONTROL_PRIORITY = 1
SHELL_PRIORITY = 10


class Kernel(SingletonConfigurable):

Expand All @@ -60,7 +57,7 @@ def _update_eventloop(self, change):

session = Instance(Session, allow_none=True)
profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True)
shell_streams = List()
shell_stream = Instance(ZMQStream, allow_none=True)
control_stream = Instance(ZMQStream, allow_none=True)
iopub_socket = Any()
iopub_thread = Any()
Expand Down Expand Up @@ -215,7 +212,7 @@ def should_handle(self, stream, msg, idents):
return True

@gen.coroutine
def dispatch_shell(self, stream, msg):
def dispatch_shell(self, msg):
"""dispatch shell requests"""
idents, msg = self.session.feed_identities(msg, copy=False)
try:
Expand All @@ -232,11 +229,11 @@ def dispatch_shell(self, stream, msg):

# Only abort execute requests
if self._aborting and msg_type == 'execute_request':
self._send_abort_reply(stream, msg, idents)
self._send_abort_reply(self.shell_stream, msg, idents)
self._publish_status('idle')
# flush to ensure reply is sent before
# handling the next request
stream.flush(zmq.POLLOUT)
self.shell_stream.flush(zmq.POLLOUT)
return

# Print some info about this message and leave a '--->' marker, so it's
Expand All @@ -245,7 +242,7 @@ def dispatch_shell(self, stream, msg):
self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
self.log.debug(' Content: %s\n --->\n ', msg['content'])

if not self.should_handle(stream, msg, idents):
if not self.should_handle(self.shell_stream, msg, idents):
return

handler = self.shell_handlers.get(msg_type, None)
Expand All @@ -258,7 +255,7 @@ def dispatch_shell(self, stream, msg):
except Exception:
self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True)
try:
yield gen.maybe_future(handler(stream, idents, msg))
yield gen.maybe_future(handler(self.shell_stream, idents, msg))
except Exception:
self.log.error("Exception in message handler:", exc_info=True)
finally:
Expand All @@ -272,7 +269,7 @@ def dispatch_shell(self, stream, msg):
self._publish_status('idle')
# flush to ensure reply is sent before
# handling the next request
stream.flush(zmq.POLLOUT)
self.shell_stream.flush(zmq.POLLOUT)

def pre_handler_hook(self):
"""Hook to execute before calling message handler"""
Expand Down Expand Up @@ -332,27 +329,22 @@ def do_one_iteration(self):
.. versionchanged:: 5
This is now a coroutine
"""
# flush messages off of shell streams into the message queue
for stream in self.shell_streams:
stream.flush()
# process all messages higher priority than shell (control),
# and at most one shell message per iteration
priority = 0
while priority is not None and priority < SHELL_PRIORITY:
priority = yield self.process_one(wait=False)
# flush messages off of shell stream into the message queue
self.shell_stream.flush()
# process at most one shell message per iteration
yield self.process_one(wait=False)

@gen.coroutine
def process_one(self, wait=True):
"""Process one request
Returns priority of the message handled.
Returns None if no message was handled.
"""
if wait:
priority, t, dispatch, args = yield self.msg_queue.get()
t, dispatch, args = yield self.msg_queue.get()
else:
try:
priority, t, dispatch, args = self.msg_queue.get_nowait()
t, dispatch, args = self.msg_queue.get_nowait()
except QueueEmpty:
return None
yield gen.maybe_future(dispatch(*args))
Expand All @@ -377,21 +369,18 @@ def dispatch_queue(self):

_message_counter = Any(
help="""Monotonic counter of messages
Ensures messages of the same priority are handled in arrival order.
""",
)
@default('_message_counter')
def _message_counter_default(self):
return itertools.count()

def schedule_dispatch(self, priority, dispatch, *args):
def schedule_dispatch(self, dispatch, *args):
"""schedule a message for dispatch"""
idx = next(self._message_counter)

self.msg_queue.put_nowait(
(
priority,
idx,
dispatch,
args,
Expand All @@ -403,32 +392,24 @@ def schedule_dispatch(self, priority, dispatch, *args):
def start(self):
"""register dispatchers for streams"""
self.io_loop = ioloop.IOLoop.current()
self.msg_queue = PriorityQueue()
self.msg_queue = Queue()
self.io_loop.add_callback(self.dispatch_queue)

self.control_stream.on_recv(
partial(
self.schedule_dispatch,
self.dispatch_control,
),
copy=False,
)

if self.control_stream:
self.control_stream.on_recv(
partial(
self.schedule_dispatch,
CONTROL_PRIORITY,
self.dispatch_control,
),
copy=False,
)

for s in self.shell_streams:
if s is self.control_stream:
continue
s.on_recv(
partial(
self.schedule_dispatch,
SHELL_PRIORITY,
self.dispatch_shell,
s,
),
copy=False,
)
self.shell_stream.on_recv(
partial(
self.schedule_dispatch,
self.dispatch_shell,
),
copy=False,
)

# publish idle status
self._publish_status('starting')
Expand Down Expand Up @@ -784,8 +765,7 @@ def _topic(self, topic):

@gen.coroutine
def _abort_queues(self):
for stream in self.shell_streams:
stream.flush()
self.shell_stream.flush()
self._aborting = True

def stop_aborting(f):
Expand Down Expand Up @@ -909,4 +889,4 @@ def _at_shutdown(self):
if self._shutdown_message is not None:
self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
self.log.debug("%s", self._shutdown_message)
[ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
self.shell_stream.flush(zmq.POLLOUT)

0 comments on commit bf48c69

Please sign in to comment.