Live streaming values from a level #222

Closed
ralphtheninja opened this issue Sep 27, 2022 · 6 comments

Comments

@ralphtheninja
Member

@vweevers If I wanted some sort of live streaming as keys are inserted into the db, how would I go about doing that with the new level implementation (i.e. without createReadStream())?

@vweevers
Member

For a quick solution, you could fork level-live and replace its db.createReadStream(opts).on('data', ..) line with const { EntryStream } = require('level-read-stream') and new EntryStream(db, opts).on('data', ..). Alternatively, replace that part with an iterator, or remove it altogether if you're only interested in the live part (which should still work the same because level-live uses db events for that).

@vweevers
Member

It might also be interesting to consider an approach that solely uses async iterators. Along the lines of:

const { on } = require('events')

async function* live (db) {
  // Current entries
  for await (const [key, value] of db.iterator()) {
    yield [key, value]
  }

  // Live entries (this is missing logic for batch and del)
  for await (const [key, value] of on(db, 'put')) {
    yield [key, value]
  }
}

@ralphtheninja
Member Author

It might also be interesting to consider an approach that solely uses async iterators.

This looks really clean. I take it async function* is the pattern for returning an async iterator. Sorry, I have hardly used them before.

@vweevers
Member

Yeah, that function is an async generator. PS. My example looks too good to be true; it needs some form of buffering to capture events emitted while it's busy iterating db.iterator().
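
One way to get the buffering mentioned above is to rely on the queue that Node's events.on() keeps internally: it starts listening as soon as it is called and holds events until they are consumed. The following is a minimal sketch under that assumption, not a tested implementation against a real database. It still ignores batch and del, and an entry written while the snapshot is being created could in principle show up twice (once from the iterator, once from the event).

```js
const { on } = require('events')

// Sketch: subscribe to 'put' BEFORE reading current entries, so that
// events emitted during the initial iteration are queued by events.on().
async function* live (db, options = {}) {
  // events.on() attaches its listeners immediately and buffers events
  const events = on(db, 'put')

  try {
    // Current entries
    for await (const [key, value] of db.iterator(options)) {
      yield [key, value]
    }

    // Live entries, starting with anything buffered in the meantime
    for await (const [key, value] of events) {
      yield [key, value]
    }
  } finally {
    // Remove the listeners even if the consumer stops early
    await events.return()
  }
}
```

A consumer would use it as `for await (const [key, value] of live(db)) { ... }` and break out of the loop to stop listening.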

@vweevers
Member

vweevers commented Oct 23, 2022

A couple of things I want to do that will benefit live streams/iterators:

  • Remove 'put' and 'del' events in favor of 'batch' event (just one event to listen to). This will also prepare us for hooks, especially a prewrite hook that would re-route db.put() to use a batch under the hood (and thus no longer emit a 'put' event).
  • In the 'batch' event, use "normalized" operation objects, meaning they should include encodings and other (default) options. And encodings should be objects here, rather than their string names.
  • The operation objects should include both the original input data (as today) and encoded data. Aka public data (what the public API received, e.g. db.batch()) and private data (what the private API received, e.g. db._batch()).

We can then support aligning the encoding of live data with the encoding of the iterator. E.g. if you do live(db, { valueEncoding: 'json' }) but then a db.put(key, value, { valueEncoding: 'buffer' }), the resulting 'batch' event includes the necessary information to transcode from buffer to json. Roughly like so:

const valueEncoding = db.valueEncoding('json')
const iterator = db.iterator({ valueEncoding })

db.on('batch', (operations) => {
  for (const op of operations) {
    if (op.valueEncoding.commonName === valueEncoding.commonName) {
      // Public data matches desired encoding
      const value = op.value
    } else if (op.valueEncoding.format === valueEncoding.commonName) {
      // Private data matches desired encoding
      const value = op.encodedValue
    } else {
      // Decode private data (one of view, buffer, utf8) to match desired encoding
      const transcoder = valueEncoding.createTranscoder(op.valueEncoding.format)
      const value = transcoder.decode(op.encodedValue)
    }
  }
})

That last step there (createTranscoder()) will just need a small utility method in level-transcoder:

function createTranscoder (format) {
  if (format === 'view') {
    return this.createViewTranscoder()
  } else if (format === 'buffer') {
    return this.createBufferTranscoder()
  } else if (format === 'utf8') {
    return this.createUTF8Transcoder()
  } else {
    throw new Error('nope')
  }
}

@vweevers
Member

Having that private data also means ltgt logic (like level-live has) can work regardless of the user's choice of encoding, because it can compare buffers and strings rather than public data with arbitrary types.
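
To illustrate the point about range logic on private data, here is a rough sketch of a range check that works on encoded keys only. The inRange() helper is hypothetical (it is not part of level-live or abstract-level) and assumes the range bounds have already been encoded to the same format as the key. Strings are converted to UTF-8 buffers so that the comparison is bytewise.

```js
// Hypothetical helper: check an encoded key (Buffer, Uint8Array or string)
// against already-encoded range options { gt, gte, lt, lte }.
function inRange (encodedKey, range = {}) {
  const key = Buffer.from(encodedKey)
  const cmp = (bound) => Buffer.compare(key, Buffer.from(bound))

  if (range.gt !== undefined && cmp(range.gt) <= 0) return false
  if (range.gte !== undefined && cmp(range.gte) < 0) return false
  if (range.lt !== undefined && cmp(range.lt) >= 0) return false
  if (range.lte !== undefined && cmp(range.lte) > 0) return false

  return true
}
```

A live iterator could then call something like inRange(op.encodedKey, encodedRange) for each operation, without needing to know which encoding the user chose.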

vweevers added a commit to Level/abstract-level that referenced this issue Oct 30, 2022
Adds postopen, prewrite and newsub hooks that allow userland "hook
functions" to customize behavior of the database. See README for
details. A quick example:

```js
db.hooks.prewrite.add(function (op, batch) {
  if (op.type === 'put') {
    batch.add({
      type: 'put',
      key: op.value.foo,
      value: op.key,
      sublevel: fooIndex
    })
  }
})
```

More generally, this is a move towards "renewed modularity". Our
ecosystem is old and many modules no longer work because they had
no choice but to monkeypatch database methods, of which the
signature has changed since then.

So in addition to hooks, this:

- Introduces a new `write` event that is emitted on `db.batch()`,
  `db.put()` and `db.del()` and has richer data: userland options,
  encoded data, keyEncoding and valueEncoding. The `batch`, `put`
  and `del` events are now deprecated and will be removed in a
  future version. Related to Level/level#222.
- Restores support of userland options on batch operations. In
  particular, to copy options in `db.batch(ops, options)` to ops,
  allowing for code like `db.batch(ops, { ttl: 123 })` to apply a
  default userland `ttl` option to all ops.

No breaking changes, yet. Using hooks means opting-in to new
behaviors (like the new write event) and disables some old behaviors
(like the deprecated events). Later on we can make those the default
behavior, regardless of whether hooks are used.

TODO: benchmarks, tests and optionally some light refactoring.

Closes Level/community#44.
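
With a single write event as described in the commit message above, the live part of an iterator no longer needs separate put, del and batch listeners. A minimal sketch, assuming the event passes one array of operations and that each operation has type, key and value properties like the op in the prewrite example. The liveWrites() name is made up for illustration, and the exact event payload should be checked against the abstract-level README.

```js
const { on } = require('events')

// Sketch: turn 'write' events into a stream of changes.
// Assumption: each event carries one array of operations
// shaped like { type, key, value }.
async function* liveWrites (db) {
  for await (const [operations] of on(db, 'write')) {
    for (const op of operations) {
      if (op.type === 'put') {
        yield { type: 'put', key: op.key, value: op.value }
      } else if (op.type === 'del') {
        yield { type: 'del', key: op.key }
      }
    }
  }
}
```

Breaking out of a for await loop over liveWrites(db) closes the underlying events.on() iterator, which removes its listeners.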