Skip to content

Commit

Permalink
fix(stream-transform): backpressure after push
Browse files Browse the repository at this point in the history
  • Loading branch information
wdavidw committed Oct 9, 2023
1 parent 8c1868e commit 3e83f4e
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 82 deletions.
46 changes: 0 additions & 46 deletions demo/issues-esm/labo/361-transform-memory.js

This file was deleted.

88 changes: 88 additions & 0 deletions demo/issues-esm/labo/361-transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { Writable } from "node:stream";
import { pipeline } from "stream/promises";
import { generate } from "csv-generate";
import { transform } from "stream-transform";

// Configuration
const config = {
// Use `true`for a simple iterator or `false` to use csv generate
iterate: true,
// Generation window size, 1 for one record per `StreamReader._read` call
// Default is `objectMode ? 16 : 16384`
highWaterMark: 1,
// Number of records to generate, `-1` for infinite
length: -1,
// Generate object or strings, both are supported
objectMode: false,
// Write delay, `0` to write instantly
write_delay: 1000,
};

// Internal counter
let count = 0;

// Memory information
const formatMemoryUsage = (data) =>
`${Math.round((data / 1024 / 1024) * 100) / 100} MB`;
const interval = setInterval(() => {
const memoryData = process.memoryUsage();
const memoryUsage = {
rss: `${formatMemoryUsage(
memoryData.rss
)} -> Resident Set Size - total memory allocated for the process execution`,
heapTotal: `${formatMemoryUsage(
memoryData.heapTotal
)} -> total size of the allocated heap`,
heapUsed: `${formatMemoryUsage(
memoryData.heapUsed
)} -> actual memory used during the execution`,
external: `${formatMemoryUsage(memoryData.external)} -> V8 external memory`,
};
console.log(`${count} records, usage:`, memoryUsage);
if (config.length !== -1 && count >= config.length) {
clearInterval(interval);
}
}, 1000);

// Iterate over an unlimited records
const iterate = function* () {
let i = -1;
// Run with
while (true) {
i++;
yield { i };
}
};

// Consome the records instantly or with delay
const consume = new Writable({
write: function (_, __, callback) {
if (config.write_delay === 0) {
count++;
callback();
} else {
setTimeout(() => {
count++;
callback();
}, config.write_delay);
}
},
});

