-
Notifications
You must be signed in to change notification settings - Fork 212
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(swingset): rewrite comms, probably add third-party forwarding
This overhauls the comms vat to use a new reference-tracking model. It should fix a number of gaps where the previous code would throw an error: * pass an already-resolved Promise through any message, this would make the comms vat re-use a retired VPID, which is a vat-fatal error * resolve a remotely-sourced promise to a local object (rather than a remote one), then pipeline a message to it (so the message should be reflected back into the kernel) * resolve a local promise to a remote object, then the remote pipelines a message to it (so the message should be reflected back out to the remote) * passing one remote's references to a different remote The last case is the "three-party handoff". The full solution (with shortening, grant-matching, and automatic connection establishment) is quite a ways off, but this change ought to implement "three-party forwarding". In this form, when A sends B a reference to C, what B gets is effectively an object on A that automatically forwards all messages off to C. This "object" is hidden inside the comms vat and is not reified as a real object. Three-party forwarding is not tested yet. refs #1535 refs #1404
- Loading branch information
Showing
18 changed files
with
1,549 additions
and
670 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* global harden */ | ||
|
||
// In our system (at least for now), we make a distinction between callable | ||
// objects and non-callable data. Attempting to send a message to data will | ||
// throw an error. This function provides a consistent definition of this | ||
// error message, for the use of the kernel and the comms vat. The capdata it | ||
// returns can be used in a `notifyReject` on the result promise. | ||
|
||
// Within a vat, the native JavaScript behavior (e.g. `const obj = {}; | ||
// obj.foo()`) would be TypeError with a message like "obj.foo is not a | ||
// function", which gleans "obj" from the name of the variable that held the | ||
// target. We have no idea what the caller used to name their target, and the | ||
// "data is not callable" error is kind of unique to the way swingset handles | ||
// references, so we create a distinct error message. | ||
|
||
const QCLASS = '@'; | ||
|
||
export function makeUndeliverableError(method) { | ||
const s = { | ||
[QCLASS]: 'error', | ||
name: 'TypeError', | ||
message: `data is not callable, has no method ${method}`, | ||
}; | ||
return harden({ body: JSON.stringify(s), slots: [] }); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/* global harden */ | ||
import { assert } from '@agoric/assert'; | ||
import { makeVatSlot } from '../../parseVatSlots'; | ||
import { | ||
flipRemoteSlot, | ||
insistRemoteType, | ||
parseRemoteSlot, | ||
} from './parseRemoteSlot'; | ||
import { getRemote } from './remote'; | ||
|
||
function rname(remote) { | ||
return `${remote.remoteID} (${remote.name})`; | ||
} | ||
|
||
export function makeInbound(state, stateKit) { | ||
const { | ||
allocateUnresolvedPromise, | ||
insistPromiseIsUnresolved, | ||
subscribeRemoteToPromise, | ||
changeDeciderToRemote, | ||
changeDeciderFromRemoteToComms, | ||
} = stateKit; | ||
|
||
// get-*: the entry must be present | ||
// add-*: the entry must not be present. add one. | ||
// provide-*: return an entry, adding one if necessary | ||
|
||
// *-LocalForRemote: receiving an object/promise from a remote machine | ||
|
||
function getLocalForRemote(remoteID, rref) { | ||
const remote = getRemote(state, remoteID); | ||
const lref = remote.fromRemote.get(rref); | ||
assert(lref, `${rref} must already be in ${rname(remote)}`); | ||
return lref; | ||
} | ||
|
||
function addLocalObjectForRemote(remote, roid) { | ||
// The index must be allocated by them. If we allocated it, it should | ||
// have been in our table already, and the fact that it isn't means | ||
// they're reaching for something we haven't given them. | ||
assert( | ||
!parseRemoteSlot(roid).allocatedByRecipient, | ||
`I don't remember giving ${roid} to ${rname(remote)}`, | ||
); | ||
|
||
// So this must be a new import. Allocate a new vat object for it, which | ||
// will be the local machine's proxy for use by all other local vats, as | ||
// well as third party machines. | ||
const vatoid = makeVatSlot('object', true, state.nextObjectIndex); | ||
state.nextObjectIndex += 1; | ||
|
||
// remember who owns this object, to route messages later | ||
state.objectTable.set(vatoid, remote.remoteID); | ||
|
||
// they sent us ro-NN | ||
remote.fromRemote.set(roid, vatoid); | ||
// when we send it back, we'll send ro+NN | ||
remote.toRemote.set(vatoid, flipRemoteSlot(roid)); | ||
} | ||
|
||
function addLocalPromiseForRemote(remote, rpid) { | ||
assert( | ||
!parseRemoteSlot(rpid).allocatedByRecipient, | ||
`I don't remember giving ${rpid} to ${rname(remote)}`, | ||
); | ||
// allocate a new p+NN, remember them as the decider, add to clist | ||
const vpid = allocateUnresolvedPromise(); | ||
changeDeciderToRemote(vpid, remote.remoteID); | ||
remote.fromRemote.set(rpid, vpid); | ||
remote.toRemote.set(vpid, flipRemoteSlot(rpid)); | ||
} | ||
|
||
function provideLocalForRemote(remoteID, rref) { | ||
// We're receiving a slot from a remote system. If they've sent it to us | ||
// previously, or if we're the ones who sent it to them earlier, it will be | ||
// in the inbound table already. | ||
const remote = getRemote(state, remoteID); | ||
if (!remote.fromRemote.has(rref)) { | ||
const { type } = parseRemoteSlot(rref); | ||
if (type === 'object') { | ||
addLocalObjectForRemote(remote, rref); | ||
} else if (type === 'promise') { | ||
addLocalPromiseForRemote(remote, rref); | ||
} else { | ||
throw Error(`cannot accept type ${type} from remote`); | ||
} | ||
} | ||
return remote.fromRemote.get(rref); | ||
} | ||
|
||
function provideLocalForRemoteResult(remoteID, result) { | ||
insistRemoteType('promise', result); | ||
const vpid = provideLocalForRemote(remoteID, result); | ||
// this asserts they had control over vpid, and that it wasn't already | ||
// resolved. TODO: reject somehow rather than crash weirdly, we can't | ||
// keep them from trying either | ||
insistPromiseIsUnresolved(vpid); | ||
changeDeciderFromRemoteToComms(vpid, remoteID); | ||
subscribeRemoteToPromise(vpid, remoteID); // auto-subscribe sender | ||
return vpid; | ||
} | ||
|
||
return harden({ | ||
provideLocalForRemote, | ||
getLocalForRemote, | ||
provideLocalForRemoteResult, | ||
}); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
/* global harden */ | ||
import { assert } from '@agoric/assert'; | ||
import { parseVatSlot } from '../../parseVatSlots'; | ||
|
||
export function makeKernel(state, syscall, stateKit) { | ||
const { | ||
trackUnresolvedPromise, | ||
allocateResolvedPromiseID, | ||
subscribeKernelToPromise, | ||
unsubscribeKernelFromPromise, | ||
deciderIsKernel, | ||
changeDeciderToKernel, | ||
changeDeciderFromKernelToComms, | ||
} = stateKit; | ||
|
||
let resolveToKernel; // cyclic, set later | ||
|
||
function setDeliveryKit(deliveryKit) { | ||
resolveToKernel = deliveryKit.resolveToKernel; | ||
} | ||
|
||
// *-KernelForLocal: comms vat sending out to kernel | ||
// | ||
// Our local identifiers (vpid/void) are the same as the ones we use when | ||
// talking to the kernel, so this doesn't require any translation, per se. | ||
// However when we send new promises into the kernel, we need to remember | ||
// to notify the kernel about their eventual (or existing) resolution. And | ||
// we need to keep track of the "decider" of each Promise, which changes | ||
// when used as a message result. Using translation-shaped functions like | ||
// these allow delivery.js to be more uniform, and gives us a place to | ||
// perform this subscription and tracking. We must also keep track of | ||
// whether this vpid has been retired or not, and create a new | ||
// (short-lived) identifier to reference resolved promises if necessary. | ||
|
||
function provideKernelForLocal(lref) { | ||
const { type } = parseVatSlot(lref); | ||
|
||
if (type === 'object') { | ||
return lref; | ||
} | ||
|
||
if (type === 'promise') { | ||
const vpid = lref; | ||
const p = state.promiseTable.get(vpid); | ||
// We should always know about this vpid. p+NN is allocated upon | ||
// receipt of a remote promise, and p-NN is added upon receipt of a | ||
// kernel promise. We retain the promiseTable entry even after the | ||
// promise is retired (to remember the resolution). | ||
assert(p, `how did I forget about ${vpid}`); | ||
|
||
if (p.resolved) { | ||
// The vpid might have been retired, in which case we must not use it | ||
// when speaking to the kernel. It will only be retired if 1: it | ||
// crossed the commsvat-kernel boundary already, and 2: the decider | ||
// (either commsvat or kernel) resolved it. If we allocated the vpid | ||
// for a message which arrived from one remote and went straight out | ||
// to another, we won't have mentioned it to the kernel yet. Rather | ||
// than keep track of this, we just use a fresh vpid each time we | ||
// need to talk about a resolved promise to the kernel. We must send | ||
// the resolution and then immediately retire the vpid again. | ||
|
||
const fresh = allocateResolvedPromiseID(); | ||
// console.log(`fresh: ${fresh} for ${vpid}`, p.resolution); | ||
// we must tell the kernel about the resolution *after* the message | ||
// which introduces it | ||
Promise.resolve().then(() => resolveToKernel(fresh, p.resolution)); | ||
return fresh; | ||
} | ||
|
||
// Unresolved promises can use the same VPID until it is retired. Since | ||
// we're telling the kernel about this VPID, we must arrange to notify | ||
// the kernel when it resolves, unless the kernel itself is in control. | ||
if (!deciderIsKernel(vpid)) { | ||
subscribeKernelToPromise(vpid); | ||
} | ||
return vpid; | ||
} | ||
throw Error(`cannot give type ${type} to kernel`); | ||
} | ||
|
||
function provideKernelForLocalResult(vpid) { | ||
if (!vpid) { | ||
return null; | ||
} | ||
const p = state.promiseTable.get(vpid); | ||
assert(!p.resolved, `result ${vpid} is already resolved`); | ||
// TODO: reject somehow rather than crashing weirdly if we are not | ||
// already the decider, but I'm not sure how we could hit that case. | ||
changeDeciderToKernel(vpid); | ||
// if we knew about this promise already, and thought the kernel | ||
// cared.. well, it doesn't now | ||
unsubscribeKernelFromPromise(vpid); | ||
return vpid; | ||
} | ||
|
||
// *-LocalForKernel: kernel sending in to comms vat | ||
|
||
function provideLocalForKernel(kref) { | ||
const { type, allocatedByVat } = parseVatSlot(kref); | ||
if (type !== 'object' && type !== 'promise') { | ||
// TODO: reject the message rather than crashing weirdly, we | ||
// can't prevent vats from attempting this | ||
throw Error(`cannot accept type ${type} from kernel`); | ||
} | ||
if (type === 'object') { | ||
if (allocatedByVat) { | ||
assert( | ||
state.objectTable.has(kref), | ||
`I don't remember giving ${kref} to the kernel`, | ||
); | ||
} | ||
} else if (type === 'promise') { | ||
const vpid = kref; | ||
if (allocatedByVat) { | ||
assert( | ||
state.promiseTable.has(vpid), | ||
`I don't remember giving ${vpid} to the kernel`, | ||
); | ||
} else { | ||
const p = state.promiseTable.get(vpid); | ||
if (p) { | ||
// hey, we agreed to never speak of the resolved VPID again. maybe | ||
// the kernel is recycling p-NN VPIDs, or is just confused | ||
assert(!p.resolved, `kernel sent retired ${vpid}`); | ||
} else { | ||
// the kernel is telling us about a new promise, so it's the decider | ||
trackUnresolvedPromise(vpid); | ||
changeDeciderToKernel(vpid); | ||
syscall.subscribe(vpid); | ||
} | ||
} | ||
} | ||
return kref; | ||
} | ||
|
||
function provideLocalForKernelResult(vpid) { | ||
if (!vpid) { | ||
return null; | ||
} | ||
const { type, allocatedByVat } = parseVatSlot(vpid); | ||
assert.equal(type, 'promise'); | ||
const p = state.promiseTable.get(vpid); | ||
// regardless of who allocated it, we should not get told about a promise | ||
// we know to be resolved. We agreed to never speak of the resolved VPID | ||
// again. maybe the kernel is recycling p-NN VPIDs, or is just confused | ||
if (p) { | ||
assert(!p.resolved, `kernel sent retired ${vpid}`); | ||
} | ||
|
||
// if we're supposed to have allocated the number, we better recognize it | ||
if (allocatedByVat) { | ||
assert(p, `I don't remember giving ${vpid} to the kernel`); | ||
} | ||
|
||
if (p) { | ||
// The kernel is using a pre-existing promise as the result. It had | ||
// better already have control. TODO: if the decider was not already | ||
// the kernel, somehow reject the message, rather than crashing | ||
// weirdly, since we can't prevent low-level vats from using a 'result' | ||
// promise that they don't actually control | ||
changeDeciderFromKernelToComms(vpid); | ||
// TODO: ideally we'd syscall.unsubscribe here, but that doesn't exist | ||
} else { | ||
// the kernel is telling us about a new promise, and new unresolved | ||
// promises are born with us being the decider | ||
trackUnresolvedPromise(vpid); | ||
} | ||
// either way, the kernel is going to want to know about the resolution | ||
subscribeKernelToPromise(vpid); | ||
return vpid; | ||
} | ||
|
||
return harden({ | ||
setDeliveryKit, | ||
|
||
provideKernelForLocal, | ||
provideKernelForLocalResult, | ||
provideLocalForKernel, | ||
provideLocalForKernelResult, | ||
}); | ||
} |
Oops, something went wrong.