From e4186c47c06740aa7e18e9de97b175b62a615dc9 Mon Sep 17 00:00:00 2001 From: Min RK Date: Tue, 10 Oct 2017 11:06:48 +0200 Subject: [PATCH] allow disabling offline message buffering escape hatch if the new buffering is causing anyone trouble --- notebook/services/kernels/kernelmanager.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 6f47ed6a71..3405ed8385 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -83,6 +83,17 @@ def _update_root_dir(self, proposal): Only effective if cull_idle_timeout is not 0.""" ) + buffer_offline_messages = Bool(True, config=True, + help="""Whether messages from kernels whose frontends have disconnected should be buffered in-memory. + + When True (default), messages are buffered and replayed on reconnect, + avoiding lost messages due to interrupted connectivity. + + Disable if long-running kernels will produce too much output while + no frontends are connected. + """ + ) + _kernel_buffers = Any() @default('_kernel_buffers') def _default_kernel_buffers(self): @@ -105,7 +116,7 @@ def cwd_for_path(self, path): while not os.path.isdir(os_path) and os_path != self.root_dir: os_path = os.path.dirname(os_path) return os_path - + @gen.coroutine def start_kernel(self, kernel_id=None, path=None, **kwargs): """Start a kernel for a session and return its kernel_id. @@ -148,7 +159,7 @@ def start_kernel(self, kernel_id=None, path=None, **kwargs): # py2-compat raise gen.Return(kernel_id) - + def start_buffering(self, kernel_id, session_key, channels): """Start buffering messages for a kernel @@ -163,6 +174,12 @@ def start_buffering(self, kernel_id, session_key, channels): channels: dict({'channel': ZMQStream}) The zmq channels whose messages should be buffered. """ + + if not self.buffer_offline_messages: + for channel, stream in channels.items(): + stream.close() + return + self.log.info("Starting buffering for %s", session_key) self._check_kernel_id(kernel_id) # clear previous buffering state @@ -182,7 +199,6 @@ def buffer_msg(channel, msg_parts): for channel, stream in channels.items(): stream.on_recv(partial(buffer_msg, channel)) - def get_buffer(self, kernel_id, session_key): """Get the buffer for a given kernel