
Async jupyter client #10

Merged: 30 commits, Feb 26, 2020 (changes shown from 26 commits)

Commits
0d52672
Asynchronous cell execution
davidbrochart Jan 29, 2020
12420d7
Keep only async versions of execute related methods
davidbrochart Jan 30, 2020
145a038
Split async_run_cell in 2 tasks
davidbrochart Jan 30, 2020
69c21a9
Longer timeout in test_async_parallel_notebooks
davidbrochart Jan 30, 2020
c57f37c
Longer timeout
davidbrochart Jan 30, 2020
31284ba
No timeout
davidbrochart Jan 30, 2020
3d690cd
Fix linter
davidbrochart Jan 30, 2020
1d49ea9
poll_period=0
davidbrochart Jan 30, 2020
fd8ea1f
No timeout
davidbrochart Jan 30, 2020
5be3d95
Add test_many_async_parallel_notebooks
davidbrochart Jan 30, 2020
c4be851
Fully async with async jupyter_client
davidbrochart Feb 2, 2020
4cd62ec
Get jupyter_client #506
davidbrochart Feb 2, 2020
4139f26
Fix pip install
davidbrochart Feb 2, 2020
9f233a8
-
davidbrochart Feb 2, 2020
d6fa25c
-
davidbrochart Feb 2, 2020
d1ff6b9
-
davidbrochart Feb 2, 2020
6f498f6
Python 3.8 only
davidbrochart Feb 2, 2020
aa9bbff
-
davidbrochart Feb 2, 2020
c0b1157
-
davidbrochart Feb 2, 2020
81fb8dc
Workaround for AsyncMock so that we support python3.7
davidbrochart Feb 4, 2020
323fe5c
Fix linter
davidbrochart Feb 4, 2020
55f36e9
Import asynccontextmanager from async_generator for Python < 3.7
davidbrochart Feb 4, 2020
4aa3466
python 3.5 compatibility
davidbrochart Feb 4, 2020
16b3f52
Add py35 and py36 tests
davidbrochart Feb 4, 2020
146cf26
Decrease multiprocessing pool to 2 workers
davidbrochart Feb 4, 2020
3265d5e
Decrease number of async notebook run to 4
davidbrochart Feb 4, 2020
3afc591
Rework polling on shell and iopub channels
davidbrochart Feb 8, 2020
f24ecba
Merge with master
davidbrochart Feb 12, 2020
9c33393
Fix linter, rename
davidbrochart Feb 12, 2020
f513ee7
Pin jupyter_client>=6.0.0
davidbrochart Feb 24, 2020
12 changes: 6 additions & 6 deletions .travis.yml
@@ -10,22 +10,22 @@ matrix:
env: TOXENV=py37
- python: 3.8
env: TOXENV=py38
- python: 3.6
- python: 3.8
env: TOXENV=flake8
- python: 3.6
- python: 3.8
env: TOXENV=dist
- python: 3.6
- python: 3.8
env: TOXENV=docs
- python: 3.6
- python: 3.8
env: TOXENV=manifest
install:
- pip install tox coverage codecov
script:
- tox -e $TOXENV
after_success:
- test $TRAVIS_BRANCH = "master" &&
test $TOXENV = "py36" &&
test $TOXENV = "py38" &&
coverage xml -i
- test $TRAVIS_BRANCH = "master" &&
test $TOXENV = "py36" &&
test $TOXENV = "py38" &&
codecov
188 changes: 108 additions & 80 deletions nbclient/execute.py
@@ -1,8 +1,13 @@
import base64
from textwrap import dedent
from contextlib import contextmanager

# For python 3.5 compatibility we import asynccontextmanager from async_generator instead of
# contextlib, and we `await yield_()` instead of just `yield`
from async_generator import asynccontextmanager, async_generator, yield_

from time import monotonic
from queue import Empty
import asyncio

from traitlets.config.configurable import LoggingConfigurable
from traitlets import List, Unicode, Bool, Enum, Any, Type, Dict, Integer, default
@@ -286,9 +291,10 @@ def start_kernel_manager(self):
self.km = self.kernel_manager_class(config=self.config)
else:
self.km = self.kernel_manager_class(kernel_name=self.kernel_name, config=self.config)
self.km.client_class = 'jupyter_client.asynchronous.AsyncKernelClient'
return self.km

def start_new_kernel_client(self, **kwargs):
async def start_new_kernel_client(self, **kwargs):
"""Creates a new kernel client.

Parameters
@@ -315,16 +321,17 @@ def start_new_kernel_client(self, **kwargs):
self.kc = self.km.client()
self.kc.start_channels()
try:
self.kc.wait_for_ready(timeout=self.startup_timeout)
await self.kc.wait_for_ready(timeout=self.startup_timeout)
except RuntimeError:
self.kc.stop_channels()
self.km.shutdown_kernel()
raise
self.kc.allow_stdin = False
return self.kc

@contextmanager
def setup_kernel(self, **kwargs):
@asynccontextmanager
@async_generator # needed for python 3.5 compatibility
async def setup_kernel(self, **kwargs):
"""
Context manager for setting up the kernel to execute a notebook.

@@ -337,17 +344,30 @@ def setup_kernel(self, **kwargs):
self.start_kernel_manager()

if not self.km.has_kernel:
self.start_new_kernel_client(**kwargs)
await self.start_new_kernel_client(**kwargs)
try:
yield
await yield_(None) # would just yield in python >3.5
finally:
self.kc.stop_channels()
self.kc = None

# TODO: Remove non-kwarg arguments
def execute(self, **kwargs):
"""
Executes each code cell.
Executes each code cell (blocking).

Returns
-------
nb : NotebookNode
The executed notebook.
"""
loop = get_loop()
return loop.run_until_complete(self.async_execute(**kwargs))

# TODO: Remove non-kwarg arguments
async def async_execute(self, **kwargs):
"""
Executes each code cell asynchronously.

Returns
-------
@@ -356,11 +376,11 @@ def execute(self, **kwargs):
"""
self.reset_execution_trackers()

with self.setup_kernel(**kwargs):
async with self.setup_kernel(**kwargs):
self.log.info("Executing notebook with kernel: %s" % self.kernel_name)
for index, cell in enumerate(self.nb.cells):
self.execute_cell(cell, index)
info_msg = self._wait_for_reply(self.kc.kernel_info())
await self.async_execute_cell(cell, index)
info_msg = await self._wait_for_reply(self.kc.kernel_info())
self.nb.metadata['language_info'] = info_msg['content']['language_info']
self.set_widgets_metadata()

@@ -388,14 +408,23 @@ def set_widgets_metadata(self):

def execute_cell(self, cell, cell_index, store_history=True):
"""
Executes a single code cell.
Executes a single code cell (blocking).

To execute all cells see :meth:`execute`.
"""
loop = get_loop()
return loop.run_until_complete(self.async_execute_cell(cell, cell_index, store_history))

async def async_execute_cell(self, cell, cell_index, store_history=True):
"""
Executes a single code cell asynchronously.

To execute all cells see :meth:`execute`.
"""
if cell.cell_type != 'code' or not cell.source.strip():
return cell

reply, outputs = self.run_cell(cell, cell_index, store_history)
reply, outputs = await self.async_run_cell(cell, cell_index, store_history)
# Backwards compatibility for processes that wrap run_cell
cell.outputs = outputs

@@ -432,16 +461,22 @@ def _update_display_id(self, display_id, msg):
outputs[output_idx]['data'] = out['data']
outputs[output_idx]['metadata'] = out['metadata']

def _poll_for_reply(self, msg_id, cell=None, timeout=None):
try:
# check with timeout if kernel is still alive
msg = self.kc.shell_channel.get_msg(timeout=timeout)
if msg['parent_header'].get('msg_id') == msg_id:
return msg
except Empty:
# received no message, check if kernel is still alive
self._check_alive()
# kernel still alive, wait for a message
async def _poll_for_reply(self, msg_id, cell=None, timeout=None):
if timeout is not None:
deadline = monotonic() + timeout
while True:
try:
# check with timeout if kernel is still alive
msg = await self.kc.shell_channel.get_msg(timeout=timeout)
if msg['parent_header'].get('msg_id') == msg_id:
return msg
else:
if timeout is not None:
timeout = max(0, deadline - monotonic())
except Empty:
# received no message, check if kernel is still alive
self._check_alive()
self._handle_timeout(timeout, cell)
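The deadline bookkeeping in `_poll_for_reply` above can be illustrated in isolation. This is a minimal sketch with illustrative names (`poll_until`, a plain `asyncio.Queue` standing in for the shell channel), not nbclient's actual API: the overall timeout is converted once into an absolute deadline, and each retry recomputes the remaining budget so that skipping unrelated messages never extends the total wait.

```python
import asyncio
from time import monotonic

async def poll_until(queue, wanted, timeout=None):
    # Convert the overall timeout into an absolute deadline once.
    deadline = monotonic() + timeout if timeout is not None else None
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            raise TimeoutError("no matching message before the deadline")
        if item == wanted:
            return item
        # Unrelated message: shrink the per-iteration timeout so the
        # total wait never exceeds the original budget.
        if deadline is not None:
            timeout = max(0, deadline - monotonic())

async def demo():
    q = asyncio.Queue()
    q.put_nowait('other')     # ignored, like a reply to another msg_id
    q.put_nowait('reply-42')  # the reply we are polling for
    return await poll_until(q, 'reply-42', timeout=5.0)

result = asyncio.run(demo())
```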

def _get_timeout(self, cell):
if self.timeout_func is not None and cell is not None:
@@ -469,14 +504,14 @@ def _check_alive(self):
self.log.error("Kernel died while waiting for execute reply.")
raise DeadKernelError("Kernel died")

def _wait_for_reply(self, msg_id, cell=None):
async def _wait_for_reply(self, msg_id, cell=None):
# wait for finish, with timeout
timeout = self._get_timeout(cell)
cumulative_time = 0
self.shell_timeout_interval = 5
while True:
try:
msg = self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
msg = await self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
except Empty:
self._check_alive()
cumulative_time += self.shell_timeout_interval
@@ -502,14 +537,41 @@ def _passed_deadline(self, deadline):
return False

def run_cell(self, cell, cell_index=0, store_history=False):
loop = get_loop()
return loop.run_until_complete(self.async_run_cell(cell, cell_index, store_history))

async def poll_output_msg(self, parent_msg_id, cell, cell_index, timeout=None):
Contributor:

Will need to do some more manual testing to ensure we correctly timeout, pass execution context, and handle errors cleanly. Some of those patterns are tested in the synchronous path with some assumptions that might not hold for the async version.

Member Author:

Just to explain a bit more how I implemented the asynchronous functionality, there are now two tasks that are run in parallel:

  • _poll_for_reply: asynchronously awaits a reply on the shell channel (possibly with timeout).
  • poll_output_msg: asynchronously awaits messages on the IOPub channel for the cell execution. It first runs with no timeout while we wait for the shell reply; once the reply arrives, we cancel this task and launch it again with a timeout.
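The two-task scheme described above can be sketched with plain asyncio primitives. This is a hypothetical reduction, not nbclient's actual API: a queue stands in for the IOPub channel, a future stands in for the shell reply, and the names (`poll_output`, `run_cell`) are illustrative.

```python
import asyncio

async def poll_output(queue, outputs, timeout=None):
    # Collect messages until cancelled (timeout=None) or until the queue
    # stays empty for `timeout` seconds.
    while True:
        try:
            msg = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            return
        outputs.append(msg)

async def run_cell(reply_future, iopub_queue):
    outputs = []
    # Task 1: poll outputs with no timeout while execution is in flight.
    poller = asyncio.ensure_future(poll_output(iopub_queue, outputs))
    # Task 2: await the shell reply.
    reply = await reply_future
    if not poller.done():
        poller.cancel()
    # Relaunch the output poll briefly, in case outputs arrive after the reply.
    await poll_output(iopub_queue, outputs, timeout=0.1)
    return reply, outputs

async def demo():
    q = asyncio.Queue()
    fut = asyncio.get_running_loop().create_future()
    q.put_nowait('stream-output')   # pending IOPub message
    fut.set_result('execute_reply') # shell reply already available
    return await run_cell(fut, q)

reply, outputs = asyncio.run(demo())
```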

Contributor:

Thanks, that helps!

if timeout is not None:
deadline = monotonic() + timeout
while True:
try:
msg = await self.kc.iopub_channel.get_msg(timeout=timeout)
except Empty:
# timeout
if self.raise_on_iopub_timeout:
raise CellTimeoutError.error_from_timeout_and_cell(
"Timeout waiting for IOPub output", self.iopub_timeout, cell
)
else:
self.log.warning("Timeout waiting for IOPub output")
return
if msg['parent_header'].get('msg_id') != parent_msg_id:
# not an output from our execution
pass
else:
try:
# Will raise CellExecutionComplete when completed
self.process_message(msg, cell, cell_index)
except CellExecutionComplete:
return
if timeout is not None:
timeout = max(0, deadline - monotonic())

async def async_run_cell(self, cell, cell_index=0, store_history=False):
parent_msg_id = self.kc.execute(
cell.source, store_history=store_history, stop_on_error=not self.allow_errors
)
self.log.debug("Executing cell:\n%s", cell.source)
exec_timeout = self._get_timeout(cell)
deadline = None
if exec_timeout is not None:
deadline = monotonic() + exec_timeout

cell.outputs = []
self.clear_before_next_output = False
@@ -520,57 +582,14 @@ def run_cell(self, cell, cell_index=0, store_history=False):
# after exec_reply was obtained from shell_channel, leading to the
# aforementioned dropped data.

# These two variables are used to track what still needs polling:
# more_output=true => continue to poll the iopub_channel
more_output = True
# polling_exec_reply=true => continue to poll the shell_channel
polling_exec_reply = True

while more_output or polling_exec_reply:
if polling_exec_reply:
if self._passed_deadline(deadline):
self._handle_timeout(exec_timeout, cell)
polling_exec_reply = False
continue

# Avoid exceeding the execution timeout (deadline), but stop
# after at most 1s so we can poll output from iopub_channel.
timeout = self._timeout_with_deadline(1, deadline)
exec_reply = self._poll_for_reply(parent_msg_id, cell, timeout)
if exec_reply is not None:
polling_exec_reply = False

if more_output:
try:
timeout = self.iopub_timeout
if polling_exec_reply:
# Avoid exceeding the execution timeout (deadline) while
# polling for output.
timeout = self._timeout_with_deadline(timeout, deadline)
msg = self.kc.iopub_channel.get_msg(timeout=timeout)
except Empty:
if polling_exec_reply:
# Still waiting for execution to finish so we expect that
# output may not always be produced yet.
continue

if self.raise_on_iopub_timeout:
raise CellTimeoutError.error_from_timeout_and_cell(
"Timeout waiting for IOPub output", self.iopub_timeout, cell
)
else:
self.log.warning("Timeout waiting for IOPub output")
more_output = False
continue
if msg['parent_header'].get('msg_id') != parent_msg_id:
# not an output from our execution
continue

try:
# Will raise CellExecutionComplete when completed
self.process_message(msg, cell, cell_index)
except CellExecutionComplete:
more_output = False
exec_timeout = self._get_timeout(cell)
task_poll_output_msg = asyncio.ensure_future(
self.poll_output_msg(parent_msg_id, cell, cell_index)
)
exec_reply = await self._poll_for_reply(parent_msg_id, cell, exec_timeout)
if not task_poll_output_msg.done():
task_poll_output_msg.cancel()
Contributor:

I never knew about `cancel()`. Can this mean a message can get lost?

Member Author:

I don't think so, because messages are queued anyway in zmq. And we create a new task polling for messages just after that.

Contributor:

But can it cancel when the message is already off the zmq queue, say, while it's in the JSON parser? How does it cancel, like Ctrl-C?

Member Author:

It can't do anything between two `await` points, so it depends on the implementation. For instance, zmq.asyncio.Socket.recv_multipart may await zmq.asyncio.Socket.recv several times, so cancelling in the middle of it can lose a message. You're right.

Member Author (@davidbrochart, Feb 6, 2020):

Another way I tried was to leave the first poll_output_msg task running (with no timeout) and launch a second one with a timeout once _poll_for_reply is done (if the first poll_output_msg task isn't finished yet), then asyncio.wait(both_tasks, return_when=asyncio.FIRST_COMPLETED). But that doesn't handle exceptions well (there is return_when=asyncio.FIRST_EXCEPTION for that case).
I can't think of a better solution right now.
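For reference, the alternative described in this comment can be sketched as a toy race between a fast "reply" task and a slow "output poller" task; the names (`slow`, `demo`) are illustrative, not nbclient code.

```python
import asyncio

async def slow(delay, value):
    await asyncio.sleep(delay)
    return value

async def demo():
    # Race a quick "shell reply" against a long-running "output poller".
    reply = asyncio.ensure_future(slow(0.01, 'reply'))
    output = asyncio.ensure_future(slow(5.0, 'output'))
    done, pending = await asyncio.wait(
        {reply, output}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:  # the slow poller is still pending; cancel it
        task.cancel()
    return sorted(t.result() for t in done)

winners = asyncio.run(demo())
```

As the comment notes, with FIRST_COMPLETED an exception raised in the still-pending task is not surfaced by the wait itself; asyncio also offers return_when=asyncio.FIRST_EXCEPTION for that case.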

await self.poll_output_msg(parent_msg_id, cell, cell_index, self.iopub_timeout)

# Return cell.outputs still for backwards compatibility
return exec_reply, cell.outputs
@@ -721,3 +740,12 @@ def executenb(nb, cwd=None, km=None, **kwargs):
if cwd is not None:
resources['metadata'] = {'path': cwd}
return Executor(nb=nb, resources=resources, km=km, **kwargs).execute()


def get_loop():
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
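A usage sketch of the blocking facade pattern that `get_loop` supports above. The `async_execute` here is a stand-in for Executor.async_execute, reduced so the snippet is self-contained.

```python
import asyncio

def get_loop():
    # Reuse the thread's event loop if one exists; otherwise create one
    # (get_event_loop() raises RuntimeError in non-main threads without one).
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop

async def async_execute():
    # Stand-in for the real async implementation.
    await asyncio.sleep(0)
    return 'executed'

def execute():
    # Blocking wrapper over the async method, as in execute() above.
    return get_loop().run_until_complete(async_execute())

result = execute()
```

Note that run_until_complete raises if the loop is already running, so a blocking facade like this cannot be called from inside a running coroutine.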