Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async jupyter client #10

Merged
merged 30 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0d52672
Asynchronous cell execution
davidbrochart Jan 29, 2020
12420d7
Keep only async versions of execute related methods
davidbrochart Jan 30, 2020
145a038
Split async_run_cell in 2 tasks
davidbrochart Jan 30, 2020
69c21a9
Longer timeout in test_async_parallel_notebooks
davidbrochart Jan 30, 2020
c57f37c
Longer timeout
davidbrochart Jan 30, 2020
31284ba
No timeout
davidbrochart Jan 30, 2020
3d690cd
Fix linter
davidbrochart Jan 30, 2020
1d49ea9
poll_period=0
davidbrochart Jan 30, 2020
fd8ea1f
No timeout
davidbrochart Jan 30, 2020
5be3d95
Add test_many_async_parallel_notebooks
davidbrochart Jan 30, 2020
c4be851
Fully async with async jupyter_client
davidbrochart Feb 2, 2020
4cd62ec
Get jupyter_client #506
davidbrochart Feb 2, 2020
4139f26
Fix pip install
davidbrochart Feb 2, 2020
9f233a8
-
davidbrochart Feb 2, 2020
d6fa25c
-
davidbrochart Feb 2, 2020
d1ff6b9
-
davidbrochart Feb 2, 2020
6f498f6
Python 3.8 only
davidbrochart Feb 2, 2020
aa9bbff
-
davidbrochart Feb 2, 2020
c0b1157
-
davidbrochart Feb 2, 2020
81fb8dc
Workaround for AsyncMock so that we support python3.7
davidbrochart Feb 4, 2020
323fe5c
Fix linter
davidbrochart Feb 4, 2020
55f36e9
Import asynccontextmanager from async_generator for Python < 3.7
davidbrochart Feb 4, 2020
4aa3466
python 3.5 compatibility
davidbrochart Feb 4, 2020
16b3f52
Add py35 and py36 tests
davidbrochart Feb 4, 2020
146cf26
Decrease multiprocessing pool to 2 workers
davidbrochart Feb 4, 2020
3265d5e
Decrease number of async notebook run to 4
davidbrochart Feb 4, 2020
3afc591
Rework polling on shell and iopub channels
davidbrochart Feb 8, 2020
f24ecba
Merge with master
davidbrochart Feb 12, 2020
9c33393
Fix linter, rename
davidbrochart Feb 12, 2020
f513ee7
Pin jupyter_client>=6.0.0
davidbrochart Feb 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@ matrix:
env: TOXENV=py37
- python: 3.8
env: TOXENV=py38
- python: 3.6
- python: 3.8
env: TOXENV=flake8
- python: 3.6
- python: 3.8
env: TOXENV=dist
- python: 3.6
- python: 3.8
env: TOXENV=docs
- python: 3.6
- python: 3.8
env: TOXENV=manifest
install:
- pip install tox coverage codecov
script:
- tox -e $TOXENV
after_success:
- test $TRAVIS_BRANCH = "master" &&
test $TOXENV = "py36" &&
test $TOXENV = "py38" &&
coverage xml -i
- test $TRAVIS_BRANCH = "master" &&
test $TOXENV = "py36" &&
test $TOXENV = "py38" &&
codecov
207 changes: 123 additions & 84 deletions nbclient/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import base64
from textwrap import dedent
from contextlib import contextmanager

# For python 3.5 compatibility we import asynccontextmanager from async_generator instead of
# contextlib, and we `await yield_()` instead of just `yield`
from async_generator import asynccontextmanager, async_generator, yield_

from time import monotonic
from queue import Empty
import asyncio

from traitlets.config.configurable import LoggingConfigurable
from traitlets import List, Unicode, Bool, Enum, Any, Type, Dict, Integer, default
Expand Down Expand Up @@ -285,9 +290,10 @@ def start_kernel_manager(self):
self.km = self.kernel_manager_class(config=self.config)
else:
self.km = self.kernel_manager_class(kernel_name=self.kernel_name, config=self.config)
self.km.client_class = 'jupyter_client.asynchronous.AsyncKernelClient'
return self.km

def start_new_kernel_client(self, **kwargs):
async def start_new_kernel_client(self, **kwargs):
"""Creates a new kernel client.

Parameters
Expand All @@ -314,16 +320,17 @@ def start_new_kernel_client(self, **kwargs):
self.kc = self.km.client()
self.kc.start_channels()
try:
self.kc.wait_for_ready(timeout=self.startup_timeout)
await self.kc.wait_for_ready(timeout=self.startup_timeout)
except RuntimeError:
self.kc.stop_channels()
self.km.shutdown_kernel()
raise
self.kc.allow_stdin = False
return self.kc

@contextmanager
def setup_kernel(self, **kwargs):
@asynccontextmanager
@async_generator # needed for python 3.5 compatibility
async def setup_kernel(self, **kwargs):
"""
Context manager for setting up the kernel to execute a notebook.