await pipeline(
// Step 1 - generate
config.iterate
? iterate()
: generate({
columns: 10,
objectMode: config.objectMode,
length: config.length,
highWaterMark: config.highWaterMark,
}),
// Step 2 - transform
transform({ parallel: 100, highWaterMark: 1 }, function (chunk, next) {
next(null, JSON.stringify(chunk) + "\n");
}),
// Step 3 - consume
consume
);
12 changes: 8 additions & 4 deletions packages/stream-transform/dist/cjs/index.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const Transformer = function(options = {}, handler){
this.state = {
running: 0,
started: 0,
finished: 0
finished: 0,
paused: false,
};
return this;
};
Expand All @@ -30,7 +31,8 @@ util.inherits(Transformer, stream.Transform);
Transformer.prototype._transform = function(chunk, _, cb){
this.state.started++;
this.state.running++;
if(this.state.running < this.options.parallel){
// Accept additionnal chunks to be processed in parallel
if(!this.state.paused && this.state.running < this.options.parallel){
cb();
cb = null; // Cancel further callback execution
}
Expand All @@ -40,7 +42,8 @@ Transformer.prototype._transform = function(chunk, _, cb){
l--;
}
if(l === 1){ // sync
this.__done(null, [this.handler.call(this, chunk, this.options.params)], cb);
const result = this.handler.call(this, chunk, this.options.params);
this.__done(null, [result], cb);
}else if(l === 2){ // async
const callback = (err, ...chunks) =>
this.__done(err, chunks, cb);
Expand Down Expand Up @@ -75,9 +78,10 @@ Transformer.prototype.__done = function(err, chunks, cb){
// We dont push empty string
// See https://nodejs.org/api/stream.html#stream_readable_push
if(chunk !== undefined && chunk !== null && chunk !== ''){
this.push(chunk);
this.state.paused = !this.push(chunk);
}
}
// Chunk has been processed
if(cb){
cb();
}
Expand Down
12 changes: 8 additions & 4 deletions packages/stream-transform/dist/cjs/sync.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const Transformer = function(options = {}, handler){
this.state = {
running: 0,
started: 0,
finished: 0
finished: 0,
paused: false,
};
return this;
};
Expand All @@ -30,7 +31,8 @@ util.inherits(Transformer, stream.Transform);
Transformer.prototype._transform = function(chunk, _, cb){
this.state.started++;
this.state.running++;
if(this.state.running < this.options.parallel){
// Accept additionnal chunks to be processed in parallel
if(!this.state.paused && this.state.running < this.options.parallel){
cb();
cb = null; // Cancel further callback execution
}
Expand All @@ -40,7 +42,8 @@ Transformer.prototype._transform = function(chunk, _, cb){
l--;
}
if(l === 1){ // sync
this.__done(null, [this.handler.call(this, chunk, this.options.params)], cb);
const result = this.handler.call(this, chunk, this.options.params);
this.__done(null, [result], cb);
}else if(l === 2){ // async
const callback = (err, ...chunks) =>
this.__done(err, chunks, cb);
Expand Down Expand Up @@ -75,9 +78,10 @@ Transformer.prototype.__done = function(err, chunks, cb){
// We dont push empty string
// See https://nodejs.org/api/stream.html#stream_readable_push
if(chunk !== undefined && chunk !== null && chunk !== ''){
this.push(chunk);
this.state.paused = !this.push(chunk);
}
}
// Chunk has been processed
if(cb){
cb();
}
Expand Down
12 changes: 8 additions & 4 deletions packages/stream-transform/dist/esm/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5211,7 +5211,8 @@ const Transformer = function(options = {}, handler){
this.state = {
running: 0,
started: 0,
finished: 0
finished: 0,
paused: false,
};
return this;
};
Expand All @@ -5221,7 +5222,8 @@ util.inherits(Transformer, Stream.Transform);
Transformer.prototype._transform = function(chunk, _, cb){
this.state.started++;
this.state.running++;
if(this.state.running < this.options.parallel){
// Accept additionnal chunks to be processed in parallel
if(!this.state.paused && this.state.running < this.options.parallel){
cb();
cb = null; // Cancel further callback execution
}
Expand All @@ -5231,7 +5233,8 @@ Transformer.prototype._transform = function(chunk, _, cb){
l--;
}
if(l === 1){ // sync
this.__done(null, [this.handler.call(this, chunk, this.options.params)], cb);
const result = this.handler.call(this, chunk, this.options.params);
this.__done(null, [result], cb);
}else if(l === 2){ // async
const callback = (err, ...chunks) =>
this.__done(err, chunks, cb);
Expand Down Expand Up @@ -5266,9 +5269,10 @@ Transformer.prototype.__done = function(err, chunks, cb){
// We dont push empty string
// See https://nodejs.org/api/stream.html#stream_readable_push
if(chunk !== undefined && chunk !== null && chunk !== ''){
this.push(chunk);
this.state.paused = !this.push(chunk);
}
}
// Chunk has been processed
if(cb){
cb();
}
Expand Down
12 changes: 8 additions & 4 deletions packages/stream-transform/dist/esm/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -5211,7 +5211,8 @@ const Transformer = function(options = {}, handler){
this.state = {
running: 0,
started: 0,
finished: 0
finished: 0,
paused: false,
};
return this;
};
Expand All @@ -5221,7 +5222,8 @@ util.inherits(Transformer, Stream.Transform);
Transformer.prototype._transform = function(chunk, _, cb){
this.state.started++;
this.state.running++;
if(this.state.running < this.options.parallel){
// Accept additionnal chunks to be processed in parallel
if(!this.state.paused && this.state.running < this.options.parallel){
cb();
cb = null; // Cancel further callback execution
}
Expand All @@ -5231,7 +5233,8 @@ Transformer.prototype._transform = function(chunk, _, cb){
l--;
}
if(l === 1){ // sync
this.__done(null, [this.handler.call(this, chunk, this.options.params)], cb);
const result = this.handler.call(this, chunk, this.options.params);
this.__done(null, [result], cb);
}else if(l === 2){ // async
const callback = (err, ...chunks) =>
this.__done(err, chunks, cb);
Expand Down Expand Up @@ -5266,9 +5269,10 @@ Transformer.prototype.__done = function(err, chunks, cb){
// We dont push empty string
// See https://nodejs.org/api/stream.html#stream_readable_push
if(chunk !== undefined && chunk !== null && chunk !== ''){
this.push(chunk);
this.state.paused = !this.push(chunk);
}
}
// Chunk has been processed
if(cb){
cb();
}
Expand Down
12 changes: 8 additions & 4 deletions packages/stream-transform/dist/iife/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5214,7 +5214,8 @@ var stream_transform = (function (exports) {
this.state = {
running: 0,
started: 0,
finished: 0
finished: 0,
paused: false,
};
return this;
};
Expand All @@ -5224,7 +5225,8 @@ var stream_transform = (function (exports) {
Transformer.prototype._transform = function(chunk, _, cb){
this.state.started++;
this.state.running++;
if(this.state.running < this.options.parallel){
// Accept additionnal chunks to be processed in parallel
if(!this.state.paused && this.state.running < this.options.parallel){
cb();
cb = null; // Cancel further callback execution
}
Expand All @@ -5234,7 +5236,8 @@ var stream_transform = (function (exports) {
l--;
}
if(l === 1){ // sync
this.__done(null, [this.handler.call(this, chunk, this.options.params)], cb);
const result = this.handler.call(this, chunk, this.options.params);
this.__done(null, [result], cb);
}else if(l === 2){ // async
const callback = (err, ...chunks) =>
this.__done(err, chunks, cb);
Expand Down Expand Up @@ -5269,9 +5272,10 @@ var stream_transform = (function (exports) {
// We dont push empty string
// See https://nodejs.org/api/stream.html#stream_readable_push
if(chunk !== undefined && chunk !== null && chunk !== ''){
this.push(chunk);
this.state.paused = !this.push(chunk);
}
}
// Chunk has been processed
if(cb){
cb();
}
Expand Down
12 changes: 8 additions & 4 deletions packages/stream-transform/dist/iife/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -5214,7 +5214,8 @@ var stream_transform_sync = (function (exports) {
this.state = {
running: 0,
started: 0,
finished: 0
finished: 0,
paused: false,
};
return this;
};
Expand All @@ -5224,7 +5225,8 @@ var stream_transform_sync = (function (exports) {
Transformer.prototype._transform = function(chunk, _, cb){
this.state.started++;
this.state.running++;
if(this.state.running < this.options.parallel){
// Accept additionnal chunks to be processed in parallel
if(!this.state.paused && this.state.running < this.options.parallel){
cb();
cb = null; // Cancel further callback execution
}
Expand All @@ -5234,7 +5236,8 @@ var stream_transform_sync = (function (exports) {
l--;
}
if(l === 1){ // sync
this.__done(null, [this.handler.call(this, chunk, this.options.params)], cb);
const result = this.handler.call(this, chunk, this.options.params);
this.__done(null, [result], cb);
}else if(l === 2){ // async
const callback = (err, ...chunks) =>
this.__done(err, chunks, cb);
Expand Down Expand Up @@ -5269,9 +5272,10 @@ var stream_transform_sync = (function (exports) {
// We dont push empty string
// See https://nodejs.org/api/stream.html#stream_readable_push
if(chunk !== undefined && chunk !== null && chunk !== ''){
this.push(chunk);
this.state.paused = !this.push(chunk);
}
}
// Chunk has been processed
if(cb){
cb();
}
Expand Down
Loading

0 comments on commit 3e83f4e

Please sign in to comment.