Skip to content

Commit

Permalink
Merge pull request #3335 from martinRenou/use_control_comm_target
Browse files Browse the repository at this point in the history
Use control comm target in base Manager
  • Loading branch information
jasongrout authored Feb 8, 2022
2 parents 2a26374 + ba01c58 commit eabbbcd
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 163 deletions.
226 changes: 225 additions & 1 deletion packages/base-manager/src/manager-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import * as services from '@jupyterlab/services';
import * as widgets from '@jupyter-widgets/base';

import { JSONObject, PartialJSONObject } from '@lumino/coreutils';
import {
JSONObject,
PartialJSONObject,
PromiseDelegate,
} from '@lumino/coreutils';

import {
DOMWidgetView,
Expand Down Expand Up @@ -32,6 +36,21 @@ import sanitize from 'sanitize-html';

const PROTOCOL_MAJOR_VERSION = PROTOCOL_VERSION.split('.', 1)[0];

/**
* The control comm target name.
*/
export const CONTROL_COMM_TARGET = 'jupyter.widget.control';

/**
* The supported version for the control comm channel.
*/
export const CONTROL_COMM_PROTOCOL_VERSION = '1.0.0';

/**
* Time (in ms) after which we consider the control comm target not responding.
*/
export const CONTROL_COMM_TIMEOUT = 4000;

/**
* Sanitize HTML-formatted descriptions.
*/
Expand Down Expand Up @@ -342,6 +361,201 @@ export abstract class ManagerBase implements IWidgetManager {
return await modelPromise;
}

/**
* Fetch all widgets states from the kernel using the control comm channel
* If this fails (control comm handler not implemented kernel side),
* it will fallback to `_loadFromKernelSlow`.
*
* This is a utility function that can be used in subclasses.
*/
protected async _loadFromKernel(): Promise<void> {
// Try fetching all widget states through the control comm
let data: any;
let buffers: any;
try {
const initComm = await this._create_comm(
CONTROL_COMM_TARGET,
uuid(),
{},
{ version: CONTROL_COMM_PROTOCOL_VERSION }
);

await new Promise((resolve, reject) => {
initComm.on_msg((msg: any) => {
data = msg['content']['data'];

if (data.method !== 'update_states') {
console.warn(`
Unknown ${data.method} message on the Control channel
`);
return;
}

buffers = (msg.buffers || []).map((b: any) => {
if (b instanceof DataView) {
return b;
} else {
return new DataView(b instanceof ArrayBuffer ? b : b.buffer);
}
});

resolve(null);
});

initComm.on_close(() => reject('Control comm was closed too early'));

// Send a states request msg
initComm.send({ method: 'request_states' }, {});

// Reject if we didn't get a response in time
setTimeout(
() => reject('Control comm did not respond in time'),
CONTROL_COMM_TIMEOUT
);
});

initComm.close();
} catch (error) {
console.warn(
'Failed to fetch widgets through the "jupyter.widget.control" comm channel, fallback to slow fetching of widgets. Reason:',
error
);
// Fallback to the old implementation for old ipywidgets backend versions (<=7.6)
return this._loadFromKernelSlow();
}

const states: any = data.states;

// Extract buffer paths
const bufferPaths: any = {};
for (const bufferPath of data.buffer_paths) {
if (!bufferPaths[bufferPath[0]]) {
bufferPaths[bufferPath[0]] = [];
}
bufferPaths[bufferPath[0]].push(bufferPath.slice(1));
}

// Start creating all widgets
await Promise.all(
Object.keys(states).map(async (widget_id) => {
try {
const state = states[widget_id];
const comm = await this._create_comm('jupyter.widget', widget_id);

// Put binary buffers
if (widget_id in bufferPaths) {
const nBuffers = bufferPaths[widget_id].length;
put_buffers(
state,
bufferPaths[widget_id],
buffers.splice(0, nBuffers)
);
}

await this.new_model(
{
model_name: state.model_name,
model_module: state.model_module,
model_module_version: state.model_module_version,
model_id: widget_id,
comm: comm,
},
state.state
);
} catch (error) {
// Failed to create a widget model, we continue creating other models so that
// other widgets can render
console.error(error);
}
})
);
}

/**
* Old implementation of fetching widgets one by one using
* the request_state message on each comm.
*
* This is a utility function that can be used in subclasses.
*/
protected async _loadFromKernelSlow(): Promise<void> {
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(
Object.keys(comm_ids).map(async (comm_id) => {
try {
const model = this.get_model(comm_id);
// TODO Have the same this.get_model implementation for
// the widgetsnbextension and labextension, the one that
// throws an error if the model is not found instead of
// returning undefined
if (model === undefined) {
throw new Error('widget model not found');
}
await model;
// If we successfully get the model, do no more.
return;
} catch (e) {
// If we have the widget model not found error, then we can create the
// widget. Otherwise, rethrow the error. We have to check the error
// message text explicitly because the get_model function in this
// class throws a generic error with this specific text.
if (e.message !== 'widget model not found') {
throw e;
}
const comm = await this._create_comm(this.comm_target_name, comm_id);

let msg_id = '';
const info = new PromiseDelegate<Private.ICommUpdateData>();
comm.on_msg((msg: services.KernelMessage.ICommMsgMsg) => {
if (
(msg.parent_header as any).msg_id === msg_id &&
msg.header.msg_type === 'comm_msg' &&
msg.content.data.method === 'update'
) {
const data = msg.content.data as any;
const buffer_paths = data.buffer_paths || [];
const buffers = msg.buffers || [];
put_buffers(data.state, buffer_paths, buffers);
info.resolve({ comm, msg });
}
});
msg_id = comm.send(
{
method: 'request_state',
},
this.callbacks(undefined)
);

return info.promise;
}
})
);

// We put in a synchronization barrier here so that we don't have to
// topologically sort the restored widgets. `new_model` synchronously
// registers the widget ids before reconstructing their state
// asynchronously, so promises to every widget reference should be available
// by the time they are used.
await Promise.all(
widgets_info.map(async (widget_info) => {
if (!widget_info) {
return;
}
const content = widget_info.msg.content as any;
await this.new_model(
{
model_name: content.data.state._model_name,
model_module: content.data.state._model_module,
model_module_version: content.data.state._model_module_version,
comm: widget_info.comm,
},
content.data.state
);
})
);
}

async _make_model(
options: RequiredSome<IModelOptions, 'model_id'>,
serialized_state: any = {}
Expand Down Expand Up @@ -690,3 +904,13 @@ export function serialize_state(
});
return { version_major: 2, version_minor: 0, state: state };
}

