Skip to content

Commit

Permalink
feat(swingset): add subprocess+node -based VatManager
Browse files Browse the repository at this point in the history
This adds a new `managerType` named `'subprocess-node'`, which runs Node.js
in a subprocess, and sends it netstring-wrapped JSON-encoded Delivery/Syscall
objects to drive the vat.

It currently has the same limitations as the thread-based worker:

* vatPowers is missing transformTildot
* metering is not implemented at all
* delivery transcripts (and replay) are not yet implemented

In addition, it does not yet use a blocking read for syscall responses, so it
can not yet invoke devices. However, unlike the thread worker, blocking reads
should be possible for a subprocess, so eventually we will have this ability.

refs #1374
  • Loading branch information
warner committed Aug 7, 2020
1 parent 61615a2 commit 184c912
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 4 deletions.
1 change: 1 addition & 0 deletions packages/SwingSet/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"@babel/core": "^7.5.0",
"@babel/generator": "^7.6.4",
"anylogger": "^0.21.0",
"netstring-stream": "^1.0.1",
"re2": "^1.10.5",
"rollup": "^1.23.1",
"rollup-plugin-node-resolve": "^5.2.0",
Expand Down
2 changes: 2 additions & 0 deletions packages/SwingSet/src/controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { HandledPromise } from '@agoric/eventual-send';
import { makeMeteringTransformer } from '@agoric/transform-metering';
import { makeTransform } from '@agoric/transform-eventual-send';

import { startSubprocessWorker } from './spawnSubprocessWorker';
import { assertKnownOptions } from './assertOptions';
import { waitUntilQuiescent } from './waitUntilQuiescent';
import { insistStorageAPI } from './storageAPI';
Expand Down Expand Up @@ -242,6 +243,7 @@ export async function buildVatController(
transformMetering,
transformTildot,
makeNodeWorker,
startSubprocessWorker,
};

const kernel = buildKernel(kernelEndowments);
Expand Down
2 changes: 2 additions & 0 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export default function buildKernel(kernelEndowments) {
transformMetering,
transformTildot,
makeNodeWorker,
startSubprocessWorker,
} = kernelEndowments;
insistStorageAPI(hostStorage);
const { enhancedCrankBuffer, commitCrank } = wrapStorage(hostStorage);
Expand Down Expand Up @@ -567,6 +568,7 @@ export default function buildKernel(kernelEndowments) {
transformMetering,
waitUntilQuiescent,
makeNodeWorker,
startSubprocessWorker,
});

/*
Expand Down
23 changes: 19 additions & 4 deletions packages/SwingSet/src/kernel/vatManager/factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { assert } from '@agoric/assert';
import { assertKnownOptions } from '../../assertOptions';
import { makeLocalVatManagerFactory } from './localVatManager';
import { makeNodeWorkerVatManagerFactory } from './nodeWorker';
import { makeNodeSubprocessFactory } from './worker-subprocess-node';

export function makeVatManagerFactory({
allVatPowers,
Expand All @@ -12,6 +13,7 @@ export function makeVatManagerFactory({
transformMetering,
waitUntilQuiescent,
makeNodeWorker,
startSubprocessWorker,
}) {
const localFactory = makeLocalVatManagerFactory({
allVatPowers,
Expand All @@ -27,6 +29,11 @@ export function makeVatManagerFactory({
kernelKeeper,
});

const nodeSubprocessFactory = makeNodeSubprocessFactory({
startSubprocessWorker,
kernelKeeper,
});

function validateManagerOptions(managerOptions) {
assertKnownOptions(managerOptions, [
'enablePipelining',
Expand Down Expand Up @@ -69,16 +76,24 @@ export function makeVatManagerFactory({
}
return localFactory.createFromBundle(vatID, bundle, managerOptions);
}
// 'setup' based vats must be local. TODO: stop using 'setup' in vats,
// but tests and comms-vat still need it
assert(!setup, `setup()-based vats must use a local Manager`);

if (managerType === 'nodeWorker') {
// 'setup' based vats must be local. TODO: stop using 'setup' in vats,
// but tests and comms-vat still need it
assert(!setup, `setup()-based vats must use a local Manager`);
return nodeWorkerFactory.createFromBundle(vatID, bundle, managerOptions);
}

if (managerType === 'node-subprocess') {
return nodeSubprocessFactory.createFromBundle(
vatID,
bundle,
managerOptions,
);
}

throw Error(
`unknown manager type ${managerType}, not 'local' or 'nodeWorker'`,
`unknown type ${managerType}, not local/nodeWorker/node-subprocess`,
);
}

Expand Down
157 changes: 157 additions & 0 deletions packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/* global harden */
// this file is loaded at the start of a new subprocess
import '@agoric/install-ses';

import anylogger from 'anylogger';
import fs from 'fs';
import Netstring from 'netstring-stream';