Expand All @@ -336,16 +343,28 @@ def setup_kernel(self, **kwargs):
self.start_kernel_manager()

if not self.km.has_kernel:
self.start_new_kernel_client(**kwargs)
await self.start_new_kernel_client(**kwargs)
try:
yield
await yield_(None) # would just yield in python >3.5
finally:
self.kc.stop_channels()
self.kc = None

def execute(self, **kwargs):
"""
Executes each code cell.
Executes each code cell (blocking).

Returns
-------
nb : NotebookNode
The executed notebook.
"""
loop = get_loop()
return loop.run_until_complete(self.async_execute(**kwargs))

async def async_execute(self, **kwargs):
"""
Executes each code cell asynchronously.

Returns
-------
Expand All @@ -354,13 +373,15 @@ def execute(self, **kwargs):
"""
self.reset_execution_trackers()

with self.setup_kernel(**kwargs):
async with self.setup_kernel(**kwargs):
self.log.info("Executing notebook with kernel: %s" % self.kernel_name)
for index, cell in enumerate(self.nb.cells):
# Ignore `'execution_count' in content` as it's always 1
# when store_history is False
self.execute_cell(cell, index, execution_count=self.code_cells_executed + 1)
info_msg = self._wait_for_reply(self.kc.kernel_info())
await self.async_execute_cell(
cell, index, execution_count=self.code_cells_executed + 1
)
info_msg = await self._wait_for_reply(self.kc.kernel_info())
self.nb.metadata['language_info'] = info_msg['content']['language_info']
self.set_widgets_metadata()

Expand Down Expand Up @@ -408,16 +429,40 @@ def _update_display_id(self, display_id, msg):
outputs[output_idx]['data'] = out['data']
outputs[output_idx]['metadata'] = out['metadata']

def _poll_for_reply(self, msg_id, cell=None, timeout=None):
try:
# check with timeout if kernel is still alive
msg = self.kc.shell_channel.get_msg(timeout=timeout)
if msg['parent_header'].get('msg_id') == msg_id:
return msg
except Empty:
# received no message, check if kernel is still alive
self._check_alive()
# kernel still alive, wait for a message
async def _poll_for_reply(self, msg_id, cell, timeout, task_poll_output_msg):
if timeout is not None:
deadline = monotonic() + timeout
while True:
try:
msg = await self.kc.shell_channel.get_msg(timeout=timeout)
if msg['parent_header'].get('msg_id') == msg_id:
try:
await asyncio.wait_for(task_poll_output_msg, self.iopub_timeout)
except (asyncio.TimeoutError, Empty):
if self.raise_on_iopub_timeout:
raise CellTimeoutError.error_from_timeout_and_cell(
"Timeout waiting for IOPub output", self.iopub_timeout, cell
)
else:
self.log.warning("Timeout waiting for IOPub output")
return msg
else:
if timeout is not None:
timeout = max(0, deadline - monotonic())
except Empty:
# received no message, check if kernel is still alive
self._check_alive()
self._handle_timeout(timeout, cell)

async def _poll_output_msg(self, parent_msg_id, cell, cell_index):
    """Process IOPub messages for one request until its execution completes.

    Messages are read from ``self.kc.iopub_channel`` without a timeout. Those
    whose parent ``msg_id`` differs from ``parent_msg_id`` are ignored; the
    others are handed to ``self.process_message``. The coroutine returns once
    ``process_message`` raises ``CellExecutionComplete``.
    """
    finished = False
    while not finished:
        message = await self.kc.iopub_channel.get_msg(timeout=None)
        if message['parent_header'].get('msg_id') != parent_msg_id:
            # Output that belongs to another request.
            continue
        try:
            # Will raise CellExecutionComplete when completed
            self.process_message(message, cell, cell_index)
        except CellExecutionComplete:
            finished = True

def _get_timeout(self, cell):
if self.timeout_func is not None and cell is not None:
Expand Down Expand Up @@ -445,14 +490,14 @@ def _check_alive(self):
self.log.error("Kernel died while waiting for execute reply.")
raise DeadKernelError("Kernel died")

def _wait_for_reply(self, msg_id, cell=None):
async def _wait_for_reply(self, msg_id, cell=None):
# wait for finish, with timeout
timeout = self._get_timeout(cell)
cummulative_time = 0
self.shell_timeout_interval = 5
while True:
try:
msg = self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
msg = await self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
except Empty:
self._check_alive()
cummulative_time += self.shell_timeout_interval
Expand Down Expand Up @@ -488,7 +533,46 @@ def _check_raise_for_error(self, cell, exec_reply):

