Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental support of AbortSignal #55

Merged
merged 3 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 66 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
- [`encoding = db.keyEncoding([encoding])`](#encoding--dbkeyencodingencoding)
- [`encoding = db.valueEncoding([encoding])`](#encoding--dbvalueencodingencoding)
- [`key = db.prefixKey(key, keyFormat[, local])`](#key--dbprefixkeykey-keyformat-local)
- [`db.defer(fn)`](#dbdeferfn)
- [`db.deferAsync(fn)`](#dbdeferasyncfn)
- [`db.defer(fn[, options])`](#dbdeferfn-options)
- [`db.deferAsync(fn[, options])`](#dbdeferasyncfn-options)
- [`chainedBatch`](#chainedbatch)
- [`chainedBatch.put(key, value[, options])`](#chainedbatchputkey-value-options)
- [`chainedBatch.del(key[, options])`](#chainedbatchdelkey-options)
Expand All @@ -59,6 +59,7 @@
- [`iterator.db`](#iteratordb)
- [`iterator.count`](#iteratorcount)
- [`iterator.limit`](#iteratorlimit)
- [Aborting Iterators](#aborting-iterators)
- [`keyIterator`](#keyiterator)
- [`valueIterator`](#valueiterator)
- [`sublevel`](#sublevel)
Expand Down Expand Up @@ -103,6 +104,7 @@
- [`LEVEL_ITERATOR_NOT_OPEN`](#level_iterator_not_open)
- [`LEVEL_ITERATOR_BUSY`](#level_iterator_busy)
- [`LEVEL_BATCH_NOT_OPEN`](#level_batch_not_open)
- [`LEVEL_ABORTED`](#level_aborted)
- [`LEVEL_ENCODING_NOT_FOUND`](#level_encoding_not_found)
- [`LEVEL_ENCODING_NOT_SUPPORTED`](#level_encoding_not_supported)
- [`LEVEL_DECODE_ERROR`](#level_decode_error)
Expand Down Expand Up @@ -375,6 +377,7 @@ The `gte` and `lte` range options take precedence over `gt` and `lt` respectivel
- `values` (boolean, default: `true`): whether to return the value of each entry. If set to `false`, the iterator will yield values that are `undefined`. Prefer to use `db.values()` instead.
- `keyEncoding`: custom key encoding for this iterator, used to encode range options, to encode `seek()` targets and to decode keys.
- `valueEncoding`: custom value encoding for this iterator, used to decode values.
- `signal`: an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to [abort read operations on the iterator](#aborting-iterators).

Lastly, an implementation is free to add its own options.

Expand Down Expand Up @@ -523,9 +526,9 @@ console.log(nested.prefixKey('a', 'utf8')) // '!example!!nested!a'
console.log(nested.prefixKey('a', 'utf8', true)) // '!nested!a'
```

### `db.defer(fn)`
### `db.defer(fn[, options])`

Call the function `fn` at a later time when [`db.status`](#dbstatus) changes to `'open'` or `'closed'`. Used by `abstract-level` itself to implement "deferred open" which is a feature that makes it possible to call methods like `db.put()` before the database has finished opening. The `defer()` method is exposed for implementations and plugins to achieve the same on their custom methods:
Call the function `fn` at a later time when [`db.status`](#dbstatus) changes to `'open'` or `'closed'`. Known as a _deferred operation_. Used by `abstract-level` itself to implement "deferred open" which is a feature that makes it possible to call methods like `db.put()` before the database has finished opening. The `defer()` method is exposed for implementations and plugins to achieve the same on their custom methods:

```js
db.foo = function (key) {
Expand All @@ -537,9 +540,13 @@ db.foo = function (key) {
}
```

The optional `options` object may contain:

- `signal`: an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to abort the deferred operation. When aborted (now or later) the `fn` function will not be called.

When deferring a custom operation, do it early: after normalizing optional arguments but before encoding (to avoid double encoding and to emit original input if the operation has events) and before any _fast paths_ (to avoid calling back before the database has finished opening). For example, `db.batch([])` has an internal fast path where it skips work if the array of operations is empty. Resources that can be closed on their own (like iterators) should however first check such state before deferring, in order to reject operations after close (including when the database was reopened).

### `db.deferAsync(fn)`
### `db.deferAsync(fn[, options])`

Similar to `db.defer(fn)` but for asynchronous work. Returns a promise, which waits for [`db.status`](#dbstatus) to change to `'open'` or `'closed'` and then calls `fn` which itself must return a promise. This allows for recursion:

Expand All @@ -553,6 +560,10 @@ db.foo = async function (key) {
}
```

The optional `options` object may contain:

- `signal`: an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to abort the deferred operation. When aborted (now or later) the `fn` function will not be called, and the promise returned by `deferAsync()` will be rejected with a [`LEVEL_ABORTED`](#errors) error.

### `chainedBatch`

#### `chainedBatch.put(key, value[, options])`
Expand Down Expand Up @@ -715,6 +726,44 @@ const hasMore = iterator.count < iterator.limit
const remaining = iterator.limit - iterator.count
```

#### Aborting Iterators

Iterators take an experimental `signal` option that, once signaled, aborts an in-progress read operation (if any) and rejects subsequent reads. The relevant promise will be rejected with a [`LEVEL_ABORTED`](#errors) error. Aborting does not close the iterator, because closing is asynchronous and may result in an error that needs a place to go. This means signals should be used together with a pattern that automatically closes the iterator:

```js
const abortController = new AbortController()
const signal = abortController.signal

// Will result in 'aborted' log
abortController.abort()

try {
for await (const entry of db.iterator({ signal })) {
console.log(entry)
}
} catch (err) {
if (err.code === 'LEVEL_ABORTED') {
console.log('aborted')
}
}
```

Otherwise, close the iterator explicitly:

```js
const iterator = db.iterator({ signal })

try {
const entries = await iterator.nextv(10)
} catch (err) {
if (err.code === 'LEVEL_ABORTED') {
console.log('aborted')
}
} finally {
await iterator.close()
}
```

### `keyIterator`

A key iterator has the same interface as `iterator` except that its methods yield keys instead of entries. Usage is otherwise the same.
Expand Down Expand Up @@ -1155,6 +1204,10 @@ When `iterator.next()` or `seek()` was called while a previous `next()` call was

When an operation was made on a chained batch while it was closing or closed, which may also be the result of the database being closed or that `write()` was called on the chained batch.

#### `LEVEL_ABORTED`

When an operation was aborted by the user.

#### `LEVEL_ENCODING_NOT_FOUND`

When a `keyEncoding` or `valueEncoding` option specified a named encoding that does not exist.
Expand Down Expand Up @@ -1558,6 +1611,14 @@ class ExampleSublevel extends AbstractSublevel {

The first argument to this constructor must be an instance of the relevant `AbstractLevel` implementation. The constructor will set `iterator.db` which is used (among other things) to access encodings and ensures that `db` will not be garbage collected in case there are no other references to it. The `options` argument must be the original `options` object that was passed to `db._iterator()` and it is therefore not (publicly) possible to create an iterator via constructors alone.

The `signal` option, if any and once signaled, should abort an in-progress `_next()`, `_nextv()` or `_all()` call and reject the promise returned by that call with a [`LEVEL_ABORTED`](#errors) error. Doing so is optional until a future semver-major release. Responsibilities are divided as follows:

1. Before a database has finished opening, `abstract-level` handles the signal
2. While a call is in progress, the implementation handles the signal
3. Once the signal is aborted, `abstract-level` rejects further calls.

A method like `_next()` therefore doesn't have to check the signal _before_ it start its asynchronous work, only _during_ that work. Whether to respect the signal and on which (potentially long-running) methods, is up to the implementation.

#### `iterator._next()`

Advance to the next entry and yield that entry. Must return a promise. If an error occurs, reject the promise. If the natural end of the iterator has been reached, resolve the promise with `undefined`. Otherwise resolve the promise with an array containing a `key` and `value`. If a `limit` was set and the iterator already yielded that many entries (via any of the methods) then `_next()` will not be called.
Expand Down
6 changes: 2 additions & 4 deletions UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ db.batch([{ type: 'del', key: 'blue', sublevel: colorIndex }])

#### 2.1. Promises all the way

All private methods that previously took a callback now use a promise. For example, the function signature `_get(key, options, callback)` has changed to `async _get(key, options)`. Same as in the public API, the new function signatures are predictable and the only method that requires special attention is `iterator._next()`. Which in addition now also takes an `options` argument. For details, please see the updated [README](./README.md#private-api-for-implementors).
All private methods that previously took a callback now use a promise. For example, the function signature `_get(key, options, callback)` has changed to `async _get(key, options)`. Same as in the public API, the new function signatures are predictable and the only method that requires special attention is `iterator._next()`. For details, please see the updated [README](./README.md#private-api-for-implementors).

#### 2.2. Ticks

Expand Down Expand Up @@ -210,9 +210,7 @@ class ExampleLevel extends AbstractLevel {

#### 2.3. A new way to abort iterator work

_This section is incomplete._

Closing an iterator now aborts work, if supported by implementation. The undocumented `abortOnClose` option of iterators (added as a workaround for `many-level`) has been removed in favor of AbortSignal.
Iterators now take an experimental `signal` option that is an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal). You can use the `signal` to abort an in-progress `_next()`, `_nextv()` or `_all()` call. Doing so is optional until a future semver-major release.

#### 2.4. Snapshots must be synchronous

Expand Down
39 changes: 23 additions & 16 deletions abstract-chained-batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const kStatus = Symbol('status')
const kPublicOperations = Symbol('publicOperations')
const kLegacyOperations = Symbol('legacyOperations')
const kPrivateOperations = Symbol('privateOperations')
const kCallClose = Symbol('callClose')
const kClosePromise = Symbol('closePromise')
const kLength = Symbol('length')
const kPrewriteRun = Symbol('prewriteRun')
Expand Down Expand Up @@ -276,13 +275,8 @@ class AbstractChainedBatch {
} else {
this[kStatus] = 'writing'

// Prepare promise in case write() is called in the mean time
let close
this[kClosePromise] = new Promise((resolve, reject) => {
close = () => {
this[kCallClose]().then(resolve, reject)
}
})
// Prepare promise in case close() is called in the mean time
const close = prepareClose(this)

try {
// Process operations added by prewrite hook functions
Expand Down Expand Up @@ -347,20 +341,33 @@ class AbstractChainedBatch {
// First caller of close() or write() is responsible for error
return this[kClosePromise].catch(noop)
} else {
this[kClosePromise] = this[kCallClose]()
// Wrap promise to avoid race issues on recursive calls
prepareClose(this)()
return this[kClosePromise]
}
}

async [kCallClose] () {
this[kStatus] = 'closing'
await this._close()
this.db.detachResource(this)
}

async _close () {}
}

const prepareClose = function (batch) {
let close

batch[kClosePromise] = new Promise((resolve, reject) => {
close = () => {
privateClose(batch).then(resolve, reject)
}
})

return close
}

const privateClose = async function (batch) {
batch[kStatus] = 'closing'
await batch._close()
batch.db.detachResource(batch)
}

class PrewriteData {
constructor (privateOperations, publicOperations) {
this[kPrivateOperations] = privateOperations
Expand All @@ -384,7 +391,7 @@ class PrewriteData {
}
}

function assertStatus (batch) {
const assertStatus = function (batch) {
if (batch[kStatus] !== 'open') {
throw new ModuleError('Batch is not open: cannot change operations after write() or close()', {
code: 'LEVEL_BATCH_NOT_OPEN'
Expand Down
Loading