Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exception thrown within the mapper does not catch by stream on error mechanism #38

Open
raz-viber opened this issue Aug 17, 2020 · 1 comment

Comments

@raz-viber
Copy link
Contributor

example:

pipeline(
            this.s3StreamReader.openStream(this.bucket, key),
            splitLine(line => {
                try {
                   return JSON.parse(line.toString());
                } catch (error) {
                    throw new TypeError(`line: "${line}" is not in json format`);
                }
            }),
            (error?) => {
                if (!error) {
                    this.logger.debug(`done streaming file: '${this.bucket}/${key}'`);
                }
                else {
                    this.logger.error(`error streaming file: '${this.bucket}/${key}', ${error}`);
                }
            }
        ).on("error" (err) => {
this.logger("WILL NEVER BE CALLED")
});

the on error will never be called.

fix suggestion to the transform function:

function transform (chunk, enc, cb) {
  var list
  if (this.overflow) { // Line buffer is full. Skip to start of next line.
    var buf = this[kDecoder].write(chunk)
    list = buf.split(this.matcher)

    if (list.length === 1) return cb() // Line ending not found. Discard entire chunk.

    // Line ending found. Discard trailing fragment of previous line and reset overflow state.
    list.shift()
    this.overflow = false
  } else {
    this[kLast] += this[kDecoder].write(chunk)
    list = this[kLast].split(this.matcher)
  }

  this[kLast] = list.pop()

  for (var i = 0; i < list.length; i++) {
  try{
    push(this, this.mapper(list[i]));
  } catch(error)
    cb(error)
  }

  this.overflow = this[kLast].length > this.maxLength
  if (this.overflow && !this.skipOverflow) return cb(new Error('maximum buffer reached'))

  cb()
}

basically wrapping the call with try/catch, on catch cb(error)

@mcollina
Copy link
Owner

Would you like to send a Pull Request to address this issue? Remember to add unit tests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants