Skip to content

Commit

Permalink
feat(notifier): subscribeLatest iterators retry when broken by vat up…
Browse files Browse the repository at this point in the history
…grade

subscribeEach iterators continue to fail in that scenario,
because they cannot guarantee absence of gaps.

Fixes #5185
  • Loading branch information
gibson042 committed Apr 12, 2023
1 parent 6861f5e commit aa383c5
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 42 deletions.
1 change: 1 addition & 0 deletions packages/notifier/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"@agoric/vat-data": "^0.4.3",
"@endo/far": "^0.2.14",
"@endo/marshal": "^0.8.1",
"@endo/nat": "^4.1.0",
"@endo/promise-kit": "^0.2.52"
},
"devDependencies": {
Expand Down
58 changes: 51 additions & 7 deletions packages/notifier/src/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,48 @@
import { E, Far } from '@endo/far';
import { isObject } from '@endo/marshal';
import { isNat } from '@endo/nat';

import './types-ambient.js';

const sink = () => {};

/**
* Check the promise returned by a function for rejection by vat upgrade,
* and refetch upon encountering that condition.
*
* @template T
* @param {() => ERef<T>} thunk
* @returns {Promise<T>}
*/
const reconnectAsNeeded = async thunk => {
let lastVersion;
// End synchronous prelude.
await null;
for (;;) {
try {
// eslint-disable-next-line no-await-in-loop, @jessie.js/no-nested-await
const result = await thunk();
return result;
} catch (err) {
/** @see processUpgrade in {@link ../../SwingSet/src/kernel/kernel.js} */
if (isObject(err) && err.name === 'vatUpgraded') {
const { incarnationNumber: version } = err;
if (
isNat(version) &&
(lastVersion === undefined || version > lastVersion)
) {
// We don't expect another upgrade in between receiving
// a disconnection and re-requesting an update, but must
// nevertheless be prepared for that.
lastVersion = version;
continue;
}
}
throw err;
}
}
};

/**
* Create a near iterable that corresponds to a potentially far one.
*
Expand Down Expand Up @@ -53,7 +92,10 @@ const makeEachIterator = pubList => {
* provides "prefix lossy" iterations of the underlying PublicationList.
* By "prefix lossy", we mean that you may miss everything published before
* you ask the returned iterable for an iterator. But the returned iterator
* will enumerate each thing published from that iterator's starting point.
* will enumerate each thing published from that iterator's starting point
* up to a disconnection result indicating upgrade of the producer
* (which breaks the gap-free guarantee and therefore terminates any active
* iterator while still supporting creation of new iterators).
*
* If the underlying PublicationList is terminated, that terminal value will be
* reported losslessly.
Expand All @@ -64,7 +106,7 @@ const makeEachIterator = pubList => {
export const subscribeEach = topic => {
const iterable = Far('EachIterable', {
[Symbol.asyncIterator]: () => {
const pubList = E(topic).subscribeAfter();
const pubList = reconnectAsNeeded(() => E(topic).subscribeAfter());
return makeEachIterator(pubList);
},
});
Expand Down Expand Up @@ -95,9 +137,10 @@ const cloneLatestIterator = (topic, localUpdateCount, terminalResult) => {
return terminalResult;
}

// Send the next request now, skipping past intermediate updates.
const { value, updateCount } = await E(topic).getUpdateSince(
localUpdateCount,
// Send the next request now, skipping past intermediate updates
// and upgrade disconnections.
const { value, updateCount } = await reconnectAsNeeded(() =>
E(topic).getUpdateSince(localUpdateCount),
);
// Make sure the next request is for a fresher value.
localUpdateCount = updateCount;
Expand Down Expand Up @@ -161,8 +204,9 @@ const makeLatestIterator = topic => cloneLatestIterator(topic);
* By "lossy", we mean that you may miss any published state if a more
* recent published state can be reported instead.
*
* If the underlying PublicationList is terminated, that terminal value will be
* reported losslessly.
* If the underlying PublicationList is terminated by upgrade of the producer,
* it will be re-requested. All other terminal values will be losslessly
* propagated.
*
* @template T
* @param {ERef<LatestTopic<T>>} topic
Expand Down
141 changes: 107 additions & 34 deletions packages/notifier/test/test-publish-kit.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ const assertTransmission = async (t, publishKit, value, method = 'publish') => {
}
};

const assertCells = (t, label, cells, publishCount, result, options = {}) => {
const { strict = true } = options;
const assertCells = (t, label, cells, publishCount, expected, options = {}) => {
const { strict = true, iterationResults = {} } = options;
const firstCell = cells[0];
t.deepEqual(
Reflect.ownKeys(firstCell).sort(),
['head', 'publishCount', 'tail'],
`${label} cell property keys`,
);
t.deepEqual(firstCell.head, result, `${label} cell result`);
t.is(firstCell.head.value, result.value, `${label} cell value`);
t.deepEqual(firstCell.head, expected, `${label} cell result`);
t.is(firstCell.head.value, expected.value, `${label} cell value`);
// `publishCount` values *should* be considered opaque,
// but de facto they are a gap-free sequence of bigints
// that starts at 1.
Expand All @@ -78,6 +78,10 @@ const assertCells = (t, label, cells, publishCount, result, options = {}) => {
t.like(cell, props, `${label} cell ${i + 1} must match cell 0`);
});
}

for (const [resultLabel, result] of Object.entries(iterationResults)) {
t.deepEqual(result, expected, `${label} ${resultLabel} result`);
}
};

// eslint-disable-next-line no-shadow
Expand Down Expand Up @@ -270,13 +274,31 @@ test('durable publish kit upgrade trauma (full-vat integration)', async t => {
vatParameters: { version: 'v1' },
},
]);
await run('createVat', [
{
name: 'pubsub2',
bundleCapName: 'pubsub',
},
]);
t.is(
await run('messageVat', [{ name: 'pubsub', methodName: 'getVersion' }]),
'v1',
);
const sub1 = await run('messageVat', [
{ name: 'pubsub', methodName: 'getSubscriber' },
]);
const eachIterable = await run('messageVat', [
{ name: 'pubsub2', methodName: 'subscribeEach', args: [sub1] },
]);
const eachIterator1 = await run('messageVatObject', [
{ presence: eachIterable, methodName: Symbol.asyncIterator },
]);
const latestIterable = await run('messageVat', [
{ name: 'pubsub2', methodName: 'subscribeLatest', args: [sub1] },
]);
const latestIterator1 = await run('messageVatObject', [
{ presence: latestIterable, methodName: Symbol.asyncIterator },
]);

/**
* Advances the publisher.
Expand All @@ -290,17 +312,39 @@ test('durable publish kit upgrade trauma (full-vat integration)', async t => {
]);
};

// Verify receipt of a published value.
// Verify receipt of a published value via subscribeAfter
// and async iterators.
const value1 = Symbol.for('value1');
await publish(value1);
const expectedV1FirstResult = { value: value1, done: false };
const v1FirstCell = await run('messageVatObject', [
{ presence: sub1, methodName: 'subscribeAfter' },
]);
assertCells(t, 'v1 first', [v1FirstCell], 1n, { value: value1, done: false });

// Verify receipt of a second published value via tail and subscribeAfter.
assertCells(t, 'v1 first', [v1FirstCell], 1n, expectedV1FirstResult);
const eachIteratorFirstResult = await run('messageVatObject', [
{ presence: eachIterator1, methodName: 'next' },
]);
t.deepEqual(
eachIteratorFirstResult,
expectedV1FirstResult,
'v1 eachIterator first result',
);
// Don't ask the latest iterator for its first result so we can observe
// that it skips intermediate results.
// const latestIteratorFirstResult = await run('messageVatObject', [
// { presence: latestIterator1, methodName: 'next' },
// ]);
// t.deepEqual(
// latestIteratorFirstResult,
// expectedV1FirstResult,
// 'v1 latestIterator first result',
// );

// Verify receipt of a second published value via tail and subscribeAfter
// and async iterators.
const value2 = Symbol.for('value2');
await publish(value2);
const expectedV1SecondResult = { value: value2, done: false };
await run('messageVatObject', [
{ presence: sub1, methodName: 'subscribeAfter' },
]);
Expand All @@ -313,14 +357,18 @@ test('durable publish kit upgrade trauma (full-vat integration)', async t => {
{ presence: sub1, methodName: 'subscribeAfter' },
]),
];
assertCells(
t,
'v1 second',
v1SecondCells,
2n,
{ value: value2, done: false },
{ strict: false },
);
const v1SecondIterationResults = {
eachIterator: await run('messageVatObject', [
{ presence: eachIterator1, methodName: 'next' },
]),
latestIterator: await run('messageVatObject', [
{ presence: latestIterator1, methodName: 'next' },
]),
};
assertCells(t, 'v1 second', v1SecondCells, 2n, expectedV1SecondResult, {
strict: false,
iterationResults: v1SecondIterationResults,
});

// Upgrade the vat, breaking promises from v1.
await run('upgradeVat', [
Expand All @@ -337,26 +385,47 @@ test('durable publish kit upgrade trauma (full-vat integration)', async t => {
const sub2 = await run('messageVat', [
{ name: 'pubsub', methodName: 'getSubscriber' },
]);
await run('awaitVatObject', [{ presence: v1SecondCells[0].tail }]).then(
(...args) =>
t.deepEqual(args, undefined, 'tail promise of old vat must be rejected'),
failure =>
t.deepEqual(failure, {
incarnationNumber: 1,
name: 'vatUpgraded',
upgradeMessage: 'vat upgraded',
}),
const eachIterator2 = await run('messageVatObject', [
{ presence: eachIterable, methodName: Symbol.asyncIterator },
]);
const assertDisconnection = (p, label) => {
const expected = {
incarnationNumber: 1,
name: 'vatUpgraded',
upgradeMessage: 'vat upgraded',
};
return p.then(
(...args) => t.deepEqual(args, undefined, `${label} must be rejected`),
failure =>
t.deepEqual(failure, expected, `${label} must indicate disconnection`),
);
};
await assertDisconnection(
run('awaitVatObject', [{ presence: v1SecondCells[0].tail }]),
'tail promise of old vat',
);
await assertDisconnection(
run('messageVatObject', [{ presence: eachIterator1, methodName: 'next' }]),
'eachIterator following old vat subscriber',
);

// Verify receipt of the last published value from v1.
const v2FirstCell = await run('messageVatObject', [
{ presence: sub2, methodName: 'subscribeAfter' },
]);
assertCells(t, 'v2 first', [v2FirstCell], 2n, { value: value2, done: false });
const v2FirstIterationResults = {
eachIterator: await run('messageVatObject', [
{ presence: eachIterator2, methodName: 'next' },
]),
};
assertCells(t, 'v2 first', [v2FirstCell], 2n, expectedV1SecondResult, {
iterationResults: v2FirstIterationResults,
});

// Verify receipt of a published value from v2.
const value3 = Symbol.for('value3');
await publish(value3);
const expectedV2SecondResult = { value: value3, done: false };
const v2SecondCells = [
await run('awaitVatObject', [{ presence: v2FirstCell.tail }]),
await run('messageVatObject', [
Expand All @@ -366,14 +435,18 @@ test('durable publish kit upgrade trauma (full-vat integration)', async t => {
{ presence: sub2, methodName: 'subscribeAfter' },
]),
];
assertCells(
t,
'v2 second',
v2SecondCells,
3n,
{ value: value3, done: false },
{ strict: false },
);
const v2SecondIterationResults = {
eachIterator: await run('messageVatObject', [
{ presence: eachIterator2, methodName: 'next' },
]),
latestIterator: await run('messageVatObject', [
{ presence: latestIterator1, methodName: 'next' },
]),
};
assertCells(t, 'v2 second', v2SecondCells, 3n, expectedV2SecondResult, {
strict: false,
iterationResults: v2SecondIterationResults,
});
});

// TODO: Find a way to test virtual object rehydration
Expand Down
8 changes: 7 additions & 1 deletion packages/notifier/test/vat-integration/vat-pubsub.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { Far } from '@endo/marshal';
import { provide } from '@agoric/vat-data';
import { prepareDurablePublishKit } from '../../src/index.js';
import {
prepareDurablePublishKit,
subscribeEach,
subscribeLatest,
} from '../../src/index.js';

export const buildRootObject = (_vatPowers, vatParameters, baggage) => {
const makeDurablePublishKit = prepareDurablePublishKit(
Expand All @@ -19,6 +23,8 @@ export const buildRootObject = (_vatPowers, vatParameters, baggage) => {
getVersion: () => version,
getParameters: () => vatParameters,
getSubscriber: () => subscriber,
subscribeEach: topic => subscribeEach(topic),
subscribeLatest: topic => subscribeLatest(topic),
makeDurablePublishKit: (...args) => makeDurablePublishKit(...args),
publish: value => publisher.publish(value),
finish: finalValue => publisher.finish(finalValue),
Expand Down

0 comments on commit aa383c5

Please sign in to comment.