Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(notifier): subscribeEach/subscribeLatest iterators retry when broken by vat upgrade #7401

Merged
merged 11 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { assert, Fail } from '@agoric/assert';
import { isNat } from '@endo/nat';
import { importBundle } from '@endo/import-bundle';
import { makeUpgradeDisconnection } from '@agoric/internal/src/upgrade-api.js';
import { assertKnownOptions } from '../lib/assertOptions.js';
import { foreverPolicy } from '../lib/runPolicies.js';
import { kser, kslot, makeError } from '../lib/kmarshal.js';
Expand Down Expand Up @@ -815,12 +816,11 @@ export default function buildKernel(
const { meterID } = vatInfo;
let computrons;
const vatKeeper = kernelKeeper.provideVatKeeper(vatID);
const disconnectObject = {
name: 'vatUpgraded',
const disconnectionObject = makeUpgradeDisconnection(
upgradeMessage,
incarnationNumber: vatKeeper.getIncarnationNumber(),
};
const disconnectionCapData = kser(disconnectObject);
vatKeeper.getIncarnationNumber(),
);
const disconnectionCapData = kser(disconnectionObject);

/**
* Terminate the vat and translate internal-delivery results into
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/src/types-external.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export {};
* @typedef { import('@agoric/swingset-liveslots').Message } Message
*
* @typedef { 'none' | 'ignore' | 'logAlways' | 'logFailure' | 'panic' } ResolutionPolicy
* @typedef {{ name: string, upgradeMessage: string, incarnationNumber: number }} DisconnectObject
* @typedef {import('@agoric/internal/src/upgrade-api.js').DisconnectionObject} DisconnectionObject
*
* @typedef { import('@agoric/swingset-liveslots').VatDeliveryObject } VatDeliveryObject
* @typedef { import('@agoric/swingset-liveslots').VatDeliveryResult } VatDeliveryResult
Expand Down
51 changes: 38 additions & 13 deletions packages/inter-protocol/test/stakeFactory/test-stakeFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -499,12 +499,35 @@ test('forged Attestation fails', async t => {
await t.throwsAsync(E(seat).getOfferResult());
});

const approxEqual = (t, actual, expected, epsilon) => {
/**
* @param {import('ava').ExecutionContext} t
* @param {Amount<'nat'>} actual
* @param {Amount<'nat'>} target
* @param {Amount<'nat'>} epsilon
* @param {bigint} [scale]
* @returns {void}
*/
const assertSimilarAmount = (t, actual, target, epsilon, scale) => {
const { min, max, subtract, isGTE } = AmountMath;
if (isGTE(epsilon, subtract(max(actual, expected), min(actual, expected)))) {
t.pass();
// kludgy but good-enough decimal division by `scale`
const unscale = amount => {
const { value } = amount;
if (!scale) {
return value;
}
const logScale = `${scale - 1n}`.length;
const digits = `${(BigInt(value) * 10n ** BigInt(logScale)) / scale}`;
const padded = digits.padStart(logScale + 1, '0');
const decimal = `${padded.slice(0, -logScale)}.${padded.slice(-logScale)}`;
return decimal.replace(/\.?0+$/, '');
};
// prettier-ignore
const description = `${unscale(actual)} must be in ${unscale(target)} ± ${unscale(epsilon)}`;

if (isGTE(epsilon, subtract(max(actual, target), min(actual, target)))) {
t.pass(description);
} else {
t.deepEqual(actual, expected);
t.deepEqual(actual, target, description);
}
};

Expand Down Expand Up @@ -560,7 +583,7 @@ const makeWorld = async t => {
const attPurse = E(attIssuer).makeEmptyPurse();
const runPurse = E(runIssuer).makeEmptyPurse();
const rewardPurse = E(runIssuer).makeEmptyPurse();
const epsilon = AmountMath.make(runBrand, micro.unit / 5n);
const epsilon = AmountMath.make(runBrand, micro.unit / 4n);

await E(rewardPurse).deposit(
await mintRunPayment(500n * micro.unit, {
Expand Down Expand Up @@ -727,23 +750,25 @@ const makeWorld = async t => {
await returnAttestation(attBack);
await E(runPurse).deposit(runPmt);
},
checkRUNBalance: async target => {
checkRUNBalance: async unscaledTargetValue => {
const actual = await E(runPurse).getCurrentAmount();
approxEqual(
assertSimilarAmount(
t,
actual,
AmountMath.make(runBrand, target * micro.unit),
AmountMath.make(runBrand, unscaledTargetValue * micro.unit),
epsilon,
micro.unit,
);
},
checkRUNDebt: async expected => {
checkRUNDebt: async unscaledTargetValue => {
const { vault } = offerResult;
const debt = await E(vault).getCurrentDebt();
approxEqual(
const actual = await E(vault).getCurrentDebt();
assertSimilarAmount(
t,
debt,
AmountMath.make(runBrand, expected * micro.unit),
actual,
AmountMath.make(runBrand, unscaledTargetValue * micro.unit),
epsilon,
micro.unit,
);
},
setMintingRatio: async newRunToBld => {
Expand Down
41 changes: 41 additions & 0 deletions packages/internal/src/upgrade-api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// @ts-check
// @jessie-check
import { isObject } from '@endo/marshal';

/**
* @typedef {{ name: string, upgradeMessage: string, incarnationNumber: number }} DisconnectionObject
*/

/**
* Makes an Error-like object for use as the rejection value of promises
* abandoned by upgrade.
*
* @param {string} upgradeMessage
* @param {number} toIncarnationNumber
* @returns {DisconnectionObject}
*/
export const makeUpgradeDisconnection = (upgradeMessage, toIncarnationNumber) =>
harden({
name: 'vatUpgraded',
upgradeMessage,
incarnationNumber: toIncarnationNumber,
});
harden(makeUpgradeDisconnection);

// TODO: Simplify once we have @endo/patterns (or just export the shape).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do have @endo/patterns now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My first attempt didn't quite work, so I'm deferring this until we've got a little more time to spend on it.

// const upgradeDisconnectionShape = harden({
// name: 'vatUpgraded',
// upgradeMessage: M.string(),
// incarnationNumber: M.number(),
// });
// const isUpgradeDisconnection = err => matches(err, upgradeDisconnectionShape);
/**
* @param {any} err
* @returns {err is DisconnectionObject}
*/
export const isUpgradeDisconnection = err =>
isObject(err) &&
err.name === 'vatUpgraded' &&
typeof err.upgradeMessage === 'string' &&
typeof err.incarnationNumber === 'number';
harden(isUpgradeDisconnection);
21 changes: 21 additions & 0 deletions packages/internal/test/test-upgrade-api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// @ts-check
import '@endo/init';
import test from 'ava';
import {
makeUpgradeDisconnection,
isUpgradeDisconnection,
} from '../src/upgrade-api.js';

test('isUpgradeDisconnection must recognize disconnection objects', t => {
const disconnection = makeUpgradeDisconnection('vat upgraded', 2);
t.true(isUpgradeDisconnection(disconnection));
});

test('isUpgradeDisconnection must recognize original-format disconnection objects', t => {
const disconnection = harden({
name: 'vatUpgraded',
upgradeMessage: 'vat upgraded',
incarnationNumber: 2,
});
t.true(isUpgradeDisconnection(disconnection));
});
121 changes: 102 additions & 19 deletions packages/notifier/src/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,64 @@
import { E, Far } from '@endo/far';
import { isObject } from '@endo/marshal';
import { isUpgradeDisconnection } from '@agoric/internal/src/upgrade-api.js';

import './types-ambient.js';

const { details: X, Fail } = assert;
const sink = () => {};

/**
* Check the promise returned by a function for rejection by vat upgrade,
* and refetch upon encountering that condition.
*
* @template T
* @param {() => ERef<T>} getter
* @param {ERef<T>[]} [seed]
* @returns {Promise<T>}
*/
const reconnectAsNeeded = async (getter, seed = []) => {
let disconnection;
let lastVersion = -Infinity;
// End synchronous prelude.
await null;
for (let i = 0; ; i += 1) {
try {
const resultP = i < seed.length ? seed[i] : getter();
// eslint-disable-next-line no-await-in-loop, @jessie.js/no-nested-await
const result = await resultP;
return result;
} catch (err) {
if (isUpgradeDisconnection(err)) {
if (!disconnection) {
disconnection = err;
}
const { incarnationNumber: version } = err;
if (version > lastVersion) {
// We don't expect another upgrade in between receiving
// a disconnection and re-requesting an update, but must
// nevertheless be prepared for that.
lastVersion = version;
continue;
}
}
// if `err` is an (Error) object, we can try to associate it with
// information about the disconnection that prompted the request
// for which it is a result.
if (isObject(err) && disconnection && disconnection !== err) {
try {
assert.note(
err,
X`Attempting to recover from disconnection: ${disconnection}`,
);
} catch (_err) {
// eslint-disable-next-line no-empty
}
}
throw err;
}
}
};

/**
* Create a near iterable that corresponds to a potentially far one.
*
Expand All @@ -26,25 +81,48 @@ export const subscribe = itP =>
* longer needed so they can be garbage collected.
*
* @template T
* @param {ERef<PublicationRecord<T>>} pubList
* @param {ERef<EachTopic<T>>} topic
* @param {ERef<PublicationRecord<T>>} nextCellP
* PublicationRecord corresponding with the first iteration result
* @returns {ForkableAsyncIterator<T, T>}
*/
const makeEachIterator = pubList => {
const makeEachIterator = (topic, nextCellP) => {
// To understand the implementation, start with
// https://web.archive.org/web/20160404122250/http://wiki.ecmascript.org/doku.php?id=strawman:concurrency#infinite_queue
return Far('EachIterator', {
next: () => {
const resultP = E.get(pubList).head;
pubList = E.get(pubList).tail;
// We expect the tail to be the "cannot read past end" error at the end
// of the happy path.
// Since we are wrapping that error with eventual send, we sink the
// rejection here too so it doesn't become an invalid unhandled rejection
// later.
void E.when(pubList, sink, sink);
const {
head: resultP,
publishCount: publishCountP,
tail: tailP,
} = E.get(nextCellP);

// If tailP is broken by upgrade, we will need to re-request it
// directly from `topic`.
const getSuccessor = async () => {
const publishCount = await publishCountP;
assert.typeof(publishCount, 'bigint');
const successor = await E(topic).subscribeAfter(publishCount);
const newPublishCount = successor.publishCount;
if (newPublishCount !== publishCount + 1n) {
Fail`eachIterator broken by gap from publishCount ${publishCount} to ${newPublishCount}`;
}
return successor;
};

// Replace nextCellP on every call to next() so things work even
// with an eager consumer that doesn't wait for results to settle.
nextCellP = reconnectAsNeeded(getSuccessor, [tailP]);

// Avoid unhandled rejection warnings here if the previous cell was rejected or
// there is no further request of this iterator.
// `tailP` is handled inside `reconnectAsNeeded` and `resultP` is the caller's
// concern, leaving only `publishCountP` and the new `nextCellP`.
void E.when(publishCountP, sink, sink);
void E.when(nextCellP, sink, sink);
return resultP;
},
fork: () => makeEachIterator(pubList),
fork: () => makeEachIterator(topic, nextCellP),
});
};

Expand All @@ -53,7 +131,10 @@ const makeEachIterator = pubList => {
* provides "prefix lossy" iterations of the underlying PublicationList.
* By "prefix lossy", we mean that you may miss everything published before
* you ask the returned iterable for an iterator. But the returned iterator
* will enumerate each thing published from that iterator's starting point.
* will enumerate each thing published from that iterator's starting point
* up to a disconnection result indicating upgrade of the producer
* (which breaks the gap-free guarantee and therefore terminates any active
* iterator while still supporting creation of new iterators).
*
* If the underlying PublicationList is terminated, that terminal value will be
* reported losslessly.
Expand All @@ -64,8 +145,8 @@ const makeEachIterator = pubList => {
export const subscribeEach = topic => {
const iterable = Far('EachIterable', {
[Symbol.asyncIterator]: () => {
const pubList = E(topic).subscribeAfter();
return makeEachIterator(pubList);
const firstCellP = reconnectAsNeeded(() => E(topic).subscribeAfter());
return makeEachIterator(topic, firstCellP);
Comment on lines +148 to +149
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really clear!

},
});
return iterable;
Expand Down Expand Up @@ -95,9 +176,10 @@ const cloneLatestIterator = (topic, localUpdateCount, terminalResult) => {
return terminalResult;
}

// Send the next request now, skipping past intermediate updates.
const { value, updateCount } = await E(topic).getUpdateSince(
localUpdateCount,
// Send the next request now, skipping past intermediate updates
// and upgrade disconnections.
const { value, updateCount } = await reconnectAsNeeded(() =>
E(topic).getUpdateSince(localUpdateCount),
);
// Make sure the next request is for a fresher value.
localUpdateCount = updateCount;
Expand Down Expand Up @@ -161,8 +243,9 @@ const makeLatestIterator = topic => cloneLatestIterator(topic);
* By "lossy", we mean that you may miss any published state if a more
* recent published state can be reported instead.
*
* If the underlying PublicationList is terminated, that terminal value will be
* reported losslessly.
* If the underlying PublicationList is terminated by upgrade of the producer,
* it will be re-requested. All other terminal values will be losslessly
* propagated.
*
* @template T
* @param {ERef<LatestTopic<T>>} topic
Expand Down
12 changes: 7 additions & 5 deletions packages/notifier/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ export {};
* outside the current package should consider it opaque, not depending on its
* internal structure.
* @property {IteratorResult<T>} head
* @property {bigint} publishCount
* @property {bigint} publishCount starts at 1 for the first result
* and advances by 1 for each subsequent result
* @property {Promise<PublicationRecord<T>>} tail
*/

Expand Down Expand Up @@ -105,13 +106,14 @@ export {};
/**
* @template T
* @typedef {object} Publisher
* A valid sequence of calls to the methods of an `IterationObserver`
* A valid sequence of calls to the methods of a Publisher
* represents an iteration. A valid sequence consists of any number of calls
* to `publish` with the successive non-final values, followed by a
* to `publish` with the successive non-final values, optionally followed by a
* final call to either `finish` with a successful `completion` value
* or `fail` with the alleged `reason` for failure. After at most one
* terminating calls, no further calls to these methods are valid and must be
* rejected.
* terminating call, further calls to any of these methods are invalid and
* must be rejected.
*
* @property {(nonFinalValue: T) => void} publish
* @property {(completion: T) => void} finish
* @property {(reason: any) => void} fail
Expand Down
Loading