def execute_cell(self, cell, cell_index, execution_count=None, store_history=True):
"""
Executes a single code cell.
Executes a single code cell (blocking).

To execute all cells see :meth:`execute`.

Parameters
----------
cell : nbformat.NotebookNode
The cell which is currently being processed.
cell_index : int
The position of the cell within the notebook object.
execution_count : int
The execution count to be assigned to the cell (default: Use kernel response)
store_history : bool
Determines if history should be stored in the kernel (default: True).
Specific to ipython kernels, which can store command histories.

Returns
-------
output : dict
The execution output payload (or None for no output).

Raises
------
CellExecutionError
If execution failed and should raise an exception, this will be raised
with defaults about the failure.

Returns
-------
cell : NotebookNode
The cell which was just processed.
"""
loop = get_loop()
return loop.run_until_complete(
self.async_execute_cell(cell, cell_index, execution_count, store_history)
)

async def async_execute_cell(self, cell, cell_index, execution_count=None, store_history=True):
"""
Executes a single code cell asynchronously.

To execute all cells see :meth:`execute`.

Expand Down Expand Up @@ -531,70 +615,16 @@ def execute_cell(self, cell, cell_index, execution_count=None, store_history=Tru
# We launched a code cell to execute
self.code_cells_executed += 1
exec_timeout = self._get_timeout(cell)
deadline = None
if exec_timeout is not None:
deadline = monotonic() + exec_timeout

cell.outputs = []
self.clear_before_next_output = False

# This loop resolves nbconvert#659. By polling iopub_channel's and shell_channel's
# output we avoid dropping output and important signals (like idle) from
# iopub_channel. Prior to this change, iopub_channel wasn't polled until
# after exec_reply was obtained from shell_channel, leading to the
# aforementioned dropped data.

# These two variables are used to track what still needs polling:
# more_output=true => continue to poll the iopub_channel
more_output = True
# polling_exec_reply=true => continue to poll the shell_channel
polling_exec_reply = True

while more_output or polling_exec_reply:
if polling_exec_reply:
if self._passed_deadline(deadline):
self._handle_timeout(exec_timeout, cell)
polling_exec_reply = False
continue

# Avoid exceeding the execution timeout (deadline), but stop
# after at most 1s so we can poll output from iopub_channel.
timeout = self._timeout_with_deadline(1, deadline)
exec_reply = self._poll_for_reply(parent_msg_id, cell, timeout)
if exec_reply is not None:
polling_exec_reply = False

if more_output:
try:
timeout = self.iopub_timeout
if polling_exec_reply:
# Avoid exceeding the execution timeout (deadline) while
# polling for output.
timeout = self._timeout_with_deadline(timeout, deadline)
msg = self.kc.iopub_channel.get_msg(timeout=timeout)
except Empty:
if polling_exec_reply:
# Still waiting for execution to finish so we expect that
# output may not always be produced yet.
continue

if self.raise_on_iopub_timeout:
raise CellTimeoutError.error_from_timeout_and_cell(
"Timeout waiting for IOPub output", self.iopub_timeout, cell
)
else:
self.log.warning("Timeout waiting for IOPub output")
more_output = False
continue
if msg['parent_header'].get('msg_id') != parent_msg_id:
# not an output from our execution
continue

try:
# Will raise CellExecutionComplete when completed
self.process_message(msg, cell, cell_index)
except CellExecutionComplete:
more_output = False
task_poll_output_msg = asyncio.ensure_future(
self._poll_output_msg(parent_msg_id, cell, cell_index)
)
exec_reply = await self._poll_for_reply(
parent_msg_id, cell, exec_timeout, task_poll_output_msg
)

if execution_count:
cell['execution_count'] = execution_count
Expand Down Expand Up @@ -748,3 +778,12 @@ def execute(nb, cwd=None, km=None, **kwargs):
if cwd is not None:
resources['metadata'] = {'path': cwd}
return NotebookClient(nb=nb, resources=resources, km=km, **kwargs).execute()


def get_loop():
    """Return an event loop that can be used to run a coroutine to completion.

    The current event loop is reused when there is one and it is still open.
    Otherwise a new loop is created and installed as the current one.

    Returns
    -------
    loop : asyncio.AbstractEventLoop
        An open event loop set as the current loop.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop is available here.
        loop = None
    if loop is None or loop.is_closed():
        # A closed loop cannot run anything: ``run_until_complete`` would fail
        # with "Event loop is closed", so replace it with a fresh one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop
Loading