import { assert } from '@agoric/assert';
import { importBundle } from '@agoric/import-bundle';
import { Remotable, getInterfaceOf } from '@agoric/marshal';
import { HandledPromise } from '@agoric/eventual-send';
import { waitUntilQuiescent } from '../../waitUntilQuiescent';
import { makeLiveSlots } from '../liveSlots';

// eslint-disable-next-line no-unused-vars
function workerLog(first, ...args) {
// console.error(`---worker: ${first}`, ...args);
}

workerLog(`supervisor started`);

function makeConsole(tag) {
const log = anylogger(tag);
const cons = {};
for (const level of ['debug', 'log', 'info', 'warn', 'error']) {
cons[level] = log[level];
}
return harden(cons);
}

function runAndWait(f, errmsg) {
Promise.resolve()
.then(f)
.then(undefined, err => workerLog(`doProcess: ${errmsg}:`, err));
return waitUntilQuiescent();
}

let dispatch;

async function doProcess(dispatchRecord, errmsg) {
const dispatchOp = dispatchRecord[0];
const dispatchArgs = dispatchRecord.slice(1);
workerLog(`runAndWait`);
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
workerLog(`doProcess done`);
}

function doNotify(vpid, vp) {
const errmsg = `vat.promise[${vpid}] ${vp.state} failed`;
switch (vp.state) {
case 'fulfilledToPresence':
return doProcess(['notifyFulfillToPresence', vpid, vp.slot], errmsg);
case 'redirected':
throw new Error('not implemented yet');
case 'fulfilledToData':
return doProcess(['notifyFulfillToData', vpid, vp.data], errmsg);
case 'rejected':
return doProcess(['notifyReject', vpid, vp.data], errmsg);
default:
throw Error(`unknown promise state '${vp.state}'`);
}
}

const toParent = Netstring.writeStream();
toParent.pipe(fs.createWriteStream('IGNORED', { fd: 4, encoding: 'utf-8' }));

const fromParent = fs
.createReadStream('IGNORED', { fd: 3, encoding: 'utf-8' })
.pipe(Netstring.readStream());
fromParent.setEncoding('utf-8');

function sendUplink(msg) {
assert(msg instanceof Array, `msg must be an Array`);
toParent.write(JSON.stringify(msg));
}

// fromParent.on('data', data => {
// workerLog('data from parent', data);
// toParent.write('child ack');
// });

let syscallLog;
fromParent.on('data', data => {
const [type, ...margs] = JSON.parse(data);
workerLog(`received`, type);
if (type === 'start') {
// TODO: parent should send ['start', vatID]
workerLog(`got start`);
sendUplink(['gotStart']);
} else if (type === 'setBundle') {
const [bundle, vatParameters] = margs;
const endowments = {
console: makeConsole(`SwingSet:vatWorker`),
HandledPromise,
};
importBundle(bundle, { endowments }).then(vatNS => {
workerLog(`got vatNS:`, Object.keys(vatNS).join(','));
sendUplink(['gotBundle']);

function doSyscall(vatSyscallObject) {
sendUplink(['syscall', ...vatSyscallObject]);
}
const syscall = harden({
send: (...args) => doSyscall(['send', ...args]),
callNow: (..._args) => {
throw Error(`nodeWorker cannot syscall.callNow`);
},
subscribe: (...args) => doSyscall(['subscribe', ...args]),
fulfillToData: (...args) => doSyscall(['fulfillToData', ...args]),
fulfillToPresence: (...args) =>
doSyscall(['fulfillToPresence', ...args]),
reject: (...args) => doSyscall(['reject', ...args]),
});

const state = null;
const vatID = 'demo-vatID';
// todo: maybe add transformTildot, makeGetMeter/transformMetering to
// vatPowers, but only if options tell us they're wanted. Maybe
// transformTildot should be async and outsourced to the kernel
// process/thread.
const vatPowers = { Remotable, getInterfaceOf };
dispatch = makeLiveSlots(
syscall,
state,
vatNS.buildRootObject,
vatID,
vatPowers,
vatParameters,
);
workerLog(`got dispatch:`, Object.keys(dispatch).join(','));
sendUplink(['dispatchReady']);
});
} else if (type === 'deliver') {
if (!dispatch) {
workerLog(`error: deliver before dispatchReady`);
return;
}
const [dtype, ...dargs] = margs;
if (dtype === 'message') {
const [targetSlot, msg] = dargs;
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
).then(() => {
sendUplink(['deliverDone']);
});
} else if (dtype === 'notify') {
doNotify(...dargs).then(() => sendUplink(['deliverDone', syscallLog]));
} else {
throw Error(`bad delivery type ${dtype}`);
}
} else {
workerLog(`unrecognized downlink message ${type}`);
}
});
Loading

0 comments on commit 184c912

Please sign in to comment.