diff --git a/packages/vow/src/tools.js b/packages/vow/src/tools.js index ff35539726c4..6feaab524b36 100644 --- a/packages/vow/src/tools.js +++ b/packages/vow/src/tools.js @@ -54,10 +54,17 @@ export const prepareBasicVowTools = (zone, powers = {}) => { /** * Vow-tolerant implementation of Promise.all. * - * @param {EVow[]} maybeVows + * @param {unknown[]} maybeVows */ const allVows = maybeVows => watchUtils.all(maybeVows); + /** + * Vow-tolerant implementation of Promise.allSettled. + * + * @param {unknown[]} maybeVows + */ + const allVowsSettled = maybeVows => watchUtils.allSettled(maybeVows); + /** @type {AsPromiseFunction} */ const asPromise = (specimenP, ...watcherArgs) => watchUtils.asPromise(specimenP, ...watcherArgs); @@ -67,6 +74,7 @@ export const prepareBasicVowTools = (zone, powers = {}) => { watch, makeVowKit, allVows, + allVowsSettled, asVow, asPromise, retriable, diff --git a/packages/vow/src/watch-utils.js b/packages/vow/src/watch-utils.js index 1039b950005c..24b809f5a67d 100644 --- a/packages/vow/src/watch-utils.js +++ b/packages/vow/src/watch-utils.js @@ -10,7 +10,7 @@ const { Fail, bare, details: X } = assert; * @import {Zone} from '@agoric/base-zone'; * @import {Watch} from './watch.js'; * @import {When} from './when.js'; - * @import {VowKit, AsPromiseFunction, IsRetryableReason, EVow} from './types.js'; + * @import {VowKit, AsPromiseFunction, IsRetryableReason, Vow} from './types.js'; */ const VowShape = M.tagged( @@ -54,12 +54,17 @@ export const prepareWatchUtils = ( { utils: M.interface('Utils', { all: M.call(M.arrayOf(M.any())).returns(VowShape), + allSettled: M.call(M.arrayOf(M.any())).returns(VowShape), asPromise: M.call(M.raw()).rest(M.raw()).returns(M.promise()), }), watcher: M.interface('Watcher', { onFulfilled: M.call(M.raw()).rest(M.raw()).returns(M.raw()), onRejected: M.call(M.raw()).rest(M.raw()).returns(M.raw()), }), + helper: M.interface('Helper', { + createVow: M.call(M.arrayOf(M.any()), M.boolean()).returns(VowShape), + processResult: M.call(M.raw()).rest(M.raw()).returns(M.undefined()), + }), retryRejectionPromiseWatcher: PromiseWatcherI, }, () => { @@ -68,6 +73,7 @@ export const prepareWatchUtils = ( * @property {number} remaining * @property {MapStore} resultsMap * @property {VowKit['resolver']} resolver + * @property {boolean} [isAllSettled] */ /** @type {MapStore} */ const idToVowState = detached.mapStore('idToVowState'); @@ -79,32 +85,83 @@ export const prepareWatchUtils = ( }, { utils: { + /** @param {unknown[]} specimens */ + all(specimens) { + return this.facets.helper.createVow(specimens, false); + }, + /** @param {unknown[]} specimens */ + allSettled(specimens) { + return /** @type {Vow<({status: 'fulfilled', value: any} | {status: 'rejected', reason: any})[]>} */ ( + this.facets.helper.createVow(specimens, true) + ); + }, + /** @type {AsPromiseFunction} */ + asPromise(specimenP, ...watcherArgs) { + // Watch the specimen in case it is an ephemeral promise. + const vow = watch(specimenP, ...watcherArgs); + const promise = when(vow); + // Watch the ephemeral result promise to ensure that if its settlement is + // lost due to upgrade of this incarnation, we will at least cause an + // unhandled rejection in the new incarnation. + zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher); + + return promise; + }, + }, + watcher: { /** - * @param {EVow[]} vows + * @param {unknown} value + * @param {object} ctx + * @param {bigint} ctx.id + * @param {number} ctx.index + * @param {number} ctx.numResults + * @param {boolean} ctx.isAllSettled */ - all(vows) { + onFulfilled(value, ctx) { + this.facets.helper.processResult(value, ctx, 'fulfilled'); + }, + /** + * @param {unknown} reason + * @param {object} ctx + * @param {bigint} ctx.id + * @param {number} ctx.index + * @param {number} ctx.numResults + * @param {boolean} ctx.isAllSettled + */ + onRejected(reason, ctx) { + this.facets.helper.processResult(reason, ctx, 'rejected'); + }, + }, + helper: { + /** + * @param {unknown[]} specimens + * @param {boolean} isAllSettled + */ + createVow(specimens, isAllSettled) { const { nextId: id, idToVowState } = this.state; /** @type {VowKit} */ const kit = makeVowKit(); - // Preserve the order of the vow results. - for (let index = 0; index < vows.length; index += 1) { - watch(vows[index], this.facets.watcher, { + // Preserve the order of the results. + for (let index = 0; index < specimens.length; index += 1) { + watch(specimens[index], this.facets.watcher, { id, index, - numResults: vows.length, + numResults: specimens.length, + isAllSettled, }); } - if (vows.length > 0) { + if (specimens.length > 0) { // Save the state until rejection or all fulfilled. this.state.nextId += 1n; idToVowState.init( id, harden({ resolver: kit.resolver, - remaining: vows.length, + remaining: specimens.length, resultsMap: detached.mapStore('resultsMap'), + isAllSettled, }), ); const idToNonStorableResults = provideLazyMap( @@ -119,27 +176,36 @@ export const prepareWatchUtils = ( } return kit.vow; }, - /** @type {AsPromiseFunction} */ - asPromise(specimenP, ...watcherArgs) { - // Watch the specimen in case it is an ephemeral promise. - const vow = watch(specimenP, ...watcherArgs); - const promise = when(vow); - // Watch the ephemeral result promise to ensure that if its settlement is - // lost due to upgrade of this incarnation, we will at least cause an - // unhandled rejection in the new incarnation. - zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher); - - return promise; - }, - }, - watcher: { - onFulfilled(value, { id, index, numResults }) { + /** + * @param {unknown} result + * @param {object} ctx + * @param {bigint} ctx.id + * @param {number} ctx.index + * @param {number} ctx.numResults + * @param {boolean} ctx.isAllSettled + * @param {'fulfilled' | 'rejected'} status + */ + processResult(result, { id, index, numResults, isAllSettled }, status) { const { idToVowState } = this.state; if (!idToVowState.has(id)) { // Resolution of the returned vow happened already. return; } const { remaining, resultsMap, resolver } = idToVowState.get(id); + if (!isAllSettled && status === 'rejected') { + // For 'all', we reject immediately on the first rejection + idToVowState.delete(id); + resolver.reject(result); + return; + } + + const possiblyWrappedResult = isAllSettled + ? harden({ + status, + [status === 'fulfilled' ? 'value' : 'reason']: result, + }) + : result; + const idToNonStorableResults = provideLazyMap( utilsToNonStorableResults, this.facets.utils, @@ -152,15 +218,16 @@ export const prepareWatchUtils = ( ); // Capture the fulfilled value. - if (zone.isStorable(value)) { - resultsMap.init(index, value); + if (zone.isStorable(possiblyWrappedResult)) { + resultsMap.init(index, possiblyWrappedResult); } else { - nonStorableResults.set(index, value); + nonStorableResults.set(index, possiblyWrappedResult); } const vowState = harden({ remaining: remaining - 1, resultsMap, resolver, + isAllSettled, }); if (vowState.remaining > 0) { idToVowState.set(id, vowState); @@ -177,9 +244,12 @@ export const prepareWatchUtils = ( results[i] = resultsMap.get(i); } else { numLost += 1; + results[i] = isAllSettled + ? { status: 'rejected', reason: 'Unstorable result was lost' } + : undefined; } } - if (numLost > 0) { + if (numLost > 0 && !isAllSettled) { resolver.reject( assert.error(X`${numLost} unstorable results were lost`), ); @@ -187,16 +257,6 @@ export const prepareWatchUtils = ( resolver.resolve(harden(results)); } }, - onRejected(value, { id, index: _index, numResults: _numResults }) { - const { idToVowState } = this.state; - if (!idToVowState.has(id)) { - // First rejection wins. - return; - } - const { resolver } = idToVowState.get(id); - idToVowState.delete(id); - resolver.reject(value); - }, }, retryRejectionPromiseWatcher: { onFulfilled(_result) {}, diff --git a/packages/vow/test/types.test-d.ts b/packages/vow/test/types.test-d.ts index 8a859348caf8..432bc0916970 100644 --- a/packages/vow/test/types.test-d.ts +++ b/packages/vow/test/types.test-d.ts @@ -16,3 +16,18 @@ expectType<(p1: number, p2: string) => Vow<{ someValue: 'bar' }>>( Promise.resolve({ someValue: 'bar' } as const), ), ); + +expectType< + Vow< + ( + | { status: 'fulfilled'; value: any } + | { status: 'rejected'; reason: any } + )[] + > +>( + vt.allVowsSettled([ + Promise.resolve(1), + Promise.reject(new Error('test')), + Promise.resolve('hello'), + ]), +); diff --git a/packages/vow/test/watch-utils.test.js b/packages/vow/test/watch-utils.test.js index ac6a597d7df3..8b076fb97365 100644 --- a/packages/vow/test/watch-utils.test.js +++ b/packages/vow/test/watch-utils.test.js @@ -1,4 +1,5 @@ // @ts-check +/* global setTimeout */ import test from 'ava'; import { makeHeapZone } from '@agoric/base-zone/heap.js'; @@ -268,3 +269,62 @@ test('allVows handles unstorable results', async t => { t.is(result[1], nonPassable); t.is(result[1](), 'i am a function'); }); + +test('allVowsSettled handles mixed fulfilled and rejected vows', async t => { + const zone = makeHeapZone(); + const { watch, when, allVowsSettled } = prepareBasicVowTools(zone); + + const vowA = watch(Promise.resolve('a')); + const vowB = watch(Promise.reject(new Error('b'))); + const vowC = watch(Promise.resolve('c')); + + const result = await when(allVowsSettled([vowA, vowB, vowC])); + t.is(result.length, 3); + t.deepEqual(result[0], { status: 'fulfilled', value: 'a' }); + t.deepEqual(result[1], { + status: 'rejected', + reason: new Error('b'), + }); + t.deepEqual(result[2], { status: 'fulfilled', value: 'c' }); +}); + +test('allVowsSettled returns vows in order', async t => { + const zone = makeHeapZone(); + const { watch, when, allVowsSettled, makeVowKit } = + prepareBasicVowTools(zone); + const kit = makeVowKit(); + + const vowA = watch(kit.vow); + const vowB = watch(Promise.resolve('b')); + const vowC = watch(Promise.reject(new Error('c'))); + const allSettledV = allVowsSettled([vowA, vowB, vowC]); + setTimeout(() => kit.resolver.resolve('a'), 250); + + const result = await when(allSettledV); + t.is(result.length, 3); + t.deepEqual(result[0], { status: 'fulfilled', value: 'a' }); + t.deepEqual(result[1], { status: 'fulfilled', value: 'b' }); + t.deepEqual(result[2], { + status: 'rejected', + reason: new Error('c'), + }); +}); + +test('allVowsSettled handles unstorable results', async t => { + const zone = makeHeapZone(); + const { watch, when, allVowsSettled } = prepareBasicVowTools(zone); + + const nonPassable = () => 'im a function'; + t.is(zone.isStorable(nonPassable), false); + + const vowA = watch(Promise.resolve('a')); + const vowB = watch(nonPassable); + + const result = await when(allVowsSettled([vowA, vowB])); + + t.is(result.length, 2); + t.deepEqual(result[0], { status: 'fulfilled', value: 'a' }); + t.deepEqual(result[1], { status: 'fulfilled', value: nonPassable }); + // @ts-expect-error narrowed in line above + t.is(result[1].value(), 'im a function'); +});