Skip to content

Commit

Permalink
Backport PR #3335: Use control comm target in LabManager
Browse files Browse the repository at this point in the history
  • Loading branch information
martinRenou committed Feb 9, 2022
1 parent 9636b75 commit a524fe0
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 137 deletions.
82 changes: 2 additions & 80 deletions jupyterlab_widgets/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ import * as Backbone from 'backbone';

import {
ManagerBase, shims, IClassicComm, IWidgetRegistryData, ExportMap,
ExportData, WidgetModel, WidgetView, put_buffers, serialize_state, IStateOptions
ExportData, WidgetModel, WidgetView, serialize_state, IStateOptions
} from '@jupyter-widgets/base';

import {
IDisposable
} from '@lumino/disposable';

import {
PromiseDelegate
} from '@lumino/coreutils';

import {
Widget
} from '@lumino/widgets';
Expand Down Expand Up @@ -225,69 +221,8 @@ class WidgetManager extends ManagerBase<Widget> implements IDisposable {
if (this.context.sessionContext.session?.kernel.handleComms === false) {
return;
}
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(Object.keys(comm_ids).map(async (comm_id) => {
try {
await this.get_model(comm_id);
// If we successfully get the model, do no more.
return;
} catch (e) {
// If we have the widget model not found error, then we can create the
// widget. Otherwise, rethrow the error. We have to check the error
// message text explicitly because the get_model function in this
// class throws a generic error with this specific text.
if (e.message !== 'widget model not found') {
throw e;
}
const comm = await this._create_comm(this.comm_target_name, comm_id);

let msg_id: string;
const info = new PromiseDelegate<Private.ICommUpdateData>();
comm.on_msg((msg: KernelMessage.ICommMsgMsg) => {
if ((msg.parent_header as any).msg_id === msg_id
&& msg.header.msg_type === 'comm_msg'
&& msg.content.data.method === 'update') {
let data = (msg.content.data as any);
let buffer_paths = data.buffer_paths || [];
// Make sure the buffers are DataViews
let buffers = (msg.buffers || []).map(b => {
if (b instanceof DataView) {
return b;
} else {
return new DataView(b instanceof ArrayBuffer ? b : b.buffer);
}
});
put_buffers(data.state, buffer_paths, buffers);
info.resolve({comm, msg});
}
});
msg_id = comm.send({
method: 'request_state'
}, this.callbacks(undefined));

return info.promise;
}
}));

// We put in a synchronization barrier here so that we don't have to
// topologically sort the restored widgets. `new_model` synchronously
// registers the widget ids before reconstructing their state
// asynchronously, so promises to every widget reference should be available
// by the time they are used.
await Promise.all(widgets_info.map(async widget_info => {
if (!widget_info) {
return;
}
const content = widget_info.msg.content as any;
await this.new_model({
model_name: content.data.state._model_name,
model_module: content.data.state._model_module,
model_module_version: content.data.state._model_module_version,
comm: widget_info.comm,
}, content.data.state);
}));
return super._loadFromKernel();
}


Expand Down Expand Up @@ -538,16 +473,3 @@ namespace WidgetManager {
saveState: boolean
};
}


namespace Private {

/**
* Data promised when a comm info request resolves.
*/
export
interface ICommUpdateData {
comm: IClassicComm;
msg: KernelMessage.ICommMsgMsg;
}
}
223 changes: 223 additions & 0 deletions packages/base/src/manager-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import * as utils from './utils';
import * as services from '@jupyterlab/services';

import {
PromiseDelegate,
} from '@lumino/coreutils';

