for await & Readable #29428

Closed
pgherveou opened this issue Sep 3, 2019 · 12 comments
Labels: stream (Issues and PRs related to the stream subsystem)

Comments

pgherveou (Author) commented Sep 3, 2019

Hi, I am trying to consume a readable with a for await loop.
When I create the simple stream below, the console outputs the error straight away instead of logging the data events first. Is this the expected behavior?

Version: v12.9.1
Platform: Darwin Kernel Version 18.7.0

const { Readable } = require('stream')

async function* generate() {
  yield 1
  yield 2
  yield Promise.reject('Boum')
}

;(async () => {
  try {
    for await (const d of Readable.from(generate())) {
      console.log(d)
    }
  } catch (e) {
    console.log(e)
  }
})()

The output is:

Boum

instead of the expected:

1
2
Boum
ronag (Member) commented Sep 4, 2019

I think this is probably correct (although perhaps surprising), since a readable is supposed to be greedy.

In order to get the behaviour you want, you need to set highWaterMark to 1, e.g.:

for await (const d of Readable.from(generate(), { highWaterMark: 1 })) {
  console.log(d)
}
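
With highWaterMark: 1, the stream buffers at most one item at a time, so push() returns false after every chunk and the generator is only advanced as the consumer reads; the rejected promise is then not reached until 1 and 2 have already been delivered.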

ronag (Member) commented Sep 4, 2019

@mcollina: Thoughts? In the given example I believe the behaviour is correct since the generator is async.

However, using a sync generator has the same behavior and I'm not sure that is correct?
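
For reference, a sketch of the sync-generator variant being referred to (assuming it simply swaps async function* for function* in the original snippet); on the Node version in question it rejects eagerly in the same way:

const { Readable } = require('stream')

// Sync generator; the last value is still a rejected promise, which
// Readable.from() awaits while eagerly filling its buffer.
function* generateSync() {
  yield 1
  yield 2
  yield Promise.reject('Boum')
}

;(async () => {
  try {
    for await (const d of Readable.from(generateSync())) {
      console.log(d)
    }
  } catch (e) {
    console.log(e) // prints 'Boum' before any data is delivered
  }
})()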

Fishrock123 added the stream (Issues and PRs related to the stream subsystem) label on Sep 5, 2019
mcollina (Member) commented Sep 6, 2019

We might need to think about it a bit more. Look at the following:

async function* generate() {
  yield 1
  yield 2
  throw new Error('Boum')
}

;(async () => {
  try {
    for await (const d of generate()) {
      console.log(d)
    }
  } catch (e) {
    console.log(e)
  }
})()

The result is:

1
2
Boum

I think this might be something we can fix. Specifically, the following is making the iterator exit eagerly:

// If we have detected an error in the meanwhile
// reject straight away.
const error = this[kError];
if (error !== null) {
  return Promise.reject(error);
}
if (this[kEnded]) {
  return Promise.resolve(createIterResult(undefined, true));
}
if (this[kStream].destroyed) {
  // We need to defer via nextTick because if .destroy(err) is
  // called, the error will be emitted via nextTick, and
  // we cannot guarantee that there is no error lingering around
  // waiting to be emitted.
  return new Promise((resolve, reject) => {
    process.nextTick(() => {
      if (this[kError]) {
        reject(this[kError]);
      } else {
        resolve(createIterResult(undefined, true));
      }
    });
  });
}

I think we might want to put the promise rejection in the kLastPromise property queue instead:

iterator[kLastPromise] = null;

@benjamingr what do you think?
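
A minimal standalone sketch of that "queue the rejection" idea (illustrative names only; this is not the internal implementation): record the error, but surface it only after the values produced before it have been delivered.

function deferredErrorIterator(values, error) {
  // Yields the queued values first; the error is only thrown once the
  // queue is drained, instead of rejecting straight away.
  let i = 0
  return {
    [Symbol.asyncIterator]() { return this },
    async next() {
      if (i < values.length) {
        return { value: values[i++], done: false }
      }
      if (error) throw error
      return { value: undefined, done: true }
    }
  }
}

;(async () => {
  try {
    for await (const v of deferredErrorIterator([1, 2], new Error('Boum'))) {
      console.log(v) // 1, then 2
    }
  } catch (e) {
    console.log(e.message) // 'Boum', only after both values
  }
})()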

mcollina (Member) commented Sep 6, 2019

On second thought, I might be wrong, and this might be due to something else, or something that we cannot really fix.

Streams are greedy in emitting 'error': they emit an error as soon as it happens. In node/lib/_stream_readable.js, lines 1223 to 1236 in 63b056d:

async function next() {
  try {
    const { value, done } = await iterator.next();
    if (done) {
      readable.push(null);
    } else if (readable.push(await value)) {
      next();
    } else {
      reading = false;
    }
  } catch (err) {
    readable.destroy(err);
  }
}
we see that we call destroy() eagerly, and this is what triggers it.
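
A standalone illustration of why that matters (assuming the v12.x semantics under discussion): once destroy(err) is called, chunks still sitting in the internal buffer are discarded, and the iterator rejects on its next read.

const { Readable } = require('stream')

// Push two chunks, then destroy with an error before they are read.
const r = new Readable({ objectMode: true, read() {} })
r.push(1)
r.push(2)
r.destroy(new Error('Boum'))

;(async () => {
  try {
    for await (const d of r) console.log(d)
  } catch (e) {
    console.log(e.message) // 'Boum' — 1 and 2 are never logged
  }
})()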

As a short term measure, we should definitely document it.

benjamingr (Member) commented

> @benjamingr what do you think?

That streams do watermarking and that they are greedy :]

To avoid the stream behaviour, one can simply not convert their AsyncIterator to a stream.

Now, I do believe we can "fix" this (on the async iterator side) by only emitting the error and destroying after we are done emitting data on the iterator, but honestly I am not sure that would be better from a stream consumer's PoV. In fact it would likely be worse.

So +1 on a docs change, -0.5 on changing the Symbol.asyncIterator.

addaleax (Member) commented Sep 6, 2019

> Now I do believe we can "fix" this (on the async iterator side) by only emitting the error and destroying after we are done emitting data on the iterator

I think I’d be in favour of that, fwiw.

benjamingr (Member) commented Sep 6, 2019

> I think I’d be in favour of that, fwiw.

That was my first intuition too, but I am having a hard time coming up with a real use case in which this (buffering) behaviour is better. Namely, in all the cases I considered I would rather recover from the error sooner rather than later. The cases I came up with were:

  • Persistent database change stream (I would rather re-connect and sync, which I have to do anyway to listen to new changes).
  • Query results iterated with a cursor (if the query failed on some results, I would rather retry the query sooner rather than later).
  • An HTTP body stream (I would need to recover anyway, and would find the incomplete data only marginally useful).

Edit: the common theme of these is having to reconnect to establish synchronisation with the producer later on anyway.

On the other hand, if I look at async iterators without streams, such use cases are easy to come up with and are abundant in reactive systems (like a whole server flow accepting requests from clients in a for await...of loop).

The biggest issue I see here is that we are dropping data that the user might be interested in. I am also not sure how else we can expose it. Basically the constraints are:

  • As a consumer, if there is an error downstream I want to know about it as soon as possible.
  • As a consumer, I never want to miss any data on the stream.

addaleax (Member) commented Sep 6, 2019

> The biggest issue I see here is that we are dropping data that the user might be interested in. I am also not sure how else we can expose it. Basically the constraints are:
>
>   • As a consumer, if there is an error downstream I want to know about it as soon as possible.
>   • As a consumer, I never want to miss any data on the stream.

Yeah, that’s what I’m thinking too. I’d also consider it more expected for (async) iterators to throw the exception only after providing the data that was produced before the error, because in the typical (async) generator function approach, that’s when that exception would have been created.

mcollina (Member) commented Sep 8, 2019

Note that we can consider this to be a problem for our Readable.from() implementation rather than our async iterator support. Essentially, defer calling destroy() until all chunks have been emitted. This can be easily achieved by setting highWaterMark: 1, and we may just set that as the default.
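
For illustration: Readable.from() creates an objectMode stream, and the default objectMode highWaterMark is 16, which is what allows it to pull (and hit the error) well ahead of the consumer. A small sketch of the difference:

const { Readable } = require('stream')

// Default objectMode high water mark is 16; with highWaterMark: 1 the
// source is only advanced one item ahead of the consumer.
console.log(Readable.from([1, 2]).readableHighWaterMark) // 16
console.log(Readable.from([1, 2], { highWaterMark: 1 }).readableHighWaterMark) // 1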

pgherveou (Author) commented Sep 10, 2019

Note that running this other snippet produces a different output:

function makeStreamV2() {
  return new Readable({
    objectMode: true,

    read() {
      if (this.reading) {
        return
      }

      this.reading = true
      this.push(1)
      this.push(2)
      this.emit('error', 'boum')
    }
  })
}

;(async () => {
  try {
    for await (const d of makeStreamV2()) {
      console.log(d)
    }
  } catch (e) {
    console.log(e)
  }
})()
The output is:

1
boum
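
This matches the eager-rejection branch quoted earlier: the first next() call triggers read(), which synchronously pushes 1 and 2 and then emits 'error', so that first read still yields 1, but the following next() sees the recorded error and rejects straight away, discarding the buffered 2.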

ronag (Member) commented May 1, 2020

@mcollina did we resolve this or is this something someone needs to further look into?

mcollina (Member) commented May 1, 2020

This needs a decision to be made, and then possibly some docs to be added or the behavior changed.

ronag added a commit to nxtedition/node that referenced this issue May 1, 2020
Currently from will eagerly buffer up items
which means that errors are also eagerly
encountered and items which are buffered when
an error occurs will be discarded, which is
inconsistent with how generators work.

Fixes: nodejs#29428
ronag closed this as completed in 8607f9e on May 6, 2020
codebytere pushed a commit that referenced this issue May 7, 2020
Currently from will eagerly buffer up items
which means that errors are also eagerly
encountered and items which are buffered when
an error occurs will be discarded, which is
inconsistent with how generators work.

Fixes: #29428

PR-URL: #33201
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Ruben Bridgewater <[email protected]>
codebytere pushed a commit that referenced this issue Jun 7, 2020 (same commit message and PR-URL as above)