diff --git a/README.md b/README.md index d6d0aca..9b9793b 100644 --- a/README.md +++ b/README.md @@ -39,8 +39,8 @@ - [`encoding = db.keyEncoding([encoding])`](#encoding--dbkeyencodingencoding) - [`encoding = db.valueEncoding([encoding])`](#encoding--dbvalueencodingencoding) - [`key = db.prefixKey(key, keyFormat[, local])`](#key--dbprefixkeykey-keyformat-local) - - [`db.defer(fn)`](#dbdeferfn) - - [`db.deferAsync(fn)`](#dbdeferasyncfn) + - [`db.defer(fn[, options])`](#dbdeferfn-options) + - [`db.deferAsync(fn[, options])`](#dbdeferasyncfn-options) - [`chainedBatch`](#chainedbatch) - [`chainedBatch.put(key, value[, options])`](#chainedbatchputkey-value-options) - [`chainedBatch.del(key[, options])`](#chainedbatchdelkey-options) @@ -59,6 +59,7 @@ - [`iterator.db`](#iteratordb) - [`iterator.count`](#iteratorcount) - [`iterator.limit`](#iteratorlimit) + - [Aborting Iterators](#aborting-iterators) - [`keyIterator`](#keyiterator) - [`valueIterator`](#valueiterator) - [`sublevel`](#sublevel) @@ -103,6 +104,7 @@ - [`LEVEL_ITERATOR_NOT_OPEN`](#level_iterator_not_open) - [`LEVEL_ITERATOR_BUSY`](#level_iterator_busy) - [`LEVEL_BATCH_NOT_OPEN`](#level_batch_not_open) + - [`LEVEL_ABORTED`](#level_aborted) - [`LEVEL_ENCODING_NOT_FOUND`](#level_encoding_not_found) - [`LEVEL_ENCODING_NOT_SUPPORTED`](#level_encoding_not_supported) - [`LEVEL_DECODE_ERROR`](#level_decode_error) @@ -381,6 +383,7 @@ The `gte` and `lte` range options take precedence over `gt` and `lt` respectivel - `values` (boolean, default: `true`): whether to return the value of each entry. If set to `false`, the iterator will yield values that are `undefined`. Prefer to use `db.values()` instead. - `keyEncoding`: custom key encoding for this iterator, used to encode range options, to encode `seek()` targets and to decode keys. - `valueEncoding`: custom value encoding for this iterator, used to decode values. +- `signal`: an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to [abort read operations on the iterator](#aborting-iterators). Lastly, an implementation is free to add its own options. @@ -529,9 +532,9 @@ console.log(nested.prefixKey('a', 'utf8')) // '!example!!nested!a' console.log(nested.prefixKey('a', 'utf8', true)) // '!nested!a' ``` -### `db.defer(fn)` +### `db.defer(fn[, options])` -Call the function `fn` at a later time when [`db.status`](#dbstatus) changes to `'open'` or `'closed'`. Used by `abstract-level` itself to implement "deferred open" which is a feature that makes it possible to call methods like `db.put()` before the database has finished opening. The `defer()` method is exposed for implementations and plugins to achieve the same on their custom methods: +Call the function `fn` at a later time when [`db.status`](#dbstatus) changes to `'open'` or `'closed'`. Known as a _deferred operation_. Used by `abstract-level` itself to implement "deferred open" which is a feature that makes it possible to call methods like `db.put()` before the database has finished opening. The `defer()` method is exposed for implementations and plugins to achieve the same on their custom methods: ```js db.foo = function (key) { @@ -543,9 +546,13 @@ db.foo = function (key) { } ``` +The optional `options` object may contain: + +- `signal`: an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to abort the deferred operation. When aborted (now or later) the `fn` function will not be called. + When deferring a custom operation, do it early: after normalizing optional arguments but before encoding (to avoid double encoding and to emit original input if the operation has events) and before any _fast paths_ (to avoid calling back before the database has finished opening). For example, `db.batch([])` has an internal fast path where it skips work if the array of operations is empty. Resources that can be closed on their own (like iterators) should however first check such state before deferring, in order to reject operations after close (including when the database was reopened). -### `db.deferAsync(fn)` +### `db.deferAsync(fn[, options])` Similar to `db.defer(fn)` but for asynchronous work. Returns a promise, which waits for [`db.status`](#dbstatus) to change to `'open'` or `'closed'` and then calls `fn` which itself must return a promise. This allows for recursion: @@ -559,6 +566,10 @@ db.foo = async function (key) { } ``` +The optional `options` object may contain: + +- `signal`: an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to abort the deferred operation. When aborted (now or later) the `fn` function will not be called, and the promise returned by `deferAsync()` will be rejected with a [`LEVEL_ABORTED`](#errors) error. + ### `chainedBatch` #### `chainedBatch.put(key, value[, options])` @@ -721,6 +732,44 @@ const hasMore = iterator.count < iterator.limit const remaining = iterator.limit - iterator.count ``` +#### Aborting Iterators + +Iterators take an experimental `signal` option that, once signaled, aborts an in-progress read operation (if any) and rejects subsequent reads. The relevant promise will be rejected with a [`LEVEL_ABORTED`](#errors) error. Aborting does not close the iterator, because closing is asynchronous and may result in an error that needs a place to go. This means signals should be used together with a pattern that automatically closes the iterator: + +```js +const abortController = new AbortController() +const signal = abortController.signal + +// Will result in 'aborted' log +abortController.abort() + +try { + for await (const entry of db.iterator({ signal })) { + console.log(entry) + } +} catch (err) { + if (err.code === 'LEVEL_ABORTED') { + console.log('aborted') + } +} +``` + +Otherwise, close the iterator explicitly: + +```js +const iterator = db.iterator({ signal }) + +try { + const entries = await iterator.nextv(10) +} catch (err) { + if (err.code === 'LEVEL_ABORTED') { + console.log('aborted') + } +} finally { + await iterator.close() +} +``` + ### `keyIterator` A key iterator has the same interface as `iterator` except that its methods yield keys instead of entries. Usage is otherwise the same. @@ -1161,6 +1210,10 @@ When `iterator.next()` or `seek()` was called while a previous `next()` call was When an operation was made on a chained batch while it was closing or closed, which may also be the result of the database being closed or that `write()` was called on the chained batch. +#### `LEVEL_ABORTED` + +When an operation was aborted by the user. + #### `LEVEL_ENCODING_NOT_FOUND` When a `keyEncoding` or `valueEncoding` option specified a named encoding that does not exist. @@ -1564,6 +1617,14 @@ class ExampleSublevel extends AbstractSublevel { The first argument to this constructor must be an instance of the relevant `AbstractLevel` implementation. The constructor will set `iterator.db` which is used (among other things) to access encodings and ensures that `db` will not be garbage collected in case there are no other references to it. The `options` argument must be the original `options` object that was passed to `db._iterator()` and it is therefore not (publicly) possible to create an iterator via constructors alone. +The `signal` option, if any and once signaled, should abort an in-progress `_next()`, `_nextv()` or `_all()` call and reject the promise returned by that call with a [`LEVEL_ABORTED`](#errors) error. Doing so is optional until a future semver-major release. Responsibilities are divided as follows: + +1. Before a database has finished opening, `abstract-level` handles the signal +2. While a call is in progress, the implementation handles the signal +3. Once the signal is aborted, `abstract-level` rejects further calls. + +A method like `_next()` therefore doesn't have to check the signal _before_ it start its asynchronous work, only _during_ that work. Whether to respect the signal and on which (potentially long-running) methods, is up to the implementation. + #### `iterator._next()` Advance to the next entry and yield that entry. Must return a promise. If an error occurs, reject the promise. If the natural end of the iterator has been reached, resolve the promise with `undefined`. Otherwise resolve the promise with an array containing a `key` and `value`. If a `limit` was set and the iterator already yielded that many entries (via any of the methods) then `_next()` will not be called. diff --git a/UPGRADING.md b/UPGRADING.md index 4037927..02c1a0b 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -202,7 +202,7 @@ As for why that last example works yet the same is not supported on a chained ba #### 2.1. Promises all the way -All private methods that previously took a callback now use a promise. For example, the function signature `_get(key, options, callback)` has changed to `async _get(key, options)`. Same as in the public API, the new function signatures are predictable and the only method that requires special attention is `iterator._next()`. Which in addition now also takes an `options` argument. For details, please see the updated [README](./README.md#private-api-for-implementors). +All private methods that previously took a callback now use a promise. For example, the function signature `_get(key, options, callback)` has changed to `async _get(key, options)`. Same as in the public API, the new function signatures are predictable and the only method that requires special attention is `iterator._next()`. For details, please see the updated [README](./README.md#private-api-for-implementors). #### 2.2. Ticks @@ -240,9 +240,7 @@ class ExampleLevel extends AbstractLevel { #### 2.3. A new way to abort iterator work -_This section is incomplete._ - -Closing an iterator now aborts work, if supported by implementation. The undocumented `abortOnClose` option of iterators (added as a workaround for `many-level`) has been removed in favor of AbortSignal. +Iterators now take an experimental `signal` option that is an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal). You can use the `signal` to abort an in-progress `_next()`, `_nextv()` or `_all()` call. Doing so is optional until a future semver-major release. #### 2.4. Snapshots must be synchronous diff --git a/abstract-chained-batch.js b/abstract-chained-batch.js index f542bb7..17452b3 100644 --- a/abstract-chained-batch.js +++ b/abstract-chained-batch.js @@ -10,7 +10,6 @@ const kStatus = Symbol('status') const kPublicOperations = Symbol('publicOperations') const kLegacyOperations = Symbol('legacyOperations') const kPrivateOperations = Symbol('privateOperations') -const kCallClose = Symbol('callClose') const kClosePromise = Symbol('closePromise') const kLength = Symbol('length') const kPrewriteRun = Symbol('prewriteRun') @@ -273,13 +272,8 @@ class AbstractChainedBatch { } else { this[kStatus] = 'writing' - // Prepare promise in case write() is called in the mean time - let close - this[kClosePromise] = new Promise((resolve, reject) => { - close = () => { - this[kCallClose]().then(resolve, reject) - } - }) + // Prepare promise in case close() is called in the mean time + const close = prepareClose(this) try { // Process operations added by prewrite hook functions @@ -344,20 +338,33 @@ class AbstractChainedBatch { // First caller of close() or write() is responsible for error return this[kClosePromise].catch(noop) } else { - this[kClosePromise] = this[kCallClose]() + // Wrap promise to avoid race issues on recursive calls + prepareClose(this)() return this[kClosePromise] } } - async [kCallClose] () { - this[kStatus] = 'closing' - await this._close() - this.db.detachResource(this) - } - async _close () {} } +const prepareClose = function (batch) { + let close + + batch[kClosePromise] = new Promise((resolve, reject) => { + close = () => { + privateClose(batch).then(resolve, reject) + } + }) + + return close +} + +const privateClose = async function (batch) { + batch[kStatus] = 'closing' + await batch._close() + batch.db.detachResource(batch) +} + class PrewriteData { constructor (privateOperations, publicOperations) { this[kPrivateOperations] = privateOperations @@ -381,7 +388,7 @@ class PrewriteData { } } -function assertStatus (batch) { +const assertStatus = function (batch) { if (batch[kStatus] !== 'open') { throw new ModuleError('Batch is not open: cannot change operations after write() or close()', { code: 'LEVEL_BATCH_NOT_OPEN' diff --git a/abstract-iterator.js b/abstract-iterator.js index 246b842..42d1411 100644 --- a/abstract-iterator.js +++ b/abstract-iterator.js @@ -3,18 +3,14 @@ const ModuleError = require('module-error') const combineErrors = require('maybe-combine-errors') const { getOptions, emptyOptions, noop } = require('./lib/common') -const { AbortController } = require('./lib/abort') +const { AbortError } = require('./lib/errors') const kWorking = Symbol('working') const kDecodeOne = Symbol('decodeOne') const kDecodeMany = Symbol('decodeMany') -const kAbortController = Symbol('abortController') -const kAbortSignalOptions = Symbol('abortSignalOptions') -const kClosing = Symbol('closing') -const kCallClose = Symbol('callClose') +const kSignal = Symbol('signal') const kPendingClose = Symbol('pendingClose') const kClosingPromise = Symbol('closingPromise') -const kClosed = Symbol('closed') const kKeyEncoding = Symbol('keyEncoding') const kValueEncoding = Symbol('valueEncoding') const kKeys = Symbol('keys') @@ -35,25 +31,14 @@ class CommonIterator { throw new TypeError('The second argument must be an options object') } - this[kClosed] = false this[kWorking] = false - this[kClosing] = false this[kPendingClose] = null this[kClosingPromise] = null this[kKeyEncoding] = options[kKeyEncoding] this[kValueEncoding] = options[kValueEncoding] this[kLimit] = Number.isInteger(options.limit) && options.limit >= 0 ? options.limit : Infinity this[kCount] = 0 - - // TODO (signals): docs, types, tests - this[kAbortController] = new AbortController() - this[kAbortSignalOptions] = Object.freeze({ - signal: this[kAbortController].signal - }) - - if (options.signal) { - // TODO (signals): combine signals - } + this[kSignal] = options.signal != null ? options.signal : null this.db = db this.db.attachResource(this) @@ -68,15 +53,14 @@ class CommonIterator { } async next () { - assertStatus(this) - this[kWorking] = true + startWork(this) try { if (this[kCount] >= this[kLimit]) { return undefined } - let item = await this._next(this[kAbortSignalOptions]) + let item = await this._next() try { if (item !== undefined) { @@ -89,30 +73,23 @@ class CommonIterator { return item } finally { - this[kWorking] = false - - if (this[kPendingClose] !== null) { - this[kPendingClose]() - } + endWork(this) } } - // TODO (signals): docs - // TODO (signals): check if signal option can work in many-level - async _next (options) {} + async _next () {} async nextv (size, options) { if (!Number.isInteger(size)) { throw new TypeError("The first argument 'size' must be an integer") } - options = getAbortOptions(this, options) - assertStatus(this) + options = getOptions(options, emptyOptions) if (size < 1) size = 1 if (this[kLimit] < Infinity) size = Math.min(size, this[kLimit] - this[kCount]) - this[kWorking] = true + startWork(this) try { if (size <= 0) return [] @@ -128,11 +105,7 @@ class CommonIterator { this[kCount] += items.length return items } finally { - this[kWorking] = false - - if (this[kPendingClose] !== null) { - this[kPendingClose]() - } + endWork(this) } } @@ -143,21 +116,14 @@ class CommonIterator { while (acc.length < size && (item = await this._next(options)) !== undefined) { acc.push(item) - - // TODO (signals) - // if (options.signal.aborted) { - // throw new AbortedError() - // } } return acc } async all (options) { - options = getAbortOptions(this, options) - assertStatus(this) - - this[kWorking] = true + options = getOptions(options, emptyOptions) + startWork(this) try { if (this[kCount] >= this[kLimit]) { @@ -175,27 +141,11 @@ class CommonIterator { this[kCount] += items.length return items } catch (err) { - this[kWorking] = false - - if (this[kPendingClose] !== null) { - this[kPendingClose]() - } - - try { - await this.close() - } catch (closeErr) { - throw combineErrors([err, closeErr]) - } - - throw err + endWork(this) + await destroy(this, err) } finally { if (this[kWorking]) { - this[kWorking] = false - - if (this[kPendingClose] !== null) { - this[kPendingClose]() - } - + endWork(this) await this.close() } } @@ -203,7 +153,6 @@ class CommonIterator { async _all (options) { // Must count here because we're directly calling _nextv() - // TODO: should we not increment this[kCount] as well? let count = this[kCount] const acc = [] @@ -224,18 +173,13 @@ class CommonIterator { acc.push.apply(acc, items) count += items.length - - // TODO (signals) - // if (options.signal.aborted) { - // throw new AbortedError() - // } } } seek (target, options) { options = getOptions(options, emptyOptions) - if (this[kClosing]) { + if (this[kClosingPromise] !== null) { // Don't throw here, to be kind to implementations that wrap // another db and don't necessarily control when the db is closed } else if (this[kWorking]) { @@ -262,48 +206,29 @@ class CommonIterator { } async close () { - if (this[kClosed]) { - return - } - - if (this[kClosing]) { + if (this[kClosingPromise] !== null) { // First caller of close() is responsible for error return this[kClosingPromise].catch(noop) - } else { - this[kClosing] = true + } - if (this[kWorking]) { - // Wait for work, but handle closing and its error here. - this[kClosingPromise] = new Promise((resolve, reject) => { - this[kPendingClose] = () => { - this[kCallClose]().then(resolve, reject) - } - }) - - // If implementation supports it, abort the work. - this[kAbortController].abort() - } else { - this[kClosingPromise] = this[kCallClose]() + // Wrap to avoid race issues on recursive calls + this[kClosingPromise] = new Promise((resolve, reject) => { + this[kPendingClose] = () => { + this[kPendingClose] = null + privateClose(this).then(resolve, reject) } + }) - return this[kClosingPromise] + // If working we'll delay closing, but still handle the close error (if any) here + if (!this[kWorking]) { + this[kPendingClose]() } + + return this[kClosingPromise] } async _close () {} - async [kCallClose] () { - this[kPendingClose] = null - - try { - await this._close() - } finally { - this[kClosed] = true - } - - this.db.detachResource(this) - } - async * [Symbol.asyncIterator] () { try { let item @@ -311,8 +236,10 @@ class CommonIterator { while ((item = (await this.next())) !== undefined) { yield item } + } catch (err) { + await destroy(this, err) } finally { - if (!this[kClosed]) await this.close() + await this.close() } } } @@ -394,19 +321,8 @@ class IteratorDecodeError extends ModuleError { } } -// Internal utility, not typed or exported -// TODO (signals): define and document new code -// class AbortedError extends ModuleError { -// constructor (cause) { -// super('Iterator has been aborted', { -// code: 'LEVEL_ITERATOR_NOT_OPEN', -// cause -// }) -// } -// } - -function assertStatus (iterator) { - if (iterator[kClosing]) { +const startWork = function (iterator) { + if (iterator[kClosingPromise] !== null) { throw new ModuleError('Iterator is not open: cannot read after close()', { code: 'LEVEL_ITERATOR_NOT_OPEN' }) @@ -414,28 +330,36 @@ function assertStatus (iterator) { throw new ModuleError('Iterator is busy: cannot read until previous read has completed', { code: 'LEVEL_ITERATOR_BUSY' }) + } else if (iterator[kSignal] !== null && iterator[kSignal].aborted) { + throw new AbortError() } - // TODO (signals): may want to do (unless aborting closes the iterator, TBD): - // if (iterator[kAbortController].signal.aborted) { - // throw new AbortedError() - // } + iterator[kWorking] = true } -function getAbortOptions (iterator, options) { - if (typeof options === 'object' && options !== null) { - // The signal option should only be set via constructor. Including when we're - // forwarding calls like in AbstractSublevelIterator#_next(). Meaning we knowingly - // lose the signal between _next({ signal }) and next({ signal }) calls. We might - // support merging signals in the future but at this time we don't need it, because - // in these forwarding scenarios, we also forward close() and thus the main signal. - return Object.assign({}, options, iterator[kAbortSignalOptions]) - } else { - // Avoid an expensive Object.assign({}) - return iterator[kAbortSignalOptions] +const endWork = function (iterator) { + iterator[kWorking] = false + + if (iterator[kPendingClose] !== null) { + iterator[kPendingClose]() } } +const privateClose = async function (iterator) { + await iterator._close() + iterator.db.detachResource(iterator) +} + +const destroy = async function (iterator, err) { + try { + await iterator.close() + } catch (closeErr) { + throw combineErrors([err, closeErr]) + } + + throw err +} + // Exposed so that AbstractLevel can set these options AbstractIterator.keyEncoding = kKeyEncoding AbstractIterator.valueEncoding = kValueEncoding diff --git a/abstract-level.js b/abstract-level.js index 474e8db..141bd7a 100644 --- a/abstract-level.js +++ b/abstract-level.js @@ -12,14 +12,14 @@ const { DefaultChainedBatch } = require('./lib/default-chained-batch') const { DatabaseHooks } = require('./lib/hooks') const { PrewriteBatch } = require('./lib/prewrite-batch') const { EventMonitor } = require('./lib/event-monitor') -const { getOptions, noop, emptyOptions } = require('./lib/common') +const { getOptions, noop, emptyOptions, resolvedPromise } = require('./lib/common') const { prefixDescendantKey } = require('./lib/prefixes') +const { DeferredQueue } = require('./lib/deferred-queue') const rangeOptions = require('./lib/range-options') const kResources = Symbol('resources') const kCloseResources = Symbol('closeResources') -const kOperations = Symbol('operations') -const kUndefer = Symbol('undefer') +const kQueue = Symbol('queue') const kDeferOpen = Symbol('deferOpen') const kOptions = Symbol('options') const kStatus = Symbol('status') @@ -44,7 +44,7 @@ class AbstractLevel extends EventEmitter { const { keyEncoding, valueEncoding, passive, ...forward } = options this[kResources] = new Set() - this[kOperations] = [] + this[kQueue] = new DeferredQueue() this[kDeferOpen] = true this[kOptions] = forward this[kStatus] = 'opening' @@ -159,18 +159,19 @@ class AbstractLevel extends EventEmitter { if (this[kStatus] !== 'open') throw new NotOpenError() } else if (this[kStatus] === 'closed' || this[kDeferOpen]) { this[kDeferOpen] = false - this[kStatus] = 'opening' - this.emit('opening') - + this[kStatusChange] = resolvedPromise // TODO: refactor this[kStatusChange] = (async () => { + this[kStatus] = 'opening' + try { + this.emit('opening') await this._open(options) } catch (err) { this[kStatus] = 'closed' // Must happen before we close resources, in case their close() is waiting // on a deferred operation which in turn is waiting on db.open(). - this[kUndefer]() + this[kQueue].drain() try { await this[kCloseResources]() @@ -200,7 +201,7 @@ class AbstractLevel extends EventEmitter { // Revert if (hookErr) { this[kStatus] = 'closing' - this[kUndefer]() + this[kQueue].drain() try { await this[kCloseResources]() @@ -222,7 +223,7 @@ class AbstractLevel extends EventEmitter { } } - this[kUndefer]() + this[kQueue].drain() this.emit('open') })() @@ -232,8 +233,7 @@ class AbstractLevel extends EventEmitter { this[kStatusChange] = null } } else if (this[kStatus] !== 'open') { - // Should not happen - /* istanbul ignore next */ + /* istanbul ignore next: should not happen */ throw new NotOpenError() } } @@ -251,21 +251,23 @@ class AbstractLevel extends EventEmitter { const fromInitial = this[kDeferOpen] this[kDeferOpen] = false - this[kStatus] = 'closing' - this.emit('closing') - + this[kStatusChange] = resolvedPromise this[kStatusChange] = (async () => { + this[kStatus] = 'closing' + this[kQueue].drain() + try { + this.emit('closing') await this[kCloseResources]() if (!fromInitial) await this._close() } catch (err) { this[kStatus] = 'open' - this[kUndefer]() + this[kQueue].drain() throw new NotClosedError(err) } this[kStatus] = 'closed' - this[kUndefer]() + this[kQueue].drain() this.emit('closed') })() @@ -275,8 +277,7 @@ class AbstractLevel extends EventEmitter { this[kStatusChange] = null } } else if (this[kStatus] !== 'closed') { - // Should not happen - /* istanbul ignore next */ + /* istanbul ignore next: should not happen */ throw new NotClosedError() } } @@ -290,6 +291,7 @@ class AbstractLevel extends EventEmitter { const resources = Array.from(this[kResources]) const promises = resources.map(closeResource) + // TODO: async/await return Promise.allSettled(promises).then(async (results) => { const errors = [] @@ -798,36 +800,29 @@ class AbstractLevel extends EventEmitter { return new DefaultValueIterator(this, options) } - defer (fn) { + defer (fn, options) { if (typeof fn !== 'function') { throw new TypeError('The first argument must be a function') } - this[kOperations].push(fn) + this[kQueue].add(function (abortError) { + if (!abortError) fn() + }, options) } - // TODO (signals): support signal option deferAsync (fn, options) { if (typeof fn !== 'function') { throw new TypeError('The first argument must be a function') } return new Promise((resolve, reject) => { - this[kOperations].push(function () { - fn().then(resolve, reject) - }) + this[kQueue].add(function (abortError) { + if (abortError) reject(abortError) + else fn().then(resolve, reject) + }, options) }) } - [kUndefer] () { - const operations = this[kOperations] - this[kOperations] = [] - - for (const op of operations) { - op() - } - } - // TODO: docs and types attachResource (resource) { if (typeof resource !== 'object' || resource === null || diff --git a/index.d.ts b/index.d.ts index e4a49d8..4277657 100644 --- a/index.d.ts +++ b/index.d.ts @@ -12,7 +12,8 @@ export { AbstractBatchDelOperation, AbstractClearOptions, AbstractDatabaseHooks, - AbstractHook + AbstractHook, + AbstractDeferOptions } from './types/abstract-level' export { diff --git a/lib/abort-ponyfill.js b/lib/abort-ponyfill.js deleted file mode 100644 index 51bc518..0000000 --- a/lib/abort-ponyfill.js +++ /dev/null @@ -1,29 +0,0 @@ -'use strict' - -const kAbort = Symbol('abort') -const kAborted = Symbol('aborted') - -// Minimal ponyfill. Scope is TBD. -exports.AbortController = class AbortController { - constructor () { - this.signal = new exports.AbortSignal() - } - - abort () { - this.signal[kAbort]() - } -} - -exports.AbortSignal = class AbortSignal { - constructor () { - this[kAborted] = false - } - - get aborted () { - return this[kAborted] - } - - [kAbort] () { - this[kAborted] = true - } -} diff --git a/lib/abort.js b/lib/abort.js deleted file mode 100644 index 4fbd7ce..0000000 --- a/lib/abort.js +++ /dev/null @@ -1,9 +0,0 @@ -'use strict' - -// Requires Node.js >= 15 -const src = typeof globalThis.AbortController !== 'undefined' - ? globalThis - : require('./abort-ponyfill') - -exports.AbortController = src.AbortController -exports.AbortSignal = src.AbortSignal diff --git a/lib/common.js b/lib/common.js index e48dd1f..7ebc5dd 100644 --- a/lib/common.js +++ b/lib/common.js @@ -17,6 +17,7 @@ exports.getOptions = function (options, def) { exports.emptyOptions = Object.freeze({}) exports.noop = function () {} +exports.resolvedPromise = Promise.resolve() exports.deprecate = function (message) { if (!deprecations.has(message)) { diff --git a/lib/deferred-iterator.js b/lib/deferred-iterator.js index c55b6ed..8292b61 100644 --- a/lib/deferred-iterator.js +++ b/lib/deferred-iterator.js @@ -6,6 +6,7 @@ const ModuleError = require('module-error') const kNut = Symbol('nut') const kUndefer = Symbol('undefer') const kFactory = Symbol('factory') +const kSignalOptions = Symbol('signalOptions') class DeferredIterator extends AbstractIterator { constructor (db, options) { @@ -13,8 +14,9 @@ class DeferredIterator extends AbstractIterator { this[kNut] = null this[kFactory] = () => db.iterator(options) + this[kSignalOptions] = { signal: options.signal } - this.db.defer(() => this[kUndefer]()) + this.db.defer(() => this[kUndefer](), this[kSignalOptions]) } } @@ -24,8 +26,9 @@ class DeferredKeyIterator extends AbstractKeyIterator { this[kNut] = null this[kFactory] = () => db.keys(options) + this[kSignalOptions] = { signal: options.signal } - this.db.defer(() => this[kUndefer]()) + this.db.defer(() => this[kUndefer](), this[kSignalOptions]) } } @@ -35,8 +38,9 @@ class DeferredValueIterator extends AbstractValueIterator { this[kNut] = null this[kFactory] = () => db.values(options) + this[kSignalOptions] = { signal: options.signal } - this.db.defer(() => this[kUndefer]()) + this.db.defer(() => this[kUndefer](), this[kSignalOptions]) } } @@ -47,11 +51,11 @@ for (const Iterator of [DeferredIterator, DeferredKeyIterator, DeferredValueIter } } - Iterator.prototype._next = async function (options) { + Iterator.prototype._next = async function () { if (this[kNut] !== null) { return this[kNut].next() } else if (this.db.status === 'opening') { - return this.db.deferAsync(() => this._next(), options) + return this.db.deferAsync(() => this._next(), this[kSignalOptions]) } else { throw new ModuleError('Iterator is not open: cannot call next() after close()', { code: 'LEVEL_ITERATOR_NOT_OPEN' @@ -63,7 +67,7 @@ for (const Iterator of [DeferredIterator, DeferredKeyIterator, DeferredValueIter if (this[kNut] !== null) { return this[kNut].nextv(size, options) } else if (this.db.status === 'opening') { - return this.db.deferAsync(() => this._nextv(size, options), options) + return this.db.deferAsync(() => this._nextv(size, options), this[kSignalOptions]) } else { throw new ModuleError('Iterator is not open: cannot call nextv() after close()', { code: 'LEVEL_ITERATOR_NOT_OPEN' @@ -75,7 +79,7 @@ for (const Iterator of [DeferredIterator, DeferredKeyIterator, DeferredValueIter if (this[kNut] !== null) { return this[kNut].all() } else if (this.db.status === 'opening') { - return this.db.deferAsync(() => this._all(options), options) + return this.db.deferAsync(() => this._all(options), this[kSignalOptions]) } else { throw new ModuleError('Iterator is not open: cannot call all() after close()', { code: 'LEVEL_ITERATOR_NOT_OPEN' @@ -88,7 +92,7 @@ for (const Iterator of [DeferredIterator, DeferredKeyIterator, DeferredValueIter // TODO: explain why we need _seek() rather than seek() here this[kNut]._seek(target, options) } else if (this.db.status === 'opening') { - this.db.defer(() => this._seek(target, options)) + this.db.defer(() => this._seek(target, options), this[kSignalOptions]) } } diff --git a/lib/deferred-queue.js b/lib/deferred-queue.js new file mode 100644 index 0000000..b11454a --- /dev/null +++ b/lib/deferred-queue.js @@ -0,0 +1,86 @@ +'use strict' + +const { getOptions, emptyOptions } = require('./common') +const { AbortError } = require('./errors') + +const kOperations = Symbol('operations') +const kSignals = Symbol('signals') +const kHandleAbort = Symbol('handleAbort') + +class DeferredOperation { + constructor (fn, signal) { + this.fn = fn + this.signal = signal + } +} + +class DeferredQueue { + constructor () { + this[kOperations] = [] + this[kSignals] = new Set() + this[kHandleAbort] = this[kHandleAbort].bind(this) + } + + add (fn, options) { + options = getOptions(options, emptyOptions) + const signal = options.signal + + if (signal == null) { + this[kOperations].push(new DeferredOperation(fn, null)) + return + } + + if (signal.aborted) { + // Note that this is called in the same tick + fn(new AbortError()) + return + } + + if (!this[kSignals].has(signal)) { + this[kSignals].add(signal) + signal.addEventListener('abort', this[kHandleAbort], { once: true }) + } + + this[kOperations].push(new DeferredOperation(fn, signal)) + } + + drain () { + const operations = this[kOperations] + const signals = this[kSignals] + + this[kOperations] = [] + this[kSignals] = new Set() + + for (const signal of signals) { + signal.removeEventListener('abort', this[kHandleAbort]) + } + + for (const operation of operations) { + operation.fn.call(null) + } + } + + [kHandleAbort] (ev) { + const signal = ev.target + const err = new AbortError() + const aborted = [] + + // TODO: optimize + this[kOperations] = this[kOperations].filter(function (operation) { + if (operation.signal !== null && operation.signal === signal) { + aborted.push(operation) + return false + } else { + return true + } + }) + + this[kSignals].delete(signal) + + for (const operation of aborted) { + operation.fn.call(null, err) + } + } +} + +exports.DeferredQueue = DeferredQueue diff --git a/lib/errors.js b/lib/errors.js new file mode 100644 index 0000000..3a88ed1 --- /dev/null +++ b/lib/errors.js @@ -0,0 +1,25 @@ +'use strict' + +const ModuleError = require('module-error') + +class AbortError extends ModuleError { + constructor (cause) { + super('Operation has been aborted', { + code: 'LEVEL_ABORTED', + cause + }) + } + + // TODO: we should set name to AbortError for web compatibility. See: + // https://dom.spec.whatwg.org/#aborting-ongoing-activities + // https://github.com/nodejs/node/pull/35911#discussion_r515779306 + // + // But I'm not sure we can do the same for errors created by a Node-API addon (like + // classic-level) so for now this behavior is undocumented and folks should use the + // LEVEL_ABORTED code instead. + get name () { + return 'AbortError' + } +} + +exports.AbortError = AbortError diff --git a/test/iterator-test.js b/test/iterator-test.js index 820ccbc..cf59690 100644 --- a/test/iterator-test.js +++ b/test/iterator-test.js @@ -101,29 +101,18 @@ exports.sequence = function (test, testCommon) { return iterator.close() }) } - } - } - for (const deferred of [false, true]) { - for (const mode of ['iterator', 'keys', 'values']) { - for (const method of ['next', 'nextv', 'all']) { - const requiredArgs = method === 'nextv' ? [10] : [] - - // NOTE: adapted from leveldown - test(`${mode}().${method}() after db.close() yields error (deferred: ${deferred})`, async function (t) { + for (const deferred of [false, true]) { + test(`${mode}().${method}() during close() yields error (deferred: ${deferred})`, async function (t) { t.plan(2) const db = testCommon.factory() if (!deferred) await db.open() - - await db.put('a', 'a') - await db.put('b', 'b') - const it = db[mode]() - // The first call *should* succeed, because it was scheduled before close(). However, success - // is not a must. Because nextv() and all() fallback to next*(), they're allowed to fail. An - // implementation can also choose to abort any pending call on close. + // The first call *may* succeed, because it was scheduled before close(). The + // default implementations of nextv() and all() fallback to next*() and thus + // make multiple calls, so they're allowed to fail. let promise = it[method](...requiredArgs).then(() => { t.pass('Optionally succeeded') }, (err) => { @@ -139,9 +128,31 @@ exports.sequence = function (test, testCommon) { }) }) - return Promise.all([db.close(), promise]) + await Promise.all([it.close(), promise]) + return db.close() }) } + + // At the moment, we can only be sure that signals are supported if the iterator is deferred + globalThis.AbortController && test(`${mode}().${method}() with aborted signal yields error`, async function (t) { + t.plan(2) + + const db = testCommon.factory() + const ac = new globalThis.AbortController() + const it = db[mode]({ signal: ac.signal }) + + t.is(db.status, 'opening', 'is deferred') + ac.abort() + + try { + await it[method](...requiredArgs) + } catch (err) { + t.is(err.code, 'LEVEL_ABORTED') + } + + await it.close() + return db.close() + }) } } } diff --git a/test/open-test.js b/test/open-test.js index 0f8f239..b4f81b6 100644 --- a/test/open-test.js +++ b/test/open-test.js @@ -51,14 +51,14 @@ exports.open = function (test, testCommon) { t.is(db.status, 'opening', 'is opening') - // This wins from the open() call + // This eventually wins from the open() call db.close().then(function () { order.push('B') t.same(order, ['open event', 'A', 'closed event', 'B'], 'order is correct') t.is(db.status, 'closed', 'is closed') }) - // But open() is still in control + // But open() is still in progress t.is(db.status, 'opening', 'is still opening') db.on('open', () => { order.push('open event') }) @@ -176,31 +176,59 @@ exports.open = function (test, testCommon) { } }) - test('close() on open event', function (t) { - t.plan(4) + for (const event of ['open', 'opening']) { + test(`close() on ${event} event`, function (t) { + t.plan(3) - const db = testCommon.factory() - const order = [] + const db = testCommon.factory() + const order = [] - db.open().then(function () { - order.push('A') - t.is(db.status, 'open', 'is open') + db.on(event, function () { + order.push(`${event} event`) + + // This eventually wins from the in-progress open() call + db.close().then(function () { + order.push('B') + t.same(order, [`${event} event`, 'A', 'closed event', 'B'], 'order is correct') + t.is(db.status, 'closed', 'is closed') + }, t.fail.bind(t)) + }) + + db.open().then(function () { + order.push('A') + t.is(db.status, 'open', 'is open') + }, t.fail.bind(t)) + + db.on('closed', () => { order.push('closed event') }) }) + } - db.on('open', function () { - order.push('open event') + for (const event of ['closed', 'closing']) { + test(`open() on ${event} event`, function (t) { + t.plan(3) - // This wins from the (still in progress) open() call - db.close().then(function (err) { - order.push('B') - t.same(order, ['open event', 'A', 'closed event', 'B'], 'order is correct') - t.ifError(err, 'no close() error') - t.is(db.status, 'closed', 'is closed') + const db = testCommon.factory() + const order = [] + + db.on(event, function () { + order.push(`${event} event`) + + // This eventually wins from the in-progress close() call + db.open().then(function () { + order.push('B') + t.same(order, [`${event} event`, 'A', 'open event', 'B'], 'order is correct') + t.is(db.status, 'open', 'is open') + }, t.fail.bind(t)) }) - }) - db.on('closed', () => { order.push('closed event') }) - }) + db.close().then(function () { + order.push('A') + t.is(db.status, 'closed', 'is closed') + }, t.fail.bind(t)) + + db.on('open', () => { order.push('open event') }) + }) + } test('passive open()', async function (t) { t.plan(1) diff --git a/test/self.js b/test/self.js index 210ddf8..0299f9e 100644 --- a/test/self.js +++ b/test/self.js @@ -861,7 +861,8 @@ test('rangeOptions', function (t) { }) }) -require('./self/abort-test') +require('./self/deferred-queue-test') +require('./self/errors-test') require('./self/defer-test') require('./self/attach-resource-test') require('./self/abstract-iterator-test') diff --git a/test/self/abort-test.js b/test/self/abort-test.js deleted file mode 100644 index a737aec..0000000 --- a/test/self/abort-test.js +++ /dev/null @@ -1,28 +0,0 @@ -'use strict' - -const test = require('tape') -const real = require('../../lib/abort') -const ponyfill = require('../../lib/abort-ponyfill') - -test('AbortController', function (t) { - verify(t, real) -}) - -test('AbortController ponyfill', function (t) { - verify(t, ponyfill) -}) - -function verify (t, src) { - const controller = new src.AbortController() - - t.ok(controller.signal instanceof src.AbortSignal) - t.is(controller.signal.aborted, false) - - t.is(controller.abort(), undefined) - t.is(controller.signal.aborted, true) - - t.is(controller.abort(), undefined) - t.is(controller.signal.aborted, true) - - t.end() -} diff --git a/test/self/abstract-iterator-test.js b/test/self/abstract-iterator-test.js index 4f2e123..86ac069 100644 --- a/test/self/abstract-iterator-test.js +++ b/test/self/abstract-iterator-test.js @@ -2,7 +2,6 @@ const test = require('tape') const { AbstractLevel, AbstractIterator, AbstractKeyIterator, AbstractValueIterator } = require('../..') -const { AbortSignal } = require('../../lib/abort') const testCommon = require('../common')({ test, @@ -51,15 +50,12 @@ for (const Ctor of [AbstractIterator, AbstractKeyIterator, AbstractValueIterator }) test(`${Ctor.name}.next() extensibility`, async function (t) { - t.plan(4) + t.plan(2) class TestIterator extends Ctor { - async _next (options) { + async _next () { t.is(this, it, 'thisArg on _next() was correct') - t.is(arguments.length, 1, 'got one argument') - const { signal, ...rest } = options - t.ok(signal instanceof AbortSignal) - t.same(rest, {}) + t.is(arguments.length, 0, 'got 0 arguments') } } @@ -71,16 +67,14 @@ for (const Ctor of [AbstractIterator, AbstractKeyIterator, AbstractValueIterator }) test(`${Ctor.name}.nextv() extensibility`, async function (t) { - t.plan(5 * 2) + t.plan(4 * 2) class TestIterator extends Ctor { async _nextv (size, options) { t.is(this, it, 'thisArg on _nextv() was correct') t.is(arguments.length, 2, 'got 2 arguments') t.is(size, 100) - const { signal, ...rest } = options - t.ok(signal instanceof AbortSignal) - t.same(rest, {}) + t.same(options, {}) return [] } } @@ -94,14 +88,12 @@ for (const Ctor of [AbstractIterator, AbstractKeyIterator, AbstractValueIterator }) test(`${Ctor.name}.nextv() extensibility (options)`, async function (t) { - t.plan(3) + t.plan(2) class TestIterator extends Ctor { async _nextv (size, options) { t.is(size, 100) - const { signal, ...rest } = options - t.ok(signal instanceof AbortSignal) - t.same(rest, { foo: 123 }, 'got userland options') + t.same(options, { foo: 123 }, 'got userland options') return [] } } @@ -115,16 +107,14 @@ for (const Ctor of [AbstractIterator, AbstractKeyIterator, AbstractValueIterator }) test(`${Ctor.name}.all() extensibility`, async function (t) { - t.plan(2 * 4) + t.plan(2 * 3) for (const args of [[], [{}]]) { class TestIterator extends Ctor { async _all (options) { t.is(this, it, 'thisArg on _all() was correct') t.is(arguments.length, 1, 'got 1 argument') - const { signal, ...rest } = options - t.ok(signal instanceof AbortSignal) - t.same(rest, {}, '') + t.same(options, {}, '') return [] } } @@ -138,13 +128,11 @@ for (const Ctor of [AbstractIterator, AbstractKeyIterator, AbstractValueIterator }) test(`${Ctor.name}.all() extensibility (options)`, async function (t) { - t.plan(2) + t.plan(1) class TestIterator extends Ctor { async _all (options) { - const { signal, ...rest } = options - t.ok(signal instanceof AbortSignal) - t.same(rest, { foo: 123 }, 'got userland options') + t.same(options, { foo: 123 }, 'got userland options') return [] } } diff --git a/test/self/deferred-iterator-test.js b/test/self/deferred-iterator-test.js index c86bd25..a578176 100644 --- a/test/self/deferred-iterator-test.js +++ b/test/self/deferred-iterator-test.js @@ -15,8 +15,8 @@ for (const mode of ['iterator', 'keys', 'values']) { const publicMethod = mode // NOTE: adapted from deferred-leveldown - test(`deferred ${mode}()`, function (t) { - t.plan(6) + test(`deferred ${mode}().next()`, async function (t) { + t.plan(5) const keyEncoding = { format: 'utf8', @@ -28,7 +28,7 @@ for (const mode of ['iterator', 'keys', 'values']) { } class MockIterator extends RealCtor { - async _next (options) { + async _next () { return nextArg } @@ -50,16 +50,8 @@ for (const mode of ['iterator', 'keys', 'values']) { const it = db[publicMethod]({ gt: 'foo' }) t.ok(it instanceof DeferredCtor, 'is deferred') - let nextFirst = false - - it.next().then(function (item) { - nextFirst = true - t.is(item, nextArg) - }) - - it.close().then(function () { - t.ok(nextFirst) - }) + t.is(await it.next(), nextArg) + return it.close() }) // NOTE: adapted from deferred-leveldown @@ -71,7 +63,7 @@ for (const mode of ['iterator', 'keys', 'values']) { t.is(target, '123') } - async _next (options) { + async _next () { return nextArg } } @@ -82,6 +74,7 @@ for (const mode of ['iterator', 'keys', 'values']) { } }) + // TODO: async/await db.open().then(function () { it.seek(123) it.next().then(function (item) { @@ -186,6 +179,7 @@ for (const mode of ['iterator', 'keys', 'values']) { return original.call(this, ...args) } + // TODO: async/await db.open().then(() => db.close()).then(function () { verifyClosed(t, it, method, function () { db.open().then(function () { @@ -271,6 +265,39 @@ for (const mode of ['iterator', 'keys', 'values']) { }) }) + globalThis.AbortController && test(`deferred ${mode}(): skips real iterator if aborted`, function (t) { + t.plan(3) + + const order = [] + const db = mockLevel({ + async _open (options) { + order.push('_open') + }, + [privateMethod] (options) { + t.fail('should not be called') + } + }) + + const ac = new globalThis.AbortController() + const it = db[publicMethod]({ signal: ac.signal }) + t.ok(it instanceof DeferredCtor) + + // Test synchronous call, which should be silently skipped on abort + it.seek('foo') + + // Test asynchronous call, which should be rejected + it.next().then(t.fail.bind(t, 'should not succeed'), function (err) { + t.is(err.code, 'LEVEL_ABORTED') + }) + + // Signal should prevent real iterator from being created. + ac.abort() + + it.close().then(function () { + t.same(order, ['_open']) + }) + }) + // TODO: async/await const verifyClosed = function (t, it, method, cb) { const requiredArgs = method === 'nextv' ? [10] : [] diff --git a/test/self/deferred-operations-test.js b/test/self/deferred-operations-test.js index 78f260d..491c65f 100644 --- a/test/self/deferred-operations-test.js +++ b/test/self/deferred-operations-test.js @@ -27,7 +27,7 @@ test('deferred operations are called in order', function (t) { _iterator (options) { calls.push({ type: 'iterator' }) return mockIterator(this, options, { - async _next (options) { + async _next () { calls.push({ type: 'iterator.next' }) } }) diff --git a/test/self/deferred-queue-test.js b/test/self/deferred-queue-test.js new file mode 100644 index 0000000..5b27646 --- /dev/null +++ b/test/self/deferred-queue-test.js @@ -0,0 +1,93 @@ +'use strict' + +const test = require('tape') +const { DeferredQueue } = require('../../lib/deferred-queue') +const supported = !!globalThis.AbortController + +test('DeferredQueue calls operations in FIFO order', async function (t) { + const queue = new DeferredQueue() + const calls = [] + + queue.add(() => { calls.push(1) }) + queue.add(() => { calls.push(2) }) + queue.add(() => { calls.push(3) }) + + queue.drain() + t.same(calls, [1, 2, 3]) +}) + +test('DeferredQueue only calls operation once', async function (t) { + const queue = new DeferredQueue() + + let calls = 0 + queue.add(() => { calls++ }) + + queue.drain() + t.same(calls, 1) + + queue.drain() + t.same(calls, 1, 'no new calls') +}) + +supported && test('DeferredQueue does not add operation if given an aborted signal', async function (t) { + const ac = new globalThis.AbortController() + const queue = new DeferredQueue() + const calls = [] + + ac.abort() + queue.add((abortError) => { calls.push(abortError) }, { signal: ac.signal }) + + t.is(calls.length, 1) + t.is(calls[0].code, 'LEVEL_ABORTED') + + queue.drain() + t.is(calls.length, 1, 'not called again') +}) + +supported && test('DeferredQueue aborts operation on signal abort', async function (t) { + const ac1 = new globalThis.AbortController() + const ac2 = new globalThis.AbortController() + const queue = new DeferredQueue() + const calls = [] + + queue.add((abortError) => { calls.push([1, abortError]) }, { signal: ac1.signal }) + queue.add((abortError) => { calls.push([2, abortError]) }, { signal: ac2.signal }) + t.is(calls.length, 0, 'not yet called') + + ac1.abort() + t.is(calls.length, 1, 'called') + t.is(calls[0][0], 1, 'signal1') + t.is(calls[0][1].code, 'LEVEL_ABORTED') + + ac2.abort() + t.is(calls.length, 2, 'called') + t.is(calls[1][0], 2, 'signal2') + t.is(calls[1][1].code, 'LEVEL_ABORTED') + + queue.drain() + ac2.abort() + t.is(calls.length, 2, 'not called again') +}) + +supported && test('DeferredQueue calls operation if signal is not aborted', async function (t) { + const ac1 = new globalThis.AbortController() + const ac2 = new globalThis.AbortController() + const queue = new DeferredQueue() + const calls = [] + + queue.add((abortError) => { calls.push([1, abortError]) }, { signal: ac1.signal }) + queue.add((abortError) => { calls.push([2, abortError]) }, { signal: ac2.signal }) + t.is(calls.length, 0, 'not yet called') + + queue.drain() + t.is(calls.length, 2, 'called') + t.is(calls[0][0], 1, 'signal1') + t.is(calls[0][1], undefined, 'no abort error') + t.is(calls[1][0], 2, 'signal2') + t.is(calls[1][1], undefined, 'no abort error') + + queue.drain() + ac1.abort() + ac2.abort() + t.is(calls.length, 2, 'not called again') +}) diff --git a/test/self/errors-test.js b/test/self/errors-test.js new file mode 100644 index 0000000..d7b685d --- /dev/null +++ b/test/self/errors-test.js @@ -0,0 +1,11 @@ +'use strict' + +const test = require('tape') +const { AbortError } = require('../../lib/errors') + +test('AbortError', function (t) { + const err = new AbortError() + t.is(err.code, 'LEVEL_ABORTED') + t.is(err.name, 'AbortError') + t.end() +}) diff --git a/test/self/iterator-test.js b/test/self/iterator-test.js index a6d4ea9..59c5750 100644 --- a/test/self/iterator-test.js +++ b/test/self/iterator-test.js @@ -4,7 +4,6 @@ const test = require('tape') const { Buffer } = require('buffer') const { AbstractLevel } = require('../..') const { AbstractIterator, AbstractKeyIterator, AbstractValueIterator } = require('../..') -const { AbortSignal } = require('../../lib/abort') const { mockLevel, mockIterator, nullishEncoding } = require('../util') const identity = (v) => v @@ -84,7 +83,7 @@ for (const deferred of [false, true]) { let yielded = 0 class MockIterator extends Ctor { - async _next (options) { + async _next () { calls++ if (mode === 'iterator' || def) { @@ -158,7 +157,7 @@ for (const deferred of [false, true]) { let nextCount = 0 class MockIterator extends Ctor { - async _next (options) { + async _next () { if (++nextCount > 10) { throw new Error('Potential infinite loop') } else if (mode === 'iterator' || def) { @@ -220,7 +219,7 @@ for (const deferred of [false, true]) { } class MockIterator extends Ctor { - async _next (options) { + async _next () { if (mode === 'iterator' || def) { return ['a', 'a'] } else { @@ -273,7 +272,7 @@ for (const deferred of [false, true]) { } class MockIterator extends Ctor { - _next (options) { + _next () { if (mode === 'iterator' || def) { return ['281', Buffer.from('a')] } else if (mode === 'keys') { @@ -305,7 +304,7 @@ for (const deferred of [false, true]) { } class MockIterator extends Ctor { - async _next (options) { + async _next () { if (mode === 'iterator' || def) { return [Buffer.from('a'), Buffer.from('b')] } else if (mode === 'keys') { @@ -342,7 +341,7 @@ for (const deferred of [false, true]) { } class MockIterator extends Ctor { - async _next (options) { + async _next () { if (mode === 'iterator' || def) { return ['a', 'b'] } else if (mode === 'keys') { @@ -557,7 +556,7 @@ for (const deferred of [false, true]) { let pos = 0 class MockIterator extends Ctor { - async _next (options) { + async _next () { if (mode === 'iterator' || def) { return ['k' + pos, 'v' + (pos++)] } else if (mode === 'keys') { @@ -600,7 +599,7 @@ for (const deferred of [false, true]) { } class MockIterator extends Ctor { - async _next (options) { + async _next () { t.pass('called') throw new Error('test') } @@ -617,7 +616,7 @@ for (const deferred of [false, true]) { }) test(`${mode}() has default all() (deferred: ${deferred}, default implementation: ${def})`, async function (t) { - t.plan(11) + t.plan(8) class MockLevel extends AbstractLevel { [privateMethod] (options) { @@ -630,10 +629,7 @@ for (const deferred of [false, true]) { class MockIterator extends Ctor { async _nextv (size, options) { t.is(size, 1000) - - const { signal, ...rest } = options - t.ok(signal instanceof AbortSignal) - t.same(rest, {}) + t.same(options, {}) if (pos === 4) { return [] @@ -690,7 +686,7 @@ for (const deferred of [false, true]) { }) test(`${mode}() custom all() (deferred: ${deferred}, default implementation: ${def})`, async function (t) { - t.plan(4) + t.plan(3) class MockLevel extends AbstractLevel { [privateMethod] (options) { @@ -700,9 +696,7 @@ for (const deferred of [false, true]) { class MockIterator extends Ctor { async _all (options) { - const { signal, ...rest } = options - t.ok(signal instanceof AbortSignal) - t.same(rest, {}) + t.same(options, {}) if (mode === 'iterator' || def) { return [['k0', 'v0'], ['k1', 'v1']] @@ -777,7 +771,7 @@ for (const deferred of [false, true]) { t.is(options.keys, false) return mockIterator(this, options, { - async _next (options) { + async _next () { return ['', 'value'] } }) @@ -808,7 +802,7 @@ for (const deferred of [false, true]) { t.is(options.values, false) return mockIterator(this, options, { - async _next (options) { + async _next () { return ['key', ''] } }) diff --git a/test/self/sublevel-test.js b/test/self/sublevel-test.js index 087b8e2..738a838 100644 --- a/test/self/sublevel-test.js +++ b/test/self/sublevel-test.js @@ -452,7 +452,7 @@ test('opening & closing sublevel', function (t) { } class MockIterator extends Ctor { - async _next (options) { + async _next () { throw new Error('next() error from parent database') } } @@ -752,7 +752,7 @@ test('sublevel encodings', function (t) { const prefixedKey = Buffer.concat([Buffer.from('!test!'), testKey]) class MockIterator extends Ctor { - async _next (options) { + async _next () { if (mode === 'iterator' || def) { return [prefixedKey, 'bar'] } else if (mode === 'keys') { @@ -794,7 +794,7 @@ test('sublevel encodings', function (t) { prefixedKey.set(testKey, prefix.byteLength) class MockIterator extends Ctor { - async _next (options) { + async _next () { if (mode === 'iterator' || def) { return [prefixedKey, 'bar'] } else if (mode === 'keys') { diff --git a/test/util.js b/test/util.js index 2a8014a..2bf6810 100644 --- a/test/util.js +++ b/test/util.js @@ -144,7 +144,7 @@ class MinimalValueIterator extends AbstractValueIterator { for (const Ctor of [MinimalIterator, MinimalKeyIterator, MinimalValueIterator]) { const mapEntry = Ctor === MinimalIterator ? e => e : Ctor === MinimalKeyIterator ? e => e[0] : e => e[1] - Ctor.prototype._next = async function (options) { + Ctor.prototype._next = async function () { const entry = this[kEntries][this[kPosition]++] if (entry === undefined) return undefined return mapEntry(entry) diff --git a/types/abstract-iterator.d.ts b/types/abstract-iterator.d.ts index 15a099d..109406d 100644 --- a/types/abstract-iterator.d.ts +++ b/types/abstract-iterator.d.ts @@ -1,7 +1,16 @@ import * as Transcoder from 'level-transcoder' import { RangeOptions } from './interfaces' -export interface AbstractIteratorOptions extends RangeOptions { +declare interface CommonIteratorOptions { + /** + * An [`AbortSignal`][1] to abort read operations on the iterator. + * + * [1]: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal + */ + signal?: AbortSignal | undefined +} + +export interface AbstractIteratorOptions extends RangeOptions, CommonIteratorOptions { /** * Whether to return the key of each entry. Defaults to `true`. If set to `false`, * the iterator will yield keys that are `undefined`. @@ -26,7 +35,7 @@ export interface AbstractIteratorOptions extends RangeOptions { valueEncoding?: string | Transcoder.PartialDecoder | undefined } -export interface AbstractKeyIteratorOptions extends RangeOptions { +export interface AbstractKeyIteratorOptions extends RangeOptions, CommonIteratorOptions { /** * Custom key encoding for this iterator, used to encode range options, to encode * {@link AbstractKeyIterator.seek} targets and to decode keys. @@ -34,7 +43,7 @@ export interface AbstractKeyIteratorOptions extends RangeOptions { keyEncoding?: string | Transcoder.PartialEncoding | undefined } -export interface AbstractValueIteratorOptions extends RangeOptions { +export interface AbstractValueIteratorOptions extends RangeOptions, CommonIteratorOptions { /** * Custom key encoding for this iterator, used to encode range options and * {@link AbstractValueIterator.seek} targets. diff --git a/types/abstract-level.d.ts b/types/abstract-level.d.ts index cd25d52..6b9932b 100644 --- a/types/abstract-level.d.ts +++ b/types/abstract-level.d.ts @@ -237,15 +237,22 @@ declare class AbstractLevel /** * Call the function {@link fn} at a later time when {@link status} changes to - * `'open'` or `'closed'`. + * `'open'` or `'closed'`. Known as a _deferred operation_. + * + * @param fn Synchronous function to (eventually) call. + * @param options Options for the deferred operation. */ - defer (fn: Function): void + defer (fn: Function, options?: AbstractDeferOptions | undefined): void /** * Call the function {@link fn} at a later time when {@link status} changes to - * `'open'` or `'closed'`. + * `'open'` or `'closed'`. Known as a _deferred operation_. + * + * @param fn Asynchronous function to (eventually) call. + * @param options Options for the deferred operation. + * @returns A promise for the result of {@link fn}. */ - deferAsync (fn: () => Promise): Promise + deferAsync (fn: () => Promise, options?: AbstractDeferOptions | undefined): Promise } export { AbstractLevel } @@ -515,3 +522,15 @@ export interface AbstractHook { */ delete: (fn: TFn) => void } + +/** + * Options for {@link AbstractLevel.defer()} and {@link AbstractLevel.deferAsync()}. + */ +export interface AbstractDeferOptions { + /** + * An [`AbortSignal`][1] to abort the deferred operation. + * + * [1]: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal + */ + signal?: AbortSignal | undefined +}