-
Notifications
You must be signed in to change notification settings - Fork 212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: vowTools.allSettled
#10077
feat: vowTools.allSettled
#10077
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,7 @@ const { Fail, bare, details: X } = assert; | |
* @import {Zone} from '@agoric/base-zone'; | ||
* @import {Watch} from './watch.js'; | ||
* @import {When} from './when.js'; | ||
* @import {VowKit, AsPromiseFunction, IsRetryableReason, EVow} from './types.js'; | ||
* @import {VowKit, AsPromiseFunction, IsRetryableReason, Vow} from './types.js'; | ||
*/ | ||
|
||
const VowShape = M.tagged( | ||
|
@@ -54,11 +54,16 @@ export const prepareWatchUtils = ( | |
{ | ||
utils: M.interface('Utils', { | ||
all: M.call(M.arrayOf(M.any())).returns(VowShape), | ||
allSettled: M.call(M.arrayOf(M.any())).returns(VowShape), | ||
asPromise: M.call(M.raw()).rest(M.raw()).returns(M.promise()), | ||
}), | ||
watcher: M.interface('Watcher', { | ||
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()), | ||
onRejected: M.call(M.any()).rest(M.any()).returns(M.any()), | ||
onFulfilled: M.call(M.raw()).rest(M.raw()).returns(M.raw()), | ||
onRejected: M.call(M.raw()).rest(M.raw()).returns(M.raw()), | ||
}), | ||
helper: M.interface('Helper', { | ||
createVow: M.call(M.arrayOf(M.any()), M.boolean()).returns(VowShape), | ||
processResult: M.call(M.raw()).rest(M.raw()).returns(M.undefined()), | ||
}), | ||
retryRejectionPromiseWatcher: PromiseWatcherI, | ||
}, | ||
|
@@ -68,6 +73,7 @@ export const prepareWatchUtils = ( | |
* @property {number} remaining | ||
* @property {MapStore<number, any>} resultsMap | ||
* @property {VowKit['resolver']} resolver | ||
* @property {boolean} [isAllSettled] | ||
*/ | ||
/** @type {MapStore<bigint, VowState>} */ | ||
const idToVowState = detached.mapStore('idToVowState'); | ||
|
@@ -79,32 +85,83 @@ export const prepareWatchUtils = ( | |
}, | ||
{ | ||
utils: { | ||
/** @param {unknown[]} specimens */ | ||
all(specimens) { | ||
return this.facets.helper.createVow(specimens, false); | ||
}, | ||
/** @param {unknown[]} specimens */ | ||
allSettled(specimens) { | ||
return /** @type {Vow<({status: 'fulfilled', value: any} | {status: 'rejected', reason: any})[]>} */ ( | ||
this.facets.helper.createVow(specimens, true) | ||
); | ||
}, | ||
/** @type {AsPromiseFunction} */ | ||
asPromise(specimenP, ...watcherArgs) { | ||
// Watch the specimen in case it is an ephemeral promise. | ||
const vow = watch(specimenP, ...watcherArgs); | ||
const promise = when(vow); | ||
// Watch the ephemeral result promise to ensure that if its settlement is | ||
// lost due to upgrade of this incarnation, we will at least cause an | ||
// unhandled rejection in the new incarnation. | ||
zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher); | ||
|
||
return promise; | ||
}, | ||
}, | ||
watcher: { | ||
/** | ||
* @param {EVow<unknown>[]} vows | ||
* @param {unknown} value | ||
* @param {object} ctx | ||
* @param {bigint} ctx.id | ||
* @param {number} ctx.index | ||
* @param {number} ctx.numResults | ||
* @param {boolean} ctx.isAllSettled | ||
*/ | ||
all(vows) { | ||
onFulfilled(value, ctx) { | ||
this.facets.helper.processResult(value, ctx, 'fulfilled'); | ||
}, | ||
/** | ||
* @param {unknown} reason | ||
* @param {object} ctx | ||
* @param {bigint} ctx.id | ||
* @param {number} ctx.index | ||
* @param {number} ctx.numResults | ||
* @param {boolean} ctx.isAllSettled | ||
*/ | ||
onRejected(reason, ctx) { | ||
this.facets.helper.processResult(reason, ctx, 'rejected'); | ||
}, | ||
}, | ||
helper: { | ||
/** | ||
* @param {unknown[]} specimens | ||
* @param {boolean} isAllSettled | ||
*/ | ||
createVow(specimens, isAllSettled) { | ||
const { nextId: id, idToVowState } = this.state; | ||
/** @type {VowKit<any[]>} */ | ||
const kit = makeVowKit(); | ||
|
||
// Preserve the order of the vow results. | ||
for (let index = 0; index < vows.length; index += 1) { | ||
watch(vows[index], this.facets.watcher, { | ||
// Preserve the order of the results. | ||
for (let index = 0; index < specimens.length; index += 1) { | ||
watch(specimens[index], this.facets.watcher, { | ||
id, | ||
index, | ||
numResults: vows.length, | ||
numResults: specimens.length, | ||
isAllSettled, | ||
}); | ||
} | ||
|
||
if (vows.length > 0) { | ||
if (specimens.length > 0) { | ||
// Save the state until rejection or all fulfilled. | ||
this.state.nextId += 1n; | ||
idToVowState.init( | ||
id, | ||
harden({ | ||
resolver: kit.resolver, | ||
remaining: vows.length, | ||
remaining: specimens.length, | ||
resultsMap: detached.mapStore('resultsMap'), | ||
isAllSettled, | ||
}), | ||
); | ||
const idToNonStorableResults = provideLazyMap( | ||
|
@@ -119,27 +176,36 @@ export const prepareWatchUtils = ( | |
} | ||
return kit.vow; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a path for typing this, so we can keep the existing h/t @Chris-Hibbert for the suggestion: https://github.com/Agoric/agoric-sdk/pull/9902/files#r1757395986 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've given this a shot above, but it is wholly untested. Hope it helps. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, your suggestion worked! In hindsight, it was obvious 😅 |
||
}, | ||
/** @type {AsPromiseFunction} */ | ||
asPromise(specimenP, ...watcherArgs) { | ||
// Watch the specimen in case it is an ephemeral promise. | ||
const vow = watch(specimenP, ...watcherArgs); | ||
const promise = when(vow); | ||
// Watch the ephemeral result promise to ensure that if its settlement is | ||
// lost due to upgrade of this incarnation, we will at least cause an | ||
// unhandled rejection in the new incarnation. | ||
zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher); | ||
|
||
return promise; | ||
}, | ||
}, | ||
watcher: { | ||
onFulfilled(value, { id, index, numResults }) { | ||
/** | ||
* @param {unknown} result | ||
* @param {object} ctx | ||
* @param {bigint} ctx.id | ||
* @param {number} ctx.index | ||
* @param {number} ctx.numResults | ||
* @param {boolean} ctx.isAllSettled | ||
* @param {'fulfilled' | 'rejected'} status | ||
*/ | ||
processResult(result, { id, index, numResults, isAllSettled }, status) { | ||
const { idToVowState } = this.state; | ||
if (!idToVowState.has(id)) { | ||
// Resolution of the returned vow happened already. | ||
return; | ||
} | ||
const { remaining, resultsMap, resolver } = idToVowState.get(id); | ||
if (!isAllSettled && status === 'rejected') { | ||
// For 'all', we reject immediately on the first rejection | ||
idToVowState.delete(id); | ||
resolver.reject(result); | ||
return; | ||
} | ||
|
||
const possiblyWrappedResult = isAllSettled | ||
? harden({ | ||
status, | ||
[status === 'fulfilled' ? 'value' : 'reason']: result, | ||
}) | ||
: result; | ||
|
||
const idToNonStorableResults = provideLazyMap( | ||
utilsToNonStorableResults, | ||
this.facets.utils, | ||
|
@@ -152,15 +218,16 @@ export const prepareWatchUtils = ( | |
); | ||
|
||
// Capture the fulfilled value. | ||
if (zone.isStorable(value)) { | ||
resultsMap.init(index, value); | ||
if (zone.isStorable(possiblyWrappedResult)) { | ||
resultsMap.init(index, possiblyWrappedResult); | ||
} else { | ||
nonStorableResults.set(index, value); | ||
nonStorableResults.set(index, possiblyWrappedResult); | ||
} | ||
const vowState = harden({ | ||
remaining: remaining - 1, | ||
resultsMap, | ||
resolver, | ||
isAllSettled, | ||
}); | ||
if (vowState.remaining > 0) { | ||
idToVowState.set(id, vowState); | ||
|
@@ -177,26 +244,19 @@ export const prepareWatchUtils = ( | |
results[i] = resultsMap.get(i); | ||
} else { | ||
numLost += 1; | ||
results[i] = isAllSettled | ||
? { status: 'rejected', reason: 'Unstorable result was lost' } | ||
: undefined; | ||
} | ||
} | ||
if (numLost > 0) { | ||
if (numLost > 0 && !isAllSettled) { | ||
resolver.reject( | ||
assert.error(X`${numLost} unstorable results were lost`), | ||
); | ||
} else { | ||
resolver.resolve(harden(results)); | ||
} | ||
}, | ||
onRejected(value, { id, index: _index, numResults: _numResults }) { | ||
const { idToVowState } = this.state; | ||
if (!idToVowState.has(id)) { | ||
// First rejection wins. | ||
return; | ||
} | ||
const { resolver } = idToVowState.get(id); | ||
idToVowState.delete(id); | ||
resolver.reject(value); | ||
}, | ||
}, | ||
retryRejectionPromiseWatcher: { | ||
onFulfilled(_result) {}, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👏 !