namespace Private {
/**
* Data promised when a comm info request resolves.
*/
export interface ICommUpdateData {
comm: IClassicComm;
msg: services.KernelMessage.ICommMsgMsg;
}
}
81 changes: 2 additions & 79 deletions python/jupyterlab_widgets/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
ExportData,
WidgetModel,
WidgetView,
put_buffers,
ICallbacks,
} from '@jupyter-widgets/base';

Expand All @@ -21,7 +20,7 @@ import {

import { IDisposable } from '@lumino/disposable';

import { PromiseDelegate, ReadonlyPartialJSONValue } from '@lumino/coreutils';
import { ReadonlyPartialJSONValue } from '@lumino/coreutils';

import { INotebookModel } from '@jupyterlab/notebook';

Expand Down Expand Up @@ -106,74 +105,8 @@ export abstract class LabWidgetManager
// A "load" for a kernel that does not handle comms does nothing.
return;
}
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(
Object.keys(comm_ids).map(async (comm_id) => {
try {
await this.get_model(comm_id);
// If we successfully get the model, do no more.
return;
} catch (e) {
// If we have the widget model not found error, then we can create the
// widget. Otherwise, rethrow the error. We have to check the error
// message text explicitly because the get_model function in this
// class throws a generic error with this specific text.
if (e.message !== 'widget model not found') {
throw e;
}
const comm = await this._create_comm(this.comm_target_name, comm_id);

let msg_id = '';
const info = new PromiseDelegate<Private.ICommUpdateData>();
comm.on_msg((msg: KernelMessage.ICommMsgMsg) => {
if (
(msg.parent_header as any).msg_id === msg_id &&
msg.header.msg_type === 'comm_msg' &&
msg.content.data.method === 'update'
) {
const data = msg.content.data as any;
const buffer_paths = data.buffer_paths || [];
const buffers = msg.buffers || [];
put_buffers(data.state, buffer_paths, buffers);
info.resolve({ comm, msg });
}
});
msg_id = comm.send(
{
method: 'request_state',
},
this.callbacks(undefined)
);

return info.promise;
}
})
);

// We put in a synchronization barrier here so that we don't have to
// topologically sort the restored widgets. `new_model` synchronously
// registers the widget ids before reconstructing their state
// asynchronously, so promises to every widget reference should be available
// by the time they are used.
await Promise.all(
widgets_info.map(async (widget_info) => {
if (!widget_info) {
return;
}
const content = widget_info.msg.content as any;
await this.new_model(
{
model_name: content.data.state._model_name,
model_module: content.data.state._model_module,
model_module_version: content.data.state._model_module_version,
comm: widget_info.comm,
},
content.data.state
);
})
);
return super._loadFromKernel();
}

/**
Expand Down Expand Up @@ -668,13 +601,3 @@ export namespace WidgetManager {
saveState: boolean;
};
}

namespace Private {
/**
* Data promised when a comm info request resolves.
*/
export interface ICommUpdateData {
comm: IClassicComm;
msg: KernelMessage.ICommMsgMsg;
}
}
Loading

0 comments on commit eabbbcd

Please sign in to comment.