Skip to content

Commit

Permalink
Server extension allows optional kernel name parameter and creates ne…
Browse files Browse the repository at this point in the history
…w kernel if needed (#313)

* first take at adding kernel name

* working

* clean up code

* started unit tests

* first take unit tests

* fix method to mock

* finished tests

* bump versions

* update readme

* remove extra dot

* fix python 2 return and yield
  • Loading branch information
aggFTW authored Jan 5, 2017
1 parent 402d4c9 commit 67fcf74
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 184 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ Request Body example:
'path': 'path.ipynb',
'username': 'username',
'password': 'password',
'endpoint': 'url'
'endpoint': 'url',
'kernelname': 'pysparkkernel'
}
```

Returns `200` if successful; `400` if body is not JSON string or key is not found; `404` if kernel for path is not found; `500` if error is encountered changing clusters.
*Note that the kernelname parameter is optional; it defaults to the kernel name specified in the config file, or to pysparkkernel if none is set in the config file.*
Returns `200` if successful; `400` if body is not JSON string or key is not found; `500` if error is encountered changing clusters.

Reply Body example:
```
Expand Down
2 changes: 1 addition & 1 deletion autovizwidget/autovizwidget/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.9.3'
__version__ = '0.10.0'
2 changes: 1 addition & 1 deletion hdijupyterutils/hdijupyterutils/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.9.3'
__version__ = '0.10.0'
4 changes: 3 additions & 1 deletion sparkmagic/example_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,7 @@

"heartbeat_refresh_seconds": 30,
"livy_server_heartbeat_timeout_seconds": 0,
"heartbeat_retry_seconds": 10
"heartbeat_retry_seconds": 10,

"server_extension_default_kernel_name": "pysparkkernel"
}
2 changes: 1 addition & 1 deletion sparkmagic/sparkmagic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.9.3'
__version__ = '0.10.0'
from sparkmagic.serverextension.handlers import load_jupyter_server_extension


Expand Down
88 changes: 63 additions & 25 deletions sparkmagic/sparkmagic/serverextension/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,32 @@
from notebook.utils import url_path_join
from notebook.base.handlers import IPythonHandler
from tornado import web
from tornado import gen
from tornado.web import MissingArgumentError
from tornado.escape import json_decode

from sparkmagic.kernels.kernelmagics import KernelMagics
import sparkmagic.utils.configuration as conf
from sparkmagic.utils.sparkevents import SparkEvents
from sparkmagic.utils.sparklogger import SparkLog


class ReconnectHandler(IPythonHandler):
logger = None

@web.authenticated
@gen.coroutine
def post(self):
self.logger = SparkLog(u"ReconnectHandler")

spark_events = self._get_spark_events()

try:
data = json_decode(self.request.body)
except ValueError as e:
self.set_status(400)
msg = "Invalid JSON in request body."
self.logger.error(msg)
self.finish(msg)
spark_events.emit_cluster_change_event(None, 400, False, msg)
return
Expand All @@ -32,25 +41,18 @@ def post(self):
except MissingArgumentError as e:
self.set_status(400)
self.finish(str(e))
self.logger.error(str(e))
spark_events.emit_cluster_change_event(endpoint, 400, False, str(e))
return

# Get kernel manager
kernel_manager = self._get_kernel_manager(path)
if kernel_manager is None:
status_code = 404
self.set_status(status_code)
error = "No kernel for given path"
self.finish(json.dumps(dict(success=False, error=error), sort_keys=True))
spark_events.emit_cluster_change_event(endpoint, status_code, False, error)
return

# Restart
kernel_manager.restart_kernel()
kernel_name = self._get_kernel_name(data)

# Get kernel manager, create a new kernel if none exists or restart the existing one when applicable
kernel_manager = yield self._get_kernel_manager(path, kernel_name)

# Execute code
client = kernel_manager.client()
code = '%{} -s {} -u {} -p {}'.format(KernelMagics._do_not_call_change_endpoint.__name__, endpoint, username, password)
code = '%{} -s {} -u {} -p {}'.format(KernelMagics._do_not_call_change_endpoint.__name__, endpoint, username, password)
response_id = client.execute(code, silent=False, store_history=False)
msg = client.get_shell_msg(response_id)