import {
DOMWidgetView, WidgetModel, WidgetView, DOMWidgetModel
} from './widget';
Expand All @@ -18,6 +22,21 @@ import {

const PROTOCOL_MAJOR_VERSION = PROTOCOL_VERSION.split('.', 1)[0];

/**
* The control comm target name.
*/
export const CONTROL_COMM_TARGET = 'jupyter.widget.control';

/**
* The supported version for the control comm channel.
*/
export const CONTROL_COMM_PROTOCOL_VERSION = '1.0.0';

/**
* Time (in ms) after which we consider the control comm target not responding.
*/
export const CONTROL_COMM_TIMEOUT = 4000;

/**
* The options for a model.
*
Expand Down Expand Up @@ -361,7 +380,201 @@ abstract class ManagerBase<T> {
widget_model.name = options.model_name;
widget_model.module = options.model_module;
return widget_model;
}

/**
* Fetch all widgets states from the kernel using the control comm channel
* If this fails (control comm handler not implemented kernel side),
* it will fallback to `_loadFromKernelSlow`.
*
* This is a utility function that can be used in subclasses.
*/
protected async _loadFromKernel(): Promise<void> {
// Try fetching all widget states through the control comm
let data: any;
let buffers: any;
try {
const initComm = await this._create_comm(
CONTROL_COMM_TARGET,
utils.uuid(),
{},
{ version: CONTROL_COMM_PROTOCOL_VERSION }
);

await new Promise((resolve, reject) => {
initComm.on_msg((msg: any) => {
data = msg['content']['data'];

if (data.method !== 'update_states') {
console.warn(`
Unknown ${data.method} message on the Control channel
`);
return;
}

buffers = (msg.buffers || []).map((b: any) => {
if (b instanceof DataView) {
return b;
} else {
return new DataView(b instanceof ArrayBuffer ? b : b.buffer);
}
});

resolve(null);
});

initComm.on_close(() => reject('Control comm was closed too early'));

// Send a states request msg
initComm.send({ method: 'request_states' }, {});

// Reject if we didn't get a response in time
setTimeout(
() => reject('Control comm did not respond in time'),
CONTROL_COMM_TIMEOUT
);
});

initComm.close();
} catch (error) {
console.warn(
'Failed to fetch widgets through the "jupyter.widget.control" comm channel, fallback to slow fetching of widgets. Reason:',
error
);
// Fallback to the old implementation for old ipywidgets backend versions (<=7.6)
return this._loadFromKernelSlow();
}

const states: any = data.states;

// Extract buffer paths
const bufferPaths: any = {};
for (const bufferPath of data.buffer_paths) {
if (!bufferPaths[bufferPath[0]]) {
bufferPaths[bufferPath[0]] = [];
}
bufferPaths[bufferPath[0]].push(bufferPath.slice(1));
}

// Start creating all widgets
await Promise.all(
Object.keys(states).map(async (widget_id) => {
try {
const state = states[widget_id];
const comm = await this._create_comm('jupyter.widget', widget_id);

// Put binary buffers
if (widget_id in bufferPaths) {
const nBuffers = bufferPaths[widget_id].length;
utils.put_buffers(
state,
bufferPaths[widget_id],
buffers.splice(0, nBuffers)
);
}

await this.new_model(
{
model_name: state.model_name,
model_module: state.model_module,
model_module_version: state.model_module_version,
model_id: widget_id,
comm: comm,
},
state.state
);
} catch (error) {
// Failed to create a widget model, we continue creating other models so that
// other widgets can render
console.error(error);
}
})
);
}

/**
* Old implementation of fetching widgets one by one using
* the request_state message on each comm.
*
* This is a utility function that can be used in subclasses.
*/
protected async _loadFromKernelSlow(): Promise<void> {
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(
Object.keys(comm_ids).map(async (comm_id) => {
try {
const model = this.get_model(comm_id);
// TODO Have the same this.get_model implementation for
// the widgetsnbextension and labextension, the one that
// throws an error if the model is not found instead of
// returning undefined
if (model === undefined) {
throw new Error('widget model not found');
}
await model;
// If we successfully get the model, do no more.
return;
} catch (e) {
// If we have the widget model not found error, then we can create the
// widget. Otherwise, rethrow the error. We have to check the error
// message text explicitly because the get_model function in this
// class throws a generic error with this specific text.
if (e.message !== 'widget model not found') {
throw e;
}
const comm = await this._create_comm(this.comm_target_name, comm_id);

let msg_id = '';
const info = new PromiseDelegate<Private.ICommUpdateData>();
comm.on_msg((msg) => {
if (
(msg.parent_header as any).msg_id === msg_id &&
msg.header.msg_type === 'comm_msg' &&
msg.content.data.method === 'update'
) {
const data = msg.content.data as any;
const buffer_paths = data.buffer_paths || [];
const buffers = msg.buffers || [];
utils.put_buffers(data.state, buffer_paths, buffers);
info.resolve({ comm, msg });
}
});
msg_id = comm.send(
{
method: 'request_state',
},
this.callbacks(undefined)
);

return info.promise;
}
})
);

// We put in a synchronization barrier here so that we don't have to
// topologically sort the restored widgets. `new_model` synchronously
// registers the widget ids before reconstructing their state
// asynchronously, so promises to every widget reference should be available
// by the time they are used.
await Promise.all(
widgets_info.map(async (widget_info) => {
if (!widget_info) {
return;
}
const content = widget_info.msg.content as any;
await this.new_model(
{
model_name: content.data.state._model_name,
model_module: content.data.state._model_module,
model_module_version: content.data.state._model_module_version,
comm: widget_info.comm,
},
content.data.state
);
})
);
}

/**
Expand Down Expand Up @@ -586,3 +799,13 @@ function serialize_state(models: WidgetModel[], options: IStateOptions = {}) {
});
return {version_major: 2, version_minor: 0, state: state};
}

namespace Private {
/**
* Data promised when a comm info request resolves.
*/
export interface ICommUpdateData {
comm: IClassicComm;
msg: services.KernelMessage.ICommMsgMsg;
}
}
Loading

0 comments on commit a524fe0

Please sign in to comment.