From a524fe068401e3e5a1ad9fad4a57933dedb2d0ab Mon Sep 17 00:00:00 2001 From: martinRenou Date: Wed, 9 Feb 2022 10:12:39 +0100 Subject: [PATCH] Backport PR #3335: Use control comm target in LabManager --- jupyterlab_widgets/src/manager.ts | 82 +---------- packages/base/src/manager-base.ts | 223 ++++++++++++++++++++++++++++++ widgetsnbextension/src/manager.js | 75 +++------- 3 files changed, 243 insertions(+), 137 deletions(-) diff --git a/jupyterlab_widgets/src/manager.ts b/jupyterlab_widgets/src/manager.ts index ac2380b6931..9519ba27536 100644 --- a/jupyterlab_widgets/src/manager.ts +++ b/jupyterlab_widgets/src/manager.ts @@ -6,17 +6,13 @@ import * as Backbone from 'backbone'; import { ManagerBase, shims, IClassicComm, IWidgetRegistryData, ExportMap, - ExportData, WidgetModel, WidgetView, put_buffers, serialize_state, IStateOptions + ExportData, WidgetModel, WidgetView, serialize_state, IStateOptions } from '@jupyter-widgets/base'; import { IDisposable } from '@lumino/disposable'; -import { - PromiseDelegate -} from '@lumino/coreutils'; - import { Widget } from '@lumino/widgets'; @@ -225,69 +221,8 @@ class WidgetManager extends ManagerBase implements IDisposable { if (this.context.sessionContext.session?.kernel.handleComms === false) { return; } - const comm_ids = await this._get_comm_info(); - - // For each comm id that we do not know about, create the comm, and request the state. - const widgets_info = await Promise.all(Object.keys(comm_ids).map(async (comm_id) => { - try { - await this.get_model(comm_id); - // If we successfully get the model, do no more. - return; - } catch (e) { - // If we have the widget model not found error, then we can create the - // widget. Otherwise, rethrow the error. We have to check the error - // message text explicitly because the get_model function in this - // class throws a generic error with this specific text. - if (e.message !== 'widget model not found') { - throw e; - } - const comm = await this._create_comm(this.comm_target_name, comm_id); - - let msg_id: string; - const info = new PromiseDelegate(); - comm.on_msg((msg: KernelMessage.ICommMsgMsg) => { - if ((msg.parent_header as any).msg_id === msg_id - && msg.header.msg_type === 'comm_msg' - && msg.content.data.method === 'update') { - let data = (msg.content.data as any); - let buffer_paths = data.buffer_paths || []; - // Make sure the buffers are DataViews - let buffers = (msg.buffers || []).map(b => { - if (b instanceof DataView) { - return b; - } else { - return new DataView(b instanceof ArrayBuffer ? b : b.buffer); - } - }); - put_buffers(data.state, buffer_paths, buffers); - info.resolve({comm, msg}); - } - }); - msg_id = comm.send({ - method: 'request_state' - }, this.callbacks(undefined)); - return info.promise; - } - })); - - // We put in a synchronization barrier here so that we don't have to - // topologically sort the restored widgets. `new_model` synchronously - // registers the widget ids before reconstructing their state - // asynchronously, so promises to every widget reference should be available - // by the time they are used. - await Promise.all(widgets_info.map(async widget_info => { - if (!widget_info) { - return; - } - const content = widget_info.msg.content as any; - await this.new_model({ - model_name: content.data.state._model_name, - model_module: content.data.state._model_module, - model_module_version: content.data.state._model_module_version, - comm: widget_info.comm, - }, content.data.state); - })); + return super._loadFromKernel(); } @@ -538,16 +473,3 @@ namespace WidgetManager { saveState: boolean }; } - - -namespace Private { - - /** - * Data promised when a comm info request resolves. - */ - export - interface ICommUpdateData { - comm: IClassicComm; - msg: KernelMessage.ICommMsgMsg; - } -} diff --git a/packages/base/src/manager-base.ts b/packages/base/src/manager-base.ts index 3afd2a358d7..1ad7a317630 100644 --- a/packages/base/src/manager-base.ts +++ b/packages/base/src/manager-base.ts @@ -4,6 +4,10 @@ import * as utils from './utils'; import * as services from '@jupyterlab/services'; +import { + PromiseDelegate, + } from '@lumino/coreutils'; + import { DOMWidgetView, WidgetModel, WidgetView, DOMWidgetModel } from './widget'; @@ -18,6 +22,21 @@ import { const PROTOCOL_MAJOR_VERSION = PROTOCOL_VERSION.split('.', 1)[0]; +/** + * The control comm target name. + */ + export const CONTROL_COMM_TARGET = 'jupyter.widget.control'; + + /** + * The supported version for the control comm channel. + */ + export const CONTROL_COMM_PROTOCOL_VERSION = '1.0.0'; + + /** + * Time (in ms) after which we consider the control comm target not responding. + */ + export const CONTROL_COMM_TIMEOUT = 4000; + /** * The options for a model. * @@ -361,7 +380,201 @@ abstract class ManagerBase { widget_model.name = options.model_name; widget_model.module = options.model_module; return widget_model; + } + /** + * Fetch all widgets states from the kernel using the control comm channel + * If this fails (control comm handler not implemented kernel side), + * it will fallback to `_loadFromKernelSlow`. + * + * This is a utility function that can be used in subclasses. + */ + protected async _loadFromKernel(): Promise { + // Try fetching all widget states through the control comm + let data: any; + let buffers: any; + try { + const initComm = await this._create_comm( + CONTROL_COMM_TARGET, + utils.uuid(), + {}, + { version: CONTROL_COMM_PROTOCOL_VERSION } + ); + + await new Promise((resolve, reject) => { + initComm.on_msg((msg: any) => { + data = msg['content']['data']; + + if (data.method !== 'update_states') { + console.warn(` + Unknown ${data.method} message on the Control channel + `); + return; + } + + buffers = (msg.buffers || []).map((b: any) => { + if (b instanceof DataView) { + return b; + } else { + return new DataView(b instanceof ArrayBuffer ? b : b.buffer); + } + }); + + resolve(null); + }); + + initComm.on_close(() => reject('Control comm was closed too early')); + + // Send a states request msg + initComm.send({ method: 'request_states' }, {}); + + // Reject if we didn't get a response in time + setTimeout( + () => reject('Control comm did not respond in time'), + CONTROL_COMM_TIMEOUT + ); + }); + + initComm.close(); + } catch (error) { + console.warn( + 'Failed to fetch widgets through the "jupyter.widget.control" comm channel, fallback to slow fetching of widgets. Reason:', + error + ); + // Fallback to the old implementation for old ipywidgets backend versions (<=7.6) + return this._loadFromKernelSlow(); + } + + const states: any = data.states; + + // Extract buffer paths + const bufferPaths: any = {}; + for (const bufferPath of data.buffer_paths) { + if (!bufferPaths[bufferPath[0]]) { + bufferPaths[bufferPath[0]] = []; + } + bufferPaths[bufferPath[0]].push(bufferPath.slice(1)); + } + + // Start creating all widgets + await Promise.all( + Object.keys(states).map(async (widget_id) => { + try { + const state = states[widget_id]; + const comm = await this._create_comm('jupyter.widget', widget_id); + + // Put binary buffers + if (widget_id in bufferPaths) { + const nBuffers = bufferPaths[widget_id].length; + utils.put_buffers( + state, + bufferPaths[widget_id], + buffers.splice(0, nBuffers) + ); + } + + await this.new_model( + { + model_name: state.model_name, + model_module: state.model_module, + model_module_version: state.model_module_version, + model_id: widget_id, + comm: comm, + }, + state.state + ); + } catch (error) { + // Failed to create a widget model, we continue creating other models so that + // other widgets can render + console.error(error); + } + }) + ); + } + + /** + * Old implementation of fetching widgets one by one using + * the request_state message on each comm. + * + * This is a utility function that can be used in subclasses. + */ + protected async _loadFromKernelSlow(): Promise { + const comm_ids = await this._get_comm_info(); + + // For each comm id that we do not know about, create the comm, and request the state. + const widgets_info = await Promise.all( + Object.keys(comm_ids).map(async (comm_id) => { + try { + const model = this.get_model(comm_id); + // TODO Have the same this.get_model implementation for + // the widgetsnbextension and labextension, the one that + // throws an error if the model is not found instead of + // returning undefined + if (model === undefined) { + throw new Error('widget model not found'); + } + await model; + // If we successfully get the model, do no more. + return; + } catch (e) { + // If we have the widget model not found error, then we can create the + // widget. Otherwise, rethrow the error. We have to check the error + // message text explicitly because the get_model function in this + // class throws a generic error with this specific text. + if (e.message !== 'widget model not found') { + throw e; + } + const comm = await this._create_comm(this.comm_target_name, comm_id); + + let msg_id = ''; + const info = new PromiseDelegate(); + comm.on_msg((msg) => { + if ( + (msg.parent_header as any).msg_id === msg_id && + msg.header.msg_type === 'comm_msg' && + msg.content.data.method === 'update' + ) { + const data = msg.content.data as any; + const buffer_paths = data.buffer_paths || []; + const buffers = msg.buffers || []; + utils.put_buffers(data.state, buffer_paths, buffers); + info.resolve({ comm, msg }); + } + }); + msg_id = comm.send( + { + method: 'request_state', + }, + this.callbacks(undefined) + ); + + return info.promise; + } + }) + ); + + // We put in a synchronization barrier here so that we don't have to + // topologically sort the restored widgets. `new_model` synchronously + // registers the widget ids before reconstructing their state + // asynchronously, so promises to every widget reference should be available + // by the time they are used. + await Promise.all( + widgets_info.map(async (widget_info) => { + if (!widget_info) { + return; + } + const content = widget_info.msg.content as any; + await this.new_model( + { + model_name: content.data.state._model_name, + model_module: content.data.state._model_module, + model_module_version: content.data.state._model_module_version, + comm: widget_info.comm, + }, + content.data.state + ); + }) + ); } /** @@ -586,3 +799,13 @@ function serialize_state(models: WidgetModel[], options: IStateOptions = {}) { }); return {version_major: 2, version_minor: 0, state: state}; } + +namespace Private { + /** + * Data promised when a comm info request resolves. + */ + export interface ICommUpdateData { + comm: IClassicComm; + msg: services.KernelMessage.ICommMsgMsg; + } +} diff --git a/widgetsnbextension/src/manager.js b/widgetsnbextension/src/manager.js index 876e33b5243..91c836c18c3 100644 --- a/widgetsnbextension/src/manager.js +++ b/widgetsnbextension/src/manager.js @@ -75,62 +75,23 @@ export class WidgetManager extends base.ManagerBase { // Attempt to reconstruct any live comms by requesting them from the back-end (2). var that = this; - this._get_comm_info().then(function(comm_ids) { - - // Create comm class instances from comm ids (2). - var comm_promises = Object.keys(comm_ids).map(function(comm_id) { - return that._create_comm(that.comm_target_name, comm_id); - }); - - // Send a state request message out for each widget comm and wait - // for the responses (2). - return Promise.all(comm_promises).then(function(comms) { - return Promise.all(comms.map(function(comm) { - var update_promise = new Promise(function(resolve, reject) { - comm.on_msg(function (msg) { - base.put_buffers(msg.content.data.state, msg.content.data.buffer_paths, msg.buffers); - // A suspected response was received, check to see if - // it's a state update. If so, resolve. - if (msg.content.data.method === 'update') { - resolve({ - comm: comm, - msg: msg - }); - } - }); - }); - comm.send({ - method: 'request_state' - }, that.callbacks()); - return update_promise; - })); - }).then(function(widgets_info) { - return Promise.all(widgets_info.map(function(widget_info) { - return that.new_model({ - model_name: widget_info.msg.content.data.state._model_name, - model_module: widget_info.msg.content.data.state._model_module, - model_module_version: widget_info.msg.content.data.state._model_module_version, - comm: widget_info.comm, - }, widget_info.msg.content.data.state); - })); - }).then(function() { - // Now that we have mirrored any widgets from the kernel... - // Restore any widgets from saved state that are not live (3) - if (widget_md && widget_md['application/vnd.jupyter.widget-state+json']) { - var state = notebook.metadata.widgets['application/vnd.jupyter.widget-state+json']; - state = that.filterExistingModelState(state); - return that.set_state(state); - } - }).then(function() { - // Rerender cells that have widget data - that.notebook.get_cells().forEach(function(cell) { - var rerender = cell.output_area && cell.output_area.outputs.find(function(output) { - return output.data && output.data[MIME_TYPE]; - }); - if (rerender) { - that.notebook.render_cell_output(cell); - } + this._loadFromKernel().then(function() { + // Now that we have mirrored any widgets from the kernel... + // Restore any widgets from saved state that are not live (3) + if (widget_md && widget_md['application/vnd.jupyter.widget-state+json']) { + var state = notebook.metadata.widgets['application/vnd.jupyter.widget-state+json']; + state = that.filterExistingModelState(state); + return that.set_state(state); + } + }).then(function() { + // Rerender cells that have widget data + that.notebook.get_cells().forEach(function(cell) { + var rerender = cell.output_area && cell.output_area.outputs.find(function(output) { + return output.data && output.data[MIME_TYPE]; }); + if (rerender) { + that.notebook.render_cell_output(cell); + } }); }); @@ -336,7 +297,7 @@ export class WidgetManager extends base.ManagerBase { } /** - * List of widget managers in *reverse* order + * List of widget managers in *reverse* order * (_managers[0] is the most recent) */ -WidgetManager._managers = []; +WidgetManager._managers = [];