Expand All @@ -60,34 +62,70 @@ def post(self):
if successful_message:
status_code = 200
else:
self.logger.error(u"Code to reconnect errored out: {}".format(error))
status_code = 500

# Post execution info
self.set_status(status_code)
self.finish(json.dumps(dict(success=successful_message, error=error), sort_keys=True))
spark_events.emit_cluster_change_event(endpoint, status_code, successful_message, error)

def _get_kernel_name(self, data):
    """Return the kernel name requested in the payload, or the configured default.

    Reads the optional 'kernelname' key from the decoded request body; when
    absent, falls back to conf.server_extension_default_kernel_name().
    """
    requested_name = self._get_argument_if_exists(data, 'kernelname')
    self.logger.debug("Kernel name is {}".format(requested_name))
    if requested_name is not None:
        return requested_name
    # No kernel name in the request body; use the configured default.
    default_name = conf.server_extension_default_kernel_name()
    self.logger.debug("Defaulting to kernel name {}".format(default_name))
    return default_name

def _get_argument_if_exists(self, data, key):
return data.get(key)

def _get_argument_or_raise(self, data, key):
try:
return data[key]
except KeyError:
raise MissingArgumentError(key)

def _get_kernel_manager(self, path):

@gen.coroutine
def _get_kernel_manager(self, path, kernel_name):
sessions = self.session_manager.list_sessions()

kernel_id = None
for session in sessions:
if session['notebook']['path'] == path:
session_id = session['id']
kernel_id = session['kernel']['id']
existing_kernel_name = session['kernel']['name']
break

if kernel_id is None:
return None

return self.kernel_manager.get_kernel(kernel_id)

def _msg_status(selg, msg):
self.logger.debug(u"Kernel not found. Starting a new kernel.")
k_m = yield self._get_kernel_manager_new_session(path, kernel_name)
elif existing_kernel_name != kernel_name:
self.logger.debug(u"Existing kernel name '{}' does not match requested '{}'. Starting a new kernel.".format(existing_kernel_name, kernel_name))
self._delete_session(session_id)
k_m = yield self._get_kernel_manager_new_session(path, kernel_name)
else:
self.logger.debug(u"Kernel found. Restarting kernel.")
k_m = self.kernel_manager.get_kernel(kernel_id)
k_m.restart_kernel()

raise gen.Return(k_m)

@gen.coroutine
def _get_kernel_manager_new_session(self, path, kernel_name):
    """Create a new notebook session (and kernel) for `path`, and return its kernel manager.

    Coroutine: yields on session creation, then resolves (via gen.Return,
    the Python 2 compatible way to return from a coroutine) to the kernel
    manager for the newly created kernel.
    """
    # create_session returns a future resolving to the session model dict.
    model_future = self.session_manager.create_session(kernel_name=kernel_name, path=path)
    model = yield model_future
    kernel_id = model["kernel"]["id"]
    self.logger.debug("Kernel created with id {}".format(str(kernel_id)))
    k_m = self.kernel_manager.get_kernel(kernel_id)
    raise gen.Return(k_m)

def _delete_session(self, session_id):
    """Delete the notebook session with the given id via the session manager."""
    self.session_manager.delete_session(session_id)

def _msg_status(self, msg):
return msg['content']['status']

def _msg_successful(self, msg):
Expand All @@ -108,11 +146,11 @@ def _get_spark_events(self):
def load_jupyter_server_extension(nb_app):
    """Register the sparkmagic /reconnectsparkmagic endpoint with the notebook web app.

    Called by the notebook server at startup when the extension is enabled.
    """
    nb_app.log.info("sparkmagic extension enabled!")
    web_app = nb_app.web_app
    # Mount the reconnect handler under the server's configured base URL.
    reconnect_route = url_path_join(web_app.settings['base_url'], '/reconnectsparkmagic')
    web_app.add_handlers('.*$', [(reconnect_route, ReconnectHandler)])
Loading

0 comments on commit 67fcf74

Please sign in to comment.