Skip to content

Commit

Permalink
feat: vowTools.allVowsSettled
Browse files Browse the repository at this point in the history
- adds vowTools helper that mimics the behavior of Promise.allSettled
  • Loading branch information
0xpatrickdev committed Sep 16, 2024
1 parent efaeb1c commit baedae2
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 39 deletions.
10 changes: 9 additions & 1 deletion packages/vow/src/tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,17 @@ export const prepareBasicVowTools = (zone, powers = {}) => {
/**
* Vow-tolerant implementation of Promise.all.
*
* @param {EVow<unknown>[]} maybeVows
* @param {unknown[]} maybeVows
*/
const allVows = maybeVows => watchUtils.all(maybeVows);

/**
* Vow-tolerant implementation of Promise.allSettled.
*
* @param {unknown[]} maybeVows
*/
const allVowsSettled = maybeVows => watchUtils.allSettled(maybeVows);

/** @type {AsPromiseFunction} */
const asPromise = (specimenP, ...watcherArgs) =>
watchUtils.asPromise(specimenP, ...watcherArgs);
Expand All @@ -67,6 +74,7 @@ export const prepareBasicVowTools = (zone, powers = {}) => {
watch,
makeVowKit,
allVows,
allVowsSettled,
asVow,
asPromise,
retriable,
Expand Down
136 changes: 98 additions & 38 deletions packages/vow/src/watch-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const { Fail, bare, details: X } = assert;
* @import {Zone} from '@agoric/base-zone';
* @import {Watch} from './watch.js';
* @import {When} from './when.js';
* @import {VowKit, AsPromiseFunction, IsRetryableReason, EVow} from './types.js';
* @import {VowKit, AsPromiseFunction, IsRetryableReason, Vow} from './types.js';
*/

const VowShape = M.tagged(
Expand Down Expand Up @@ -54,12 +54,17 @@ export const prepareWatchUtils = (
{
utils: M.interface('Utils', {
all: M.call(M.arrayOf(M.any())).returns(VowShape),
allSettled: M.call(M.arrayOf(M.any())).returns(VowShape),
asPromise: M.call(M.raw()).rest(M.raw()).returns(M.promise()),
}),
watcher: M.interface('Watcher', {
onFulfilled: M.call(M.raw()).rest(M.raw()).returns(M.raw()),
onRejected: M.call(M.raw()).rest(M.raw()).returns(M.raw()),
}),
helper: M.interface('Helper', {
createVow: M.call(M.arrayOf(M.any()), M.boolean()).returns(VowShape),
processResult: M.call(M.raw()).rest(M.raw()).returns(M.undefined()),
}),
retryRejectionPromiseWatcher: PromiseWatcherI,
},
() => {
Expand All @@ -68,6 +73,7 @@ export const prepareWatchUtils = (
* @property {number} remaining
* @property {MapStore<number, any>} resultsMap
* @property {VowKit['resolver']} resolver
* @property {boolean} [isAllSettled]
*/
/** @type {MapStore<bigint, VowState>} */
const idToVowState = detached.mapStore('idToVowState');
Expand All @@ -79,32 +85,83 @@ export const prepareWatchUtils = (
},
{
utils: {
/** @param {unknown[]} specimens */
all(specimens) {
return this.facets.helper.createVow(specimens, false);
},
/** @param {unknown[]} specimens */
allSettled(specimens) {
return /** @type {Vow<({status: 'fulfilled', value: any} | {status: 'rejected', reason: any})[]>} */ (
this.facets.helper.createVow(specimens, true)
);
},
/** @type {AsPromiseFunction} */
asPromise(specimenP, ...watcherArgs) {
// Watch the specimen in case it is an ephemeral promise.
const vow = watch(specimenP, ...watcherArgs);
const promise = when(vow);
// Watch the ephemeral result promise to ensure that if its settlement is
// lost due to upgrade of this incarnation, we will at least cause an
// unhandled rejection in the new incarnation.
zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher);

return promise;
},
},
watcher: {
/**
* @param {EVow<unknown>[]} vows
* @param {unknown} value
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
*/
all(vows) {
onFulfilled(value, ctx) {
this.facets.helper.processResult(value, ctx, 'fulfilled');
},
/**
* @param {unknown} reason
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
*/
onRejected(reason, ctx) {
this.facets.helper.processResult(reason, ctx, 'rejected');
},
},
helper: {
/**
* @param {unknown[]} specimens
* @param {boolean} isAllSettled
*/
createVow(specimens, isAllSettled) {
const { nextId: id, idToVowState } = this.state;
/** @type {VowKit<any[]>} */
const kit = makeVowKit();

// Preserve the order of the vow results.
for (let index = 0; index < vows.length; index += 1) {
watch(vows[index], this.facets.watcher, {
// Preserve the order of the results.
for (let index = 0; index < specimens.length; index += 1) {
watch(specimens[index], this.facets.watcher, {
id,
index,
numResults: vows.length,
numResults: specimens.length,
isAllSettled,
});
}

if (vows.length > 0) {
if (specimens.length > 0) {
// Save the state until rejection or all fulfilled.
this.state.nextId += 1n;
idToVowState.init(
id,
harden({
resolver: kit.resolver,
remaining: vows.length,
remaining: specimens.length,
resultsMap: detached.mapStore('resultsMap'),
isAllSettled,
}),
);
const idToNonStorableResults = provideLazyMap(
Expand All @@ -119,27 +176,36 @@ export const prepareWatchUtils = (
}
return kit.vow;
},
/** @type {AsPromiseFunction} */
asPromise(specimenP, ...watcherArgs) {
// Watch the specimen in case it is an ephemeral promise.
const vow = watch(specimenP, ...watcherArgs);
const promise = when(vow);
// Watch the ephemeral result promise to ensure that if its settlement is
// lost due to upgrade of this incarnation, we will at least cause an
// unhandled rejection in the new incarnation.
zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher);

return promise;
},
},
watcher: {
onFulfilled(value, { id, index, numResults }) {
/**
* @param {unknown} result
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
* @param {'fulfilled' | 'rejected'} status
*/
processResult(result, { id, index, numResults, isAllSettled }, status) {
const { idToVowState } = this.state;
if (!idToVowState.has(id)) {
// Resolution of the returned vow happened already.
return;
}
const { remaining, resultsMap, resolver } = idToVowState.get(id);
if (!isAllSettled && status === 'rejected') {
// For 'all', we reject immediately on the first rejection
idToVowState.delete(id);
resolver.reject(result);
return;
}

const possiblyWrappedResult = isAllSettled
? harden({
status,
[status === 'fulfilled' ? 'value' : 'reason']: result,
})
: result;

const idToNonStorableResults = provideLazyMap(
utilsToNonStorableResults,
this.facets.utils,
Expand All @@ -152,15 +218,16 @@ export const prepareWatchUtils = (
);

// Capture the fulfilled value.
if (zone.isStorable(value)) {
resultsMap.init(index, value);
if (zone.isStorable(possiblyWrappedResult)) {
resultsMap.init(index, possiblyWrappedResult);
} else {
nonStorableResults.set(index, value);
nonStorableResults.set(index, possiblyWrappedResult);
}
const vowState = harden({
remaining: remaining - 1,
resultsMap,
resolver,
isAllSettled,
});
if (vowState.remaining > 0) {
idToVowState.set(id, vowState);
Expand All @@ -177,26 +244,19 @@ export const prepareWatchUtils = (
results[i] = resultsMap.get(i);
} else {
numLost += 1;
results[i] = isAllSettled
? { status: 'rejected', reason: 'Unstorable result was lost' }
: undefined;
}
}
if (numLost > 0) {
if (numLost > 0 && !isAllSettled) {
resolver.reject(
assert.error(X`${numLost} unstorable results were lost`),
);
} else {
resolver.resolve(harden(results));
}
},
onRejected(value, { id, index: _index, numResults: _numResults }) {
const { idToVowState } = this.state;
if (!idToVowState.has(id)) {
// First rejection wins.
return;
}
const { resolver } = idToVowState.get(id);
idToVowState.delete(id);
resolver.reject(value);
},
},
retryRejectionPromiseWatcher: {
onFulfilled(_result) {},
Expand Down
15 changes: 15 additions & 0 deletions packages/vow/test/types.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,18 @@ expectType<(p1: number, p2: string) => Vow<{ someValue: 'bar' }>>(
Promise.resolve({ someValue: 'bar' } as const),
),
);

expectType<
Vow<
(
| { status: 'fulfilled'; value: any }
| { status: 'rejected'; reason: any }
)[]
>
>(
vt.allVowsSettled([
Promise.resolve(1),
Promise.reject(new Error('test')),
Promise.resolve('hello'),
]),
);
60 changes: 60 additions & 0 deletions packages/vow/test/watch-utils.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// @ts-check
/* global setTimeout */
import test from 'ava';

import { makeHeapZone } from '@agoric/base-zone/heap.js';
Expand Down Expand Up @@ -268,3 +269,62 @@ test('allVows handles unstorable results', async t => {
t.is(result[1], nonPassable);
t.is(result[1](), 'i am a function');
});

test('allVowsSettled handles mixed fulfilled and rejected vows', async t => {
const zone = makeHeapZone();
const { watch, when, allVowsSettled } = prepareBasicVowTools(zone);

const vowA = watch(Promise.resolve('a'));
const vowB = watch(Promise.reject(new Error('b')));
const vowC = watch(Promise.resolve('c'));

const result = await when(allVowsSettled([vowA, vowB, vowC]));
t.is(result.length, 3);
t.deepEqual(result[0], { status: 'fulfilled', value: 'a' });
t.deepEqual(result[1], {
status: 'rejected',
reason: new Error('b'),
});
t.deepEqual(result[2], { status: 'fulfilled', value: 'c' });
});

test('allVowsSettled returns vows in order', async t => {
const zone = makeHeapZone();
const { watch, when, allVowsSettled, makeVowKit } =
prepareBasicVowTools(zone);
const kit = makeVowKit();

const vowA = watch(kit.vow);
const vowB = watch(Promise.resolve('b'));
const vowC = watch(Promise.reject(new Error('c')));
const allSettledV = allVowsSettled([vowA, vowB, vowC]);
setTimeout(() => kit.resolver.resolve('a'), 250);

const result = await when(allSettledV);
t.is(result.length, 3);
t.deepEqual(result[0], { status: 'fulfilled', value: 'a' });
t.deepEqual(result[1], { status: 'fulfilled', value: 'b' });
t.deepEqual(result[2], {
status: 'rejected',
reason: new Error('c'),
});
});

test('allVowsSettled handles unstorable results', async t => {
const zone = makeHeapZone();
const { watch, when, allVowsSettled } = prepareBasicVowTools(zone);

const nonPassable = () => 'im a function';
t.is(zone.isStorable(nonPassable), false);

const vowA = watch(Promise.resolve('a'));
const vowB = watch(nonPassable);

const result = await when(allVowsSettled([vowA, vowB]));

t.is(result.length, 2);
t.deepEqual(result[0], { status: 'fulfilled', value: 'a' });
t.deepEqual(result[1], { status: 'fulfilled', value: nonPassable });
// @ts-expect-error narrowed in line above
t.is(result[1].value(), 'im a function');
});

0 comments on commit baedae2

Please sign in to comment.