From 184c9126d33a7f987d6d770df39416f0154e1045 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Fri, 7 Aug 2020 00:25:39 -0700 Subject: [PATCH] feat(swingset): add subprocess+node -based VatManager This adds a new `managerType` named `'subprocess-node'`, which runs Node.js in a subprocess, and sends it netstring-wrapped JSON-encoded Delivery/Syscall objects to drive the vat. It currently has the same limitations as the thread-based worker: * vatPowers is missing transformTildot * metering is not implemented at all * delivery transcripts (and replay) are not yet implemented In addition, it does not yet use a blocking read for syscall responses, so it can not yet invoke devices. However, unlike the thread worker, blocking reads should be possible for a subprocess, so eventually we will have this ability. refs #1374 --- packages/SwingSet/package.json | 1 + packages/SwingSet/src/controller.js | 2 + packages/SwingSet/src/kernel/kernel.js | 2 + .../SwingSet/src/kernel/vatManager/factory.js | 23 ++- .../kernel/vatManager/subprocessSupervisor.js | 157 ++++++++++++++++++ .../vatManager/worker-subprocess-node.js | 146 ++++++++++++++++ .../SwingSet/src/spawnSubprocessWorker.js | 66 ++++++++ packages/SwingSet/test/workers/test-worker.js | 12 ++ yarn.lock | 7 + 9 files changed, 412 insertions(+), 4 deletions(-) create mode 100644 packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js create mode 100644 packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js create mode 100644 packages/SwingSet/src/spawnSubprocessWorker.js diff --git a/packages/SwingSet/package.json b/packages/SwingSet/package.json index e57598e4c27..48783bf3bec 100644 --- a/packages/SwingSet/package.json +++ b/packages/SwingSet/package.json @@ -46,6 +46,7 @@ "@babel/core": "^7.5.0", "@babel/generator": "^7.6.4", "anylogger": "^0.21.0", + "netstring-stream": "^1.0.1", "re2": "^1.10.5", "rollup": "^1.23.1", "rollup-plugin-node-resolve": "^5.2.0", diff --git a/packages/SwingSet/src/controller.js b/packages/SwingSet/src/controller.js index 6d8020fb93d..307ee421581 100644 --- a/packages/SwingSet/src/controller.js +++ b/packages/SwingSet/src/controller.js @@ -18,6 +18,7 @@ import { HandledPromise } from '@agoric/eventual-send'; import { makeMeteringTransformer } from '@agoric/transform-metering'; import { makeTransform } from '@agoric/transform-eventual-send'; +import { startSubprocessWorker } from './spawnSubprocessWorker'; import { assertKnownOptions } from './assertOptions'; import { waitUntilQuiescent } from './waitUntilQuiescent'; import { insistStorageAPI } from './storageAPI'; @@ -242,6 +243,7 @@ export async function buildVatController( transformMetering, transformTildot, makeNodeWorker, + startSubprocessWorker, }; const kernel = buildKernel(kernelEndowments); diff --git a/packages/SwingSet/src/kernel/kernel.js b/packages/SwingSet/src/kernel/kernel.js index 476cabb0764..4be1805be81 100644 --- a/packages/SwingSet/src/kernel/kernel.js +++ b/packages/SwingSet/src/kernel/kernel.js @@ -40,6 +40,7 @@ export default function buildKernel(kernelEndowments) { transformMetering, transformTildot, makeNodeWorker, + startSubprocessWorker, } = kernelEndowments; insistStorageAPI(hostStorage); const { enhancedCrankBuffer, commitCrank } = wrapStorage(hostStorage); @@ -567,6 +568,7 @@ export default function buildKernel(kernelEndowments) { transformMetering, waitUntilQuiescent, makeNodeWorker, + startSubprocessWorker, }); /* diff --git a/packages/SwingSet/src/kernel/vatManager/factory.js b/packages/SwingSet/src/kernel/vatManager/factory.js index 6d3ccf7ca35..c47d395f085 100644 --- a/packages/SwingSet/src/kernel/vatManager/factory.js +++ b/packages/SwingSet/src/kernel/vatManager/factory.js @@ -3,6 +3,7 @@ import { assert } from '@agoric/assert'; import { assertKnownOptions } from '../../assertOptions'; import { makeLocalVatManagerFactory } from './localVatManager'; import { makeNodeWorkerVatManagerFactory } from './nodeWorker'; +import { makeNodeSubprocessFactory } from './worker-subprocess-node'; export function makeVatManagerFactory({ allVatPowers, @@ -12,6 +13,7 @@ export function makeVatManagerFactory({ transformMetering, waitUntilQuiescent, makeNodeWorker, + startSubprocessWorker, }) { const localFactory = makeLocalVatManagerFactory({ allVatPowers, @@ -27,6 +29,11 @@ export function makeVatManagerFactory({ kernelKeeper, }); + const nodeSubprocessFactory = makeNodeSubprocessFactory({ + startSubprocessWorker, + kernelKeeper, + }); + function validateManagerOptions(managerOptions) { assertKnownOptions(managerOptions, [ 'enablePipelining', @@ -69,16 +76,24 @@ export function makeVatManagerFactory({ } return localFactory.createFromBundle(vatID, bundle, managerOptions); } + // 'setup' based vats must be local. TODO: stop using 'setup' in vats, + // but tests and comms-vat still need it + assert(!setup, `setup()-based vats must use a local Manager`); if (managerType === 'nodeWorker') { - // 'setup' based vats must be local. TODO: stop using 'setup' in vats, - // but tests and comms-vat still need it - assert(!setup, `setup()-based vats must use a local Manager`); return nodeWorkerFactory.createFromBundle(vatID, bundle, managerOptions); } + if (managerType === 'node-subprocess') { + return nodeSubprocessFactory.createFromBundle( + vatID, + bundle, + managerOptions, + ); + } + throw Error( - `unknown manager type ${managerType}, not 'local' or 'nodeWorker'`, + `unknown type ${managerType}, not local/nodeWorker/node-subprocess`, ); } diff --git a/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js b/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js new file mode 100644 index 00000000000..977a52faeda --- /dev/null +++ b/packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js @@ -0,0 +1,157 @@ +/* global harden */ +// this file is loaded at the start of a new subprocess +import '@agoric/install-ses'; + +import anylogger from 'anylogger'; +import fs from 'fs'; +import Netstring from 'netstring-stream'; + +import { assert } from '@agoric/assert'; +import { importBundle } from '@agoric/import-bundle'; +import { Remotable, getInterfaceOf } from '@agoric/marshal'; +import { HandledPromise } from '@agoric/eventual-send'; +import { waitUntilQuiescent } from '../../waitUntilQuiescent'; +import { makeLiveSlots } from '../liveSlots'; + +// eslint-disable-next-line no-unused-vars +function workerLog(first, ...args) { + // console.error(`---worker: ${first}`, ...args); +} + +workerLog(`supervisor started`); + +function makeConsole(tag) { + const log = anylogger(tag); + const cons = {}; + for (const level of ['debug', 'log', 'info', 'warn', 'error']) { + cons[level] = log[level]; + } + return harden(cons); +} + +function runAndWait(f, errmsg) { + Promise.resolve() + .then(f) + .then(undefined, err => workerLog(`doProcess: ${errmsg}:`, err)); + return waitUntilQuiescent(); +} + +let dispatch; + +async function doProcess(dispatchRecord, errmsg) { + const dispatchOp = dispatchRecord[0]; + const dispatchArgs = dispatchRecord.slice(1); + workerLog(`runAndWait`); + await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg); + workerLog(`doProcess done`); +} + +function doNotify(vpid, vp) { + const errmsg = `vat.promise[${vpid}] ${vp.state} failed`; + switch (vp.state) { + case 'fulfilledToPresence': + return doProcess(['notifyFulfillToPresence', vpid, vp.slot], errmsg); + case 'redirected': + throw new Error('not implemented yet'); + case 'fulfilledToData': + return doProcess(['notifyFulfillToData', vpid, vp.data], errmsg); + case 'rejected': + return doProcess(['notifyReject', vpid, vp.data], errmsg); + default: + throw Error(`unknown promise state '${vp.state}'`); + } +} + +const toParent = Netstring.writeStream(); +toParent.pipe(fs.createWriteStream('IGNORED', { fd: 4, encoding: 'utf-8' })); + +const fromParent = fs + .createReadStream('IGNORED', { fd: 3, encoding: 'utf-8' }) + .pipe(Netstring.readStream()); +fromParent.setEncoding('utf-8'); + +function sendUplink(msg) { + assert(msg instanceof Array, `msg must be an Array`); + toParent.write(JSON.stringify(msg)); +} + +// fromParent.on('data', data => { +// workerLog('data from parent', data); +// toParent.write('child ack'); +// }); + +let syscallLog; +fromParent.on('data', data => { + const [type, ...margs] = JSON.parse(data); + workerLog(`received`, type); + if (type === 'start') { + // TODO: parent should send ['start', vatID] + workerLog(`got start`); + sendUplink(['gotStart']); + } else if (type === 'setBundle') { + const [bundle, vatParameters] = margs; + const endowments = { + console: makeConsole(`SwingSet:vatWorker`), + HandledPromise, + }; + importBundle(bundle, { endowments }).then(vatNS => { + workerLog(`got vatNS:`, Object.keys(vatNS).join(',')); + sendUplink(['gotBundle']); + + function doSyscall(vatSyscallObject) { + sendUplink(['syscall', ...vatSyscallObject]); + } + const syscall = harden({ + send: (...args) => doSyscall(['send', ...args]), + callNow: (..._args) => { + throw Error(`nodeWorker cannot syscall.callNow`); + }, + subscribe: (...args) => doSyscall(['subscribe', ...args]), + fulfillToData: (...args) => doSyscall(['fulfillToData', ...args]), + fulfillToPresence: (...args) => + doSyscall(['fulfillToPresence', ...args]), + reject: (...args) => doSyscall(['reject', ...args]), + }); + + const state = null; + const vatID = 'demo-vatID'; + // todo: maybe add transformTildot, makeGetMeter/transformMetering to + // vatPowers, but only if options tell us they're wanted. Maybe + // transformTildot should be async and outsourced to the kernel + // process/thread. + const vatPowers = { Remotable, getInterfaceOf }; + dispatch = makeLiveSlots( + syscall, + state, + vatNS.buildRootObject, + vatID, + vatPowers, + vatParameters, + ); + workerLog(`got dispatch:`, Object.keys(dispatch).join(',')); + sendUplink(['dispatchReady']); + }); + } else if (type === 'deliver') { + if (!dispatch) { + workerLog(`error: deliver before dispatchReady`); + return; + } + const [dtype, ...dargs] = margs; + if (dtype === 'message') { + const [targetSlot, msg] = dargs; + const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`; + doProcess( + ['deliver', targetSlot, msg.method, msg.args, msg.result], + errmsg, + ).then(() => { + sendUplink(['deliverDone']); + }); + } else if (dtype === 'notify') { + doNotify(...dargs).then(() => sendUplink(['deliverDone', syscallLog])); + } else { + throw Error(`bad delivery type ${dtype}`); + } + } else { + workerLog(`unrecognized downlink message ${type}`); + } +}); diff --git a/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js b/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js new file mode 100644 index 00000000000..1d22dde8984 --- /dev/null +++ b/packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js @@ -0,0 +1,146 @@ +/* global harden */ + +// import { spawn } from 'child_process'; // not from Compartment + +import { assert } from '@agoric/assert'; +import { makePromiseKit } from '@agoric/promise-kit'; +import { makeTranscriptManager } from './transcript'; + +import { createSyscall } from './syscall'; + +// start a "Worker" (Node's tool for starting new threads) and load a bundle +// into it + +/* +import { waitUntilQuiescent } from '../../waitUntilQuiescent'; +function wait10ms() { + const { promise: queueEmptyP, resolve } = makePromiseKit(); + setTimeout(() => resolve(), 10); + return queueEmptyP; +} +*/ + +// eslint-disable-next-line no-unused-vars +function parentLog(first, ...args) { + // console.error(`--parent: ${first}`, ...args); +} + +export function makeNodeSubprocessFactory(tools) { + const { startSubprocessWorker, kernelKeeper } = tools; + + function createFromBundle(vatID, bundle, managerOptions) { + const { vatParameters } = managerOptions; + assert(!managerOptions.metered, 'not supported yet'); + assert(!managerOptions.notifyTermination, 'not supported yet'); + assert(!managerOptions.enableSetup, 'not supported at all'); + if (managerOptions.enableInternalMetering) { + // TODO: warn+ignore, rather than throw, because the kernel enables it + // for all vats, because the Spawner still needs it. When the kernel + // stops doing that, turn this into a regular assert + console.log(`node-worker does not support enableInternalMetering`); + } + const vatKeeper = kernelKeeper.allocateVatKeeperIfNeeded(vatID); + const transcriptManager = makeTranscriptManager( + kernelKeeper, + vatKeeper, + vatID, + ); + + // prepare to accept syscalls from the worker + + // TODO: make the worker responsible for checking themselves: we send + // both the delivery and the expected syscalls, and the supervisor + // compares what the bundle does with what it was told to expect. + // Modulo flow control, we just stream transcript entries at the + // worker and eventually get back an "ok" or an error. When we do + // that, doSyscall won't even see replayed syscalls from the worker. + + const { doSyscall, setVatSyscallHandler } = createSyscall( + transcriptManager, + ); + function handleSyscall(vatSyscallObject) { + const type = vatSyscallObject[0]; + if (type === 'callNow') { + throw Error(`nodeWorker cannot block, cannot use syscall.callNow`); + } + doSyscall(vatSyscallObject); + } + + // start the worker and establish a connection + const { fromChild, toChild, terminate, done } = startSubprocessWorker(); + + function sendToWorker(msg) { + assert(msg instanceof Array); + toChild.write(JSON.stringify(msg)); + } + + const { + promise: dispatchReadyP, + resolve: dispatchIsReady, + } = makePromiseKit(); + let waiting; + + function handleUpstream([type, ...args]) { + parentLog(`received`, type); + if (type === 'setUplinkAck') { + parentLog(`upload ready`); + } else if (type === 'gotBundle') { + parentLog(`bundle loaded`); + } else if (type === 'dispatchReady') { + parentLog(`dispatch() ready`); + // wait10ms().then(dispatchIsReady); // stall to let logs get printed + dispatchIsReady(); + } else if (type === 'syscall') { + parentLog(`syscall`, args); + const vatSyscallObject = args; + handleSyscall(vatSyscallObject); + } else if (type === 'deliverDone') { + parentLog(`deliverDone`); + if (waiting) { + const resolve = waiting; + waiting = null; + resolve(); + } + } else { + parentLog(`unrecognized uplink message ${type}`); + } + } + + fromChild.on('data', data => { + const msg = JSON.parse(data); + handleUpstream(msg); + }); + + parentLog(`instructing worker to load bundle..`); + sendToWorker(['setBundle', bundle, vatParameters]); + + function deliver(delivery) { + parentLog(`sending delivery`, delivery); + assert(!waiting, `already waiting for delivery`); + const pr = makePromiseKit(); + waiting = pr.resolve; + sendToWorker(['deliver', ...delivery]); + return pr.promise; + } + + function replayTranscript() { + throw Error(`replayTranscript not yet implemented`); + } + + function shutdown() { + terminate(); + return done; + } + + const manager = harden({ + replayTranscript, + setVatSyscallHandler, + deliver, + shutdown, + }); + + return dispatchReadyP.then(() => manager); + } + + return harden({ createFromBundle }); +} diff --git a/packages/SwingSet/src/spawnSubprocessWorker.js b/packages/SwingSet/src/spawnSubprocessWorker.js new file mode 100644 index 00000000000..057612825e8 --- /dev/null +++ b/packages/SwingSet/src/spawnSubprocessWorker.js @@ -0,0 +1,66 @@ +/* global harden */ +// this file is loaded by the controller, in the start compartment +import process from 'process'; +import { spawn } from 'child_process'; +import Netstring from 'netstring-stream'; + +import { makePromiseKit } from '@agoric/promise-kit'; + +// eslint-disable-next-line no-unused-vars +function parentLog(first, ...args) { + // console.error(`--parent: ${first}`, ...args); +} + +const supercode = require.resolve( + './kernel/vatManager/subprocessSupervisor.js', +); +// we send on fd3, and receive on fd4. We pass fd1/2 (stdout/err) through, so +// console log/err from the child shows up normally. We don't use Node's +// built-in serialization feature ('ipc') because the child process won't +// always be Node. +const stdio = harden(['inherit', 'inherit', 'inherit', 'pipe', 'pipe']); + +export function startSubprocessWorker() { + const proc = spawn(process.execPath, ['-r', 'esm', supercode], { stdio }); + + const toChild = Netstring.writeStream(); + toChild.pipe(proc.stdio[3]); + // proc.stdio[4].setEncoding('utf-8'); + const fromChild = proc.stdio[4].pipe(Netstring.readStream()); + fromChild.setEncoding('utf-8'); + + // fromChild.addListener('data', data => parentLog(`fd4 data`, data)); + // toChild.write('hello child'); + + const pk = makePromiseKit(); + + proc.once('exit', code => { + parentLog('child exit', code); + pk.resolve(code); + }); + proc.once('error', e => { + parentLog('child error', e); + pk.reject(e); + }); + parentLog(`waiting on child`); + + function terminate() { + proc.kill(); + } + + // the Netstring objects don't like being hardened, so we wrap the methods + // that get used + const wrappedFromChild = { + on: (evName, f) => fromChild.on(evName, f), + }; + const wrappedToChild = { + write: data => toChild.write(data), + }; + + return harden({ + fromChild: wrappedFromChild, + toChild: wrappedToChild, + terminate, + done: pk.promise, + }); +} diff --git a/packages/SwingSet/test/workers/test-worker.js b/packages/SwingSet/test/workers/test-worker.js index c109a294ced..316119b5d16 100644 --- a/packages/SwingSet/test/workers/test-worker.js +++ b/packages/SwingSet/test/workers/test-worker.js @@ -13,3 +13,15 @@ tap.test('nodeWorker vat manager', async t => { await c.shutdown(); t.end(); }); + +tap.test('node-subprocess vat manager', async t => { + const config = await loadBasedir(__dirname); + config.vats.target.creationOptions = { managerType: 'node-subprocess' }; + const c = await buildVatController(config, []); + + await c.run(); + t.equal(c.bootstrapResult.status(), 'fulfilled'); + + await c.shutdown(); + t.end(); +}); diff --git a/yarn.lock b/yarn.lock index f2cf0f98a28..b869a5e3a92 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10650,6 +10650,13 @@ nested-error-stacks@^2.0.0: resolved "https://registry.yarnpkg.com/nested-error-stacks/-/nested-error-stacks-2.1.0.tgz#0fbdcf3e13fe4994781280524f8b96b0cdff9c61" integrity sha512-AO81vsIO1k1sM4Zrd6Hu7regmJN1NSiAja10gc4bX3F0wd+9rQmcuHQaHVQCYIEC8iFXnE+mavh23GOt7wBgug== +netstring-stream@^1.0.1: + version "1.0.1" + resolved "https://registry.yarnpkg.com/netstring-stream/-/netstring-stream-1.0.1.tgz#d1babecbc4715428154d2956201bc8be3a526729" + integrity sha512-/lXoL4KEi8Cty/AsjPkDF7S/cPaHSDMU8PU4NbJNDbW7EhMIb8o6JJA9BD4LJPqPBchpEEoKAluI+BxuqJkR9g== + dependencies: + through2 "^2.0.3" + next-tick@~1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/next-tick/-/next-tick-1.0.0.tgz#ca86d1fe8828169b0120208e3dc8424b9db8342c"