Skip to content

Commit

Permalink
stream: fix _final and 'prefinish' timing
Browse files Browse the repository at this point in the history
This PR fixes a few different things:

The timing of 'prefinish' depends on whether or not
_final is defined. In on case the event is emitted
synchronously with end() and otherwise asynchronously.

_final is currently unecessarily called asynchronously
which forces implementors to use 'prefinish' as a hack
to emulate synchronous behaviour. Furthermore, this hack
is subtly broken due to the above issue.

Refs: nodejs#31401
Refs: nodejs#32763 (comment)

PR-URL: nodejs#32780
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Rich Trott <[email protected]>
  • Loading branch information
ronag committed Apr 22, 2020
1 parent d08bd41 commit ea87809
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 23 deletions.
8 changes: 4 additions & 4 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ function Transform(options) {
this._flush = options.flush;
}

// TODO(ronag): Unfortunately _final is invoked asynchronously.
// Use `prefinish` hack. `prefinish` is emitted synchronously when
// and only when `_final` is not defined. Implementing `_final`
// to a Transform should be an error.
// When the writable side finishes, then flush out anything remaining.
// Backwards compat. Some Transform streams incorrectly implement _final
// instead of or in addition to _flush. By using 'prefinish' instead of
// implementing _final we continue supporting this unfortunate use case.
this.on('prefinish', prefinish);
}

Expand Down
24 changes: 15 additions & 9 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -635,24 +635,30 @@ function needFinish(state) {
}

function callFinal(stream, state) {
state.sync = true;
state.pendingcb++;
stream._final((err) => {
state.pendingcb--;
if (err) {
errorOrDestroy(stream, err);
} else {
errorOrDestroy(stream, err, state.sync);
} else if (needFinish(state)) {
state.prefinished = true;
stream.emit('prefinish');
finishMaybe(stream, state);
// Backwards compat. Don't check state.sync here.
// Some streams assume 'finish' will be emitted
// asynchronously relative to _final callback.
state.pendingcb++;
process.nextTick(finish, stream, state);
}
});
state.sync = false;
}

function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
if (typeof stream._final === 'function' && !state.destroyed) {
state.pendingcb++;
state.finalCalled = true;
process.nextTick(callFinal, stream, state);
callFinal(stream, state);
} else {
state.prefinished = true;
stream.emit('prefinish');
Expand All @@ -661,10 +667,9 @@ function prefinish(stream, state) {
}

function finishMaybe(stream, state, sync) {
const need = needFinish(state);
if (need) {
if (needFinish(state)) {
prefinish(stream, state);
if (state.pendingcb === 0) {
if (state.pendingcb === 0 && needFinish(state)) {
state.pendingcb++;
if (sync) {
process.nextTick(finish, stream, state);
Expand All @@ -673,14 +678,15 @@ function finishMaybe(stream, state, sync) {
}
}
}
return need;
}

function finish(stream, state) {
state.pendingcb--;
if (state.errorEmitted)
return;

// TODO(ronag): This could occur after 'close' is emitted.

state.finished = true;
stream.emit('finish');

Expand Down
9 changes: 6 additions & 3 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1710,11 +1710,14 @@ function streamOnPause() {
}

function afterShutdown(status) {
const stream = this.handle[kOwner];
if (stream) {
stream.on('finish', () => {
stream[kMaybeDestroy]();
});
}
// Currently this status value is unused
this.callback();
const stream = this.handle[kOwner];
if (stream)
stream[kMaybeDestroy]();
}

function finishSendTrailers(stream, headersList) {
Expand Down
6 changes: 3 additions & 3 deletions test/parallel/test-stream-transform-final-sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ const t = new stream.Transform({
process.nextTick(function() {
state++;
// fluchCallback part 2
assert.strictEqual(state, 15);
assert.strictEqual(state, 13);
done();
});
}, 1)
});
t.on('finish', common.mustCall(function() {
state++;
// finishListener
assert.strictEqual(state, 13);
assert.strictEqual(state, 14);
}, 1));
t.on('end', common.mustCall(function() {
state++;
Expand All @@ -106,5 +106,5 @@ t.write(4);
t.end(7, common.mustCall(function() {
state++;
// endMethodCallback
assert.strictEqual(state, 14);
assert.strictEqual(state, 15);
}, 1));
6 changes: 3 additions & 3 deletions test/parallel/test-stream-transform-final.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ const t = new stream.Transform({
process.nextTick(function() {
state++;
// flushCallback part 2
assert.strictEqual(state, 15);
assert.strictEqual(state, 13);
done();
});
}, 1)
});
t.on('finish', common.mustCall(function() {
state++;
// finishListener
assert.strictEqual(state, 13);
assert.strictEqual(state, 14);
}, 1));
t.on('end', common.mustCall(function() {
state++;
Expand All @@ -108,5 +108,5 @@ t.write(4);
t.end(7, common.mustCall(function() {
state++;
// endMethodCallback
assert.strictEqual(state, 14);
assert.strictEqual(state, 15);
}, 1));
58 changes: 57 additions & 1 deletion test/parallel/test-stream-writable-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const assert = require('assert');
}

{
// Emit finish asynchronously
// Emit finish asynchronously.

const w = new Writable({
write(chunk, encoding, cb) {
Expand All @@ -41,3 +41,59 @@ const assert = require('assert');
w.end();
w.on('finish', common.mustCall());
}

{
// Emit prefinish synchronously.

const w = new Writable({
write(chunk, encoding, cb) {
cb();
}
});

let sync = true;
w.on('prefinish', common.mustCall(() => {
assert.strictEqual(sync, true);
}));
w.end();
sync = false;
}

{
// Emit prefinish synchronously w/ final.

const w = new Writable({
write(chunk, encoding, cb) {
cb();
},
final(cb) {
cb();
}
});

let sync = true;
w.on('prefinish', common.mustCall(() => {
assert.strictEqual(sync, true);
}));
w.end();
sync = false;
}


{
// Call _final synchronously.

let sync = true;
const w = new Writable({
write(chunk, encoding, cb) {
cb();
},
final: common.mustCall((cb) => {
assert.strictEqual(sync, true);
cb();
})
});

w.end();
sync = false;
}

0 comments on commit ea87809

Please sign in to comment.