Kernel providers #112

Closed
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Adopt Jupyter Kernel Management and Jupyter Protocol for kernel management framework per [JEP #45](https://github.com/jupyter/enhancement-proposals/pull/45) ([#112](https://github.com/jupyter/jupyter_server/pull/112)).

## [0.3] - 2020-4-22

### Added
@@ -74,4 +77,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [Batch 8](https://github.com/jupyter/jupyter_server/pull/106)

### Security
- Added a "secure_write to function for cookie/token saves ([#77](https://github.com/jupyter/jupyter_server/pull/77))
- Added a "secure_write to function for cookie/token saves ([#77](https://github.com/jupyter/jupyter_server/pull/77))
Empty file added appveyor.yml
Empty file.
8 changes: 4 additions & 4 deletions jupyter_server/base/handlers.py
@@ -246,6 +246,10 @@ def contents_js_source(self):
# Manager objects
#---------------------------------------------------------------

@property
def kernel_finder(self):
return self.settings['kernel_finder']

@property
def kernel_manager(self):
return self.settings['kernel_manager']
@@ -262,10 +266,6 @@ def session_manager(self):
def terminal_manager(self):
return self.settings['terminal_manager']

@property
def kernel_spec_manager(self):
return self.settings['kernel_spec_manager']

@property
def config_manager(self):
return self.settings['config_manager']
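
With the property swap above, request handlers resolve kernel types through `self.kernel_finder` (pulled from the Tornado settings) rather than `self.kernel_spec_manager`. A hypothetical handler illustrating the pattern; the class and route are not part of this PR:

```python
from tornado import web

from jupyter_server.base.handlers import JupyterHandler


class KernelTypesHandler(JupyterHandler):
    """Hypothetical handler: list kernel types via the kernel finder."""

    @web.authenticated
    def get(self):
        types = {
            name: {'resource_dir': attrs.get('resource_dir')}
            for name, attrs in self.kernel_finder.find_kernels()
        }
        # RequestHandler.finish() JSON-encodes a dict body
        self.finish(types)
```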
180 changes: 10 additions & 170 deletions jupyter_server/base/zmqhandlers.py
@@ -4,80 +4,11 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import json
import struct
import sys
import tornado

from urllib.parse import urlparse
from tornado import ioloop, web
from tornado.websocket import WebSocketHandler

from jupyter_client.session import Session
from jupyter_client.jsonutil import date_default, extract_dates
from ipython_genutils.py3compat import cast_unicode

from .handlers import JupyterHandler


def serialize_binary_message(msg):
"""serialize a message as a binary blob

Header:

4 bytes: number of msg parts (nbufs) as 32b int
4 * nbufs bytes: offset for each buffer as integer as 32b int

Offsets are from the start of the buffer, including the header.

Returns
-------

The message serialized to bytes.

"""
# don't modify msg or buffer list in-place
msg = msg.copy()
buffers = list(msg.pop('buffers'))
if sys.version_info < (3, 4):
buffers = [x.tobytes() for x in buffers]
bmsg = json.dumps(msg, default=date_default).encode('utf8')
buffers.insert(0, bmsg)
nbufs = len(buffers)
offsets = [4 * (nbufs + 1)]
for buf in buffers[:-1]:
offsets.append(offsets[-1] + len(buf))
offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets)
buffers.insert(0, offsets_buf)
return b''.join(buffers)


def deserialize_binary_message(bmsg):
"""deserialize a message from a binary blog

Header:

4 bytes: number of msg parts (nbufs) as 32b int
4 * nbufs bytes: offset for each buffer as integer as 32b int

Offsets are from the start of the buffer, including the header.

Returns
-------

message dictionary
"""
nbufs = struct.unpack('!i', bmsg[:4])[0]
offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)]))
offsets.append(None)
bufs = []
for start, stop in zip(offsets[:-1], offsets[1:]):
bufs.append(bmsg[start:stop])
msg = json.loads(bufs[0].decode('utf8'))
msg['header'] = extract_dates(msg['header'])
msg['parent_header'] = extract_dates(msg['parent_header'])
msg['buffers'] = bufs[1:]
return msg
from tornado import ioloop
from tornado.iostream import StreamClosedError
from tornado.websocket import WebSocketClosedError

# ping interval for keeping websockets alive (30 seconds)
WS_PING_INTERVAL = 30000
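
The helpers removed above implement the Jupyter binary WebSocket framing: a big-endian 32-bit buffer count, then one 32-bit offset per part, then the JSON-encoded message followed by the raw buffers. A small round-trip check of that framing, assuming the removed definitions are still in scope:

```python
# Round-trip check of the framing implemented by serialize_binary_message /
# deserialize_binary_message above (assumed importable from wherever they
# now live).
msg = {
    'header': {}, 'parent_header': {}, 'metadata': {}, 'content': {},
    'buffers': [b'\x00\x01\x02\x03'],
}
blob = serialize_binary_message(msg)
restored = deserialize_binary_message(blob)
assert restored['buffers'] == [b'\x00\x01\x02\x03']
assert restored['content'] == {}
```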
@@ -181,106 +112,15 @@ def send_ping(self):
self.close()
return

self.ping(b'')
try:
self.ping(b'')
except (StreamClosedError, WebSocketClosedError):
# websocket has been closed, stop pinging
self.ping_callback.stop()
return

self.last_ping = now

def on_pong(self, data):
self.last_pong = ioloop.IOLoop.current().time()


class ZMQStreamHandler(WebSocketMixin, WebSocketHandler):

if tornado.version_info < (4,1):
"""Backport send_error from tornado 4.1 to 4.0"""
def send_error(self, *args, **kwargs):
if self.stream is None:
super(WebSocketHandler, self).send_error(*args, **kwargs)
else:
# If we get an uncaught exception during the handshake,
# we have no choice but to abruptly close the connection.
# TODO: for uncaught exceptions after the handshake,
# we can close the connection more gracefully.
self.stream.close()


def _reserialize_reply(self, msg_or_list, channel=None):
"""Reserialize a reply message using JSON.

msg_or_list can be an already-deserialized msg dict or the zmq buffer list.
If it is the zmq list, it will be deserialized with self.session.

This takes the msg list from the ZMQ socket and serializes the result for the websocket.
This method should be used by self._on_zmq_reply to build messages that can
be sent back to the browser.

"""
if isinstance(msg_or_list, dict):
# already unpacked
msg = msg_or_list
else:
idents, msg_list = self.session.feed_identities(msg_or_list)
msg = self.session.deserialize(msg_list)
if channel:
msg['channel'] = channel
if msg['buffers']:
buf = serialize_binary_message(msg)
return buf
else:
smsg = json.dumps(msg, default=date_default)
return cast_unicode(smsg)

def _on_zmq_reply(self, stream, msg_list):
# Sometimes this gets triggered when the on_close method is scheduled in the
# eventloop but hasn't been called.
if self.ws_connection is None or stream.closed():
self.log.warning("zmq message arrived on closed channel")
self.close()
return
channel = getattr(stream, 'channel', None)
try:
msg = self._reserialize_reply(msg_list, channel=channel)
except Exception:
self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
else:
self.write_message(msg, binary=isinstance(msg, bytes))


class AuthenticatedZMQStreamHandler(ZMQStreamHandler, JupyterHandler):

def set_default_headers(self):
"""Undo the set_default_headers in JupyterHandler

which doesn't make sense for websockets
"""
pass

def pre_get(self):
"""Run before finishing the GET request

Extend this method to add logic that should fire before
the websocket finishes completing.
"""
# authenticate the request before opening the websocket
if self.get_current_user() is None:
self.log.warning("Couldn't authenticate WebSocket connection")
raise web.HTTPError(403)

if self.get_argument('session_id', False):
self.session.session = cast_unicode(self.get_argument('session_id'))
else:
self.log.warning("No session ID specified")

async def get(self, *args, **kwargs):
# pre_get can be a coroutine in subclasses
# assign and yield in two step to avoid tornado 3 issues
res = self.pre_get()
await res
res = super(AuthenticatedZMQStreamHandler, self).get(*args, **kwargs)
await res

def initialize(self):
self.log.debug("Initializing websocket connection %s", self.request.path)
self.session = Session(config=self.config)

def get_compression_options(self):
return self.settings.get('websocket_compression_options', None)
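
The `send_ping` change above guards the keep-alive ping against sockets that closed between ticks. For context, a hedged sketch of how `WS_PING_INTERVAL`, `send_ping` and `on_pong` are typically wired together; the actual wiring lives in code not shown in this diff:

```python
# Hedged sketch (not part of this diff): a PeriodicCallback fires send_ping()
# every WS_PING_INTERVAL milliseconds and on_pong() records the last reply.
from tornado import ioloop

WS_PING_INTERVAL = 30000  # ms, mirrors the constant above


class PingingWebSocket:  # stands in for WebSocketMixin on a WebSocketHandler
    def open(self, *args, **kwargs):
        loop = ioloop.IOLoop.current()
        self.last_ping = loop.time()
        self.last_pong = self.last_ping
        self.ping_callback = ioloop.PeriodicCallback(
            self.send_ping, WS_PING_INTERVAL)
        self.ping_callback.start()

    def send_ping(self):
        """Ping the client, or stop pinging once the socket is gone."""
        # see the try/except added in the diff above

    def on_pong(self, data):
        self.last_pong = ioloop.IOLoop.current().time()
```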
8 changes: 4 additions & 4 deletions jupyter_server/gateway/handlers.py
@@ -16,7 +16,7 @@
from tornado.escape import url_escape, json_decode, utf8

from ipython_genutils.py3compat import cast_unicode
from jupyter_client.session import Session
from jupyter_protocol.session import Session, new_id_bytes
from traitlets.config.configurable import LoggingConfigurable

from .managers import GatewayClient
@@ -58,7 +58,7 @@ def authenticate(self):

def initialize(self):
self.log.debug("Initializing websocket connection %s", self.request.path)
self.session = Session(config=self.config)
self.session = Session(key=new_id_bytes())
self.gateway = GatewayWebSocketClient(gateway_url=GatewayClient.instance().url)

async def get(self, kernel_id, *args, **kwargs):
@@ -227,8 +227,8 @@ class GatewayResourceHandler(APIHandler):

@web.authenticated
async def get(self, kernel_name, path, include_body=True):
ksm = self.kernel_spec_manager
kernel_spec_res = await ksm.get_kernel_spec_resource(kernel_name, path)
kf = self.kernel_finder
kernel_spec_res = await kf.get_kernel_spec_resource(kernel_name, path)
if kernel_spec_res is None:
self.log.warning("Kernelspec resource '{}' for '{}' not found. Gateway may not support"
" resource serving.".format(path, kernel_name))
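
The resource handler above now asks the kernel finder for spec resources instead of a `KernelSpecManager`. A hypothetical helper showing the same call pattern outside the handler; `finder` stands in for the `GatewayKernelFinder` placed in the server settings:

```python
from tornado import web


async def fetch_kernel_resource(finder, kernel_name, path='logo-64x64.png'):
    # Hypothetical helper mirroring GatewayResourceHandler.get() above.
    resource = await finder.get_kernel_spec_resource(kernel_name, path)
    if resource is None:
        # the gateway may not support resource serving
        raise web.HTTPError(404, "Resource %s not found for %s" % (path, kernel_name))
    return resource
```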
25 changes: 18 additions & 7 deletions jupyter_server/gateway/managers.py
@@ -1,20 +1,21 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import os
import json
import logging

from jupyter_kernel_mgmt.discovery import KernelFinder
from socket import gaierror
from tornado import web
from tornado.concurrent import Future
from tornado.escape import json_encode, json_decode, url_escape
from tornado.httpclient import HTTPClient, AsyncHTTPClient, HTTPError

from ..services.kernels.kernelmanager import MappingKernelManager
from ..services.sessions.sessionmanager import SessionManager

from jupyter_client.kernelspec import KernelSpecManager
from ..utils import url_path_join

from traitlets import Instance, Unicode, Float, Bool, default, validate, TraitError
from traitlets.config import SingletonConfigurable

@@ -488,14 +489,16 @@ def shutdown_all(self, now=False):
self.remove_kernel(kernel_id)


class GatewayKernelSpecManager(KernelSpecManager):

def __init__(self, **kwargs):
super(GatewayKernelSpecManager, self).__init__(**kwargs)
class GatewayKernelFinder(KernelFinder):
def __init__(self, parent, providers=[]):
super(GatewayKernelFinder, self).__init__(providers=providers)
self.base_endpoint = url_path_join(GatewayClient.instance().url,
GatewayClient.instance().kernelspecs_endpoint)
self.base_resource_endpoint = url_path_join(GatewayClient.instance().url,
GatewayClient.instance().kernelspecs_resource_endpoint)
# Because KernelFinder is not a traitlet/Configurable, we need to simulate a configurable
self.parent = parent
self.log = logging.getLogger(__name__)

def _get_kernelspecs_endpoint_url(self, kernel_name=None):
"""Builds a url for the kernels endpoint
@@ -509,6 +512,14 @@ def _get_kernelspecs_endpoint_url(self, kernel_name=None):

return self.base_endpoint

@asyncio.coroutine
def find_kernels(self):
remote_kspecs = yield from self.get_all_specs()

# convert to list of 2 tuples
for kernel_type, attributes in remote_kspecs.items():
yield kernel_type, attributes

async def get_all_specs(self):
fetched_kspecs = await self.list_kernel_specs()

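
`GatewayKernelFinder` above adapts the gateway's remote kernelspecs to the provider/finder model. For orientation, a hedged sketch of a minimal kernel provider in that model; the provider itself is invented, and only the `(name, attributes)` contract of `find_kernels()` and the `resource_dir` attribute are taken from this diff:

```python
# Hedged sketch of the provider/finder model GatewayKernelFinder plugs into.
from jupyter_kernel_mgmt.discovery import KernelFinder


class EchoKernelProvider:
    id = 'echo'  # kernel types surface as 'echo/<name>'

    def find_kernels(self):
        yield 'basic', {
            'display_name': 'Echo (basic)',
            'resource_dir': '/tmp/echo-kernel',
        }

    def launch(self, name, cwd=None, launch_params=None):
        # signature assumed from jupyter_kernel_mgmt's provider interface
        raise NotImplementedError('illustration only')


finder = KernelFinder(providers=[EchoKernelProvider()])
for kernel_type, attrs in finder.find_kernels():
    print(kernel_type, attrs['resource_dir'])  # -> echo/basic /tmp/echo-kernel
```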
22 changes: 13 additions & 9 deletions jupyter_server/kernelspecs/handlers.py
@@ -1,4 +1,5 @@
from tornado import web
from urllib.parse import unquote
from ..base.handlers import JupyterHandler
from ..services.kernelspecs.handlers import kernel_name_regex

@@ -11,20 +12,23 @@ def initialize(self):

@web.authenticated
def get(self, kernel_name, path, include_body=True):
ksm = self.kernel_spec_manager
try:
self.root = ksm.get_kernel_spec(kernel_name).resource_dir
except KeyError as e:
raise web.HTTPError(404, u'Kernel spec %s not found' %
kernel_name) from e
self.log.debug("Serving kernel resource from: %s", self.root)
return web.StaticFileHandler.get(self, path, include_body=include_body)
kf = self.kernel_finder
# TODO: Do we actually want all kernel type names to be case-insensitive?
kernel_name = unquote(kernel_name.lower())
for name, info in kf.find_kernels():
if name == kernel_name:
self.root = info['resource_dir']
self.log.debug("Serving kernel resource from: %s", self.root)
return web.StaticFileHandler.get(self, path,
include_body=include_body)

raise web.HTTPError(404, u'Kernel spec %s not found' % kernel_name)

@web.authenticated
def head(self, kernel_name, path):
return self.get(kernel_name, path, include_body=False)


default_handlers = [
(r"/kernelspecs/%s/(?P<path>.*)" % kernel_name_regex, KernelSpecResourceHandler),
]
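
For clarity, the request flow through the rewritten `KernelSpecResourceHandler.get()` above, traced with a hypothetical provider-prefixed kernel type name:

```python
# Illustrative trace only; the kernel type name is hypothetical.
# GET /kernelspecs/spec%2Fpython3/logo-64x64.png
#   kernel_name = unquote('spec%2fpython3'.lower())        # -> 'spec/python3'
#   for name, info in self.kernel_finder.find_kernels():
#       if name == kernel_name:
#           self.root = info['resource_dir']
#           return StaticFileHandler.get(self, 'logo-64x64.png')
#   no match -> HTTPError(404, 'Kernel spec spec/python3 not found')
```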
