
Streams memory leak #8955

Closed
devmetal opened this issue Oct 6, 2016 · 7 comments
Labels
memory: Issues and PRs related to the memory management or memory footprint.
question: Issues that look for answers.
stream: Issues and PRs related to the stream subsystem.

Comments


devmetal commented Oct 6, 2016

Hi, I have a problem with streams. I am trying to "streamify" some database selects, so I created a pager. The pager's job is simple: fetch some records, then push them in object mode to a stream. I will show the relevant parts of the code and the debug results. I hope this is not an actual issue and I am just missing something about streams.

My workflow is simple:
[ { /* result objects from database */ }, {}, {} ] -> (push objects to PassThrough) -> {}, {} -> (create array rows) -> [ 'values', 'from', 'objects' ] -> (CSV writer) -> file or HTTP response

// This creates an array from the specified keys of each object
const recordStream = (fields) => new Transform({
  readableObjectMode: true,
  writableObjectMode: true,

  transform(chunk, enc, done) {
    // Note: a null chunk never reaches transform(); in object mode,
    // writing null is rejected, so this guard is effectively dead code.
    if (!chunk) {
      return this.end();
    }

    const record = [];
    for (const field of fields) {
      record.push(chunk[field]);
    }

    this.push(record);
    done();
  }
});

// This is a csv writer instance from https://www.npmjs.com/package/csv-write-stream
const csvStream = (headers) => csvWrite({ headers });


// For more maintainable code, I use generators with promises

/**
 * Based on this
 * https://www.promisejs.org/generators/
 */
const gen = function generatorAsync(iterator) {
  return function generatorHandler() {
    const internal = iterator.apply(this, arguments);

    function handle(result) {
      if (result.done) return Promise.resolve(result.value);

      return Promise.resolve(result.value).then(
        res => handle(internal.next(res)),
        err => handle(internal.throw(err))
      );
    };

    try {
      return handle(internal.next());
    } catch (e) {
      return Promise.reject(e);
    }
  }
}

// The pager gets a stream instance and pushes every record to it
const pager = gen(function* (stream) {
  let skip = 0;
  const query = 'SELECT FROM E SKIP :skip LIMIT :limit';
  let results = yield db.query(query, { params: { skip, limit: 5000 } });

  while (results && !!results.length) {
    for (const row of results)  {
      stream.push(row);
    }

    skip += results.length;
    results = yield db.query(query, { params: { skip, limit: 5000 } });
  }

  return stream.end();
});
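
// Note: stream.push() on a PassThrough ignores backpressure, so the
// pager can outrun the consumers and unread rows pile up in the
// stream's internal buffer. A sketch (not the original code) of a
// variant that respects backpressure via write() and the 'drain'
// event, reusing the gen() helper and the same `db` client as above:
const pagerWithBackpressure = gen(function* (stream) {
  let skip = 0;
  const query = 'SELECT FROM E SKIP :skip LIMIT :limit';
  let results = yield db.query(query, { params: { skip, limit: 5000 } });

  while (results && results.length) {
    for (const row of results) {
      // write() returns false once the buffer is full; wait for 'drain'
      if (!stream.write(row)) {
        yield new Promise(resolve => stream.once('drain', resolve));
      }
    }

    skip += results.length;
    results = yield db.query(query, { params: { skip, limit: 5000 } });
  }

  return stream.end();
});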


const records = recordStream(fields);
const csv = csvStream(fields);
const through = new PassThrough({ objectMode: true });

through
  .pipe(records)
  .pipe(csv)
  .pipe(fs.createWriteStream('./out.csv'));

through.on('end', () => console.log('end'));

pager(through);

// Debug
setInterval(function () {
  var mem = process.memoryUsage();
  var fmt = v => (v / (1024 * 1024)).toFixed(0) + 'MB';
  console.log('RSS = ' + fmt(mem.rss), 'Heap = ' + fmt(mem.heapUsed));
}, 1000);
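
A hedged aside: raw memoryUsage() numbers are noisy because V8 collects garbage lazily, so a flat-but-high reading after "end" does not by itself prove a leak. Running with --expose-gc and forcing a collection before each sample makes the trend easier to read:

// Run with: node --expose-gc script.js
setInterval(function () {
  if (global.gc) global.gc(); // force a full GC so heapUsed reflects live objects
  var mem = process.memoryUsage();
  var fmt = v => (v / (1024 * 1024)).toFixed(0) + 'MB';
  console.log('RSS = ' + fmt(mem.rss), 'Heap = ' + fmt(mem.heapUsed));
}, 1000);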

There are a lot of records in the database, 30,000+, but it runs really fast. The problem is the leak: I ran the server via pm2 and I can see in the monitor that the memory is not released.

When I run this code the output is always similar to this:

RSS = 80MB Heap = 29MB
RSS = 92MB Heap = 50MB
RSS = 102MB Heap = 55MB
RSS = 108MB Heap = 60MB
RSS = 101MB Heap = 28MB
RSS = 101MB Heap = 41MB
end
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
...

If you want, I can put the whole code here; only some requires and the database connection are missing.
I have to use OrientDB with orientjs.

Thank you for the help

@mscdex added the stream, memory and question labels on Oct 6, 2016
@bnoordhuis (Member)

Do you have a test case that doesn't depend on third-party modules or external resources like a database? In other words, a standalone test case that only uses built-in modules?


devmetal commented Oct 6, 2016

@bnoordhuis

Hi, I created an example using only Node.js built-in modules. Same output.

First I created a quick script (adapted from the documentation) to generate JSON records into a file, to simulate some input.

function writeFile(writer, encoding, callback) {
  let i = 200000;
  write();
  function write() {
    var ok = true;
    do {
      i--;
      if (i === 0) {
        // last time!
        writer.write(`{"id":"${i}", "data":"randomcontent"}\n`, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(`{"id":"${i}", "data":"randomcontent"}\n`, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}
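
For completeness, a minimal way to drive this helper, assuming the output path matches the './input' file the second script reads:

const writer = fs.createWriteStream('./input');
writeFile(writer, 'utf8', () => console.log('input file written'));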

Second, I wrote the script:

'use strict';

const fs = require('fs');
const stream = require('stream');
const readline = require('readline');
const Transform = stream.Transform;
const PassThrough = stream.PassThrough;

const jsonStream = () => new Transform({
  readableObjectMode: true,

  transform(chunk, enc, done) {
    if (!chunk) {
      this.push(null);
      return done();
    }

    const jsonString = chunk.toString();
    const json = JSON.parse(jsonString);
    this.push(json);
    done();
  }
});

const recordStream = (fields) => new Transform({
  readableObjectMode: false,
  writableObjectMode: true,

  transform(chunk, enc, done) {
    if (!chunk) {
      return this.end();
    }

    const record = [];
    for (const field of fields) {
      record.push(chunk[field]);
    }

    this.push(record.join(',') + '\n');
    done();
  }
});

setInterval(function () {
  var mem = process.memoryUsage();
  var fmt = v => (v / (1024 * 1024)).toFixed(0) + 'MB';
  console.log('RSS = ' + fmt(mem.rss), 'Heap = ' + fmt(mem.heapUsed));
}, 1000);


const os = fs.createWriteStream('./output');
const is = fs.createReadStream('./input');
const json = jsonStream();
const record = recordStream(['id', 'data']);
const through = new PassThrough();

const rl = readline.createInterface({
  input: is
});

rl.on('line', (line) => {
  through.push(line);
});

rl.on('close', () => {
  through.end();
});

// Note: pipe() returns the destination; a writable stream like `os`
// emits 'finish' and 'close', not 'end', which is why only "close"
// appears in the output below.
through.pipe(json).pipe(record).pipe(os)
.on('end', () => {
  console.log('end');
})
.on('close', () => {
  console.log('close');
});
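
One caveat worth flagging: through.push(line) in the 'line' handler above bypasses backpressure, so if the file reader outruns the pipeline, lines accumulate in the PassThrough's buffer. A sketch of a handler that respects it, using readline's standard pause()/resume():

rl.on('line', (line) => {
  // write() returns false once the internal buffer is full
  if (!through.write(line)) {
    rl.pause();
    through.once('drain', () => rl.resume());
  }
});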

The result is the same. With 200,000 records it is not so dramatic, but with 500,000 or 1,000,000 the memory usage is huge. The output with 200,000 records:

RSS = 71MB Heap = 25MB
RSS = 74MB Heap = 21MB
RSS = 74MB Heap = 26MB
RSS = 75MB Heap = 31MB
RSS = 78MB Heap = 26MB
RSS = 78MB Heap = 31MB
RSS = 83MB Heap = 28MB
RSS = 84MB Heap = 31MB
RSS = 84MB Heap = 35MB
RSS = 84MB Heap = 37MB
RSS = 87MB Heap = 30MB
RSS = 87MB Heap = 36MB
RSS = 87MB Heap = 28MB
RSS = 87MB Heap = 36MB
RSS = 87MB Heap = 29MB
RSS = 88MB Heap = 39MB
RSS = 88MB Heap = 33MB
RSS = 88MB Heap = 29MB
RSS = 88MB Heap = 41MB
RSS = 88MB Heap = 40MB
RSS = 88MB Heap = 28MB
RSS = 88MB Heap = 41MB
close
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
...

With 1,000,000 records:

RSS = 74MB Heap = 19MB
RSS = 75MB Heap = 26MB
RSS = 75MB Heap = 31MB
RSS = 78MB Heap = 27MB
RSS = 78MB Heap = 32MB
RSS = 82MB Heap = 29MB
RSS = 84MB Heap = 36MB
RSS = 84MB Heap = 31MB
RSS = 84MB Heap = 36MB
RSS = 87MB Heap = 30MB
RSS = 87MB Heap = 35MB
RSS = 87MB Heap = 40MB
RSS = 89MB Heap = 34MB
RSS = 90MB Heap = 39MB
...
RSS = 157MB Heap = 83MB
RSS = 157MB Heap = 85MB
RSS = 158MB Heap = 86MB
RSS = 158MB Heap = 88MB
RSS = 158MB Heap = 90MB
....

And still growing. Can you tell me where I went wrong?
Thank you :)


devmetal commented Oct 6, 2016

@bnoordhuis

Sorry, I made a huge mistake. I have now replaced readline with my custom line-by-line transform stream:

class LineByLine extends Transform {
  constructor(options) {
    super(options);
    this.buff = '';
  }

  _transform(chunk, enc, done) {
    const chr = chunk.toString();
    let i = 0;

    while (i < chr.length) {
      if (chr[i] === '\n') {
        this.push(this.buff);
        this.buff = '';
      } else {
        this.buff += chr[i];
      }

      i++;
    }

    done();
  }

  // Flush any trailing data not terminated by a newline, otherwise
  // the last line would be silently dropped at end-of-stream.
  _flush(done) {
    if (this.buff.length > 0) {
      this.push(this.buff);
      this.buff = '';
    }
    done();
  }
}

I think this is not the best way to do it, but it is really fast; with 1,000,000 records it finishes in seconds. Sorry again, but I think there is still a little leak. Here is the output with 1,000,000 records, without readline:

RSS = 57MB Heap = 16MB
RSS = 60MB Heap = 17MB
RSS = 60MB Heap = 16MB
RSS = 60MB Heap = 11MB
RSS = 57MB Heap = 18MB
RSS = 57MB Heap = 7MB
RSS = 57MB Heap = 14MB
RSS = 58MB Heap = 6MB
RSS = 58MB Heap = 12MB
RSS = 58MB Heap = 14MB
RSS = 58MB Heap = 17MB
RSS = 58MB Heap = 10MB
close
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB

What do you think? Is 2MB significant, or is that normal?

@bnoordhuis (Member)

That doesn't strike me as very significant; it could very well be a garbage collector or memory fragmentation artifact. The litmus test for memory leaks normally is whether the process eventually dies with an out-of-memory error.
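
A practical way to apply that litmus test is to rerun the script with a deliberately small heap and see whether it aborts or stabilizes (the 64 MB limit and script name here are illustrative):

# A genuine leak eventually aborts with "JavaScript heap out of memory";
# GC laziness or fragmentation does not.
node --max-old-space-size=64 script.js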


devmetal commented Oct 6, 2016

Okay, I think we can close this. Without third-party libraries it works well, so I have to find the leak in the original code. Maybe I can throw out the CSV writer. Do you think the PassThrough stream could cause issues like this? Or the generators? Maybe it's all on them :)

Do you have any suggestions for finding memory leaks?

@devmetal devmetal closed this as completed Oct 6, 2016
@bnoordhuis (Member)

You could try https://www.npmjs.com/package/heapdump or the V8 inspector in v6.3.0+.
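
A minimal sketch of the heapdump approach (the snapshot names and the runWorkload placeholder are illustrative): take one snapshot before the workload and one after, then diff them in Chrome DevTools.

const heapdump = require('heapdump');

heapdump.writeSnapshot('./before.heapsnapshot'); // baseline

runWorkload(() => {
  // Load both files into Chrome DevTools (Memory tab) and use the
  // comparison view to see which objects were retained.
  heapdump.writeSnapshot('./after.heapsnapshot');
});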


devmetal commented Oct 6, 2016

Thank you for your help :)
