-
Notifications
You must be signed in to change notification settings - Fork 29.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
lib: remove queue implementation from JSStreamWrap #17918
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,15 @@ const uv = process.binding('uv'); | |
const debug = util.debuglog('stream_wrap'); | ||
const errors = require('internal/errors'); | ||
|
||
const kCurrentWriteRequest = Symbol('kCurrentWriteRequest'); | ||
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest'); | ||
|
||
function isClosing() { return this.owner.isClosing(); } | ||
function onreadstart() { return this.owner.readStart(); } | ||
function onreadstop() { return this.owner.readStop(); } | ||
function onshutdown(req) { return this.owner.doShutdown(req); } | ||
function onwrite(req, bufs) { return this.owner.doWrite(req, bufs); } | ||
|
||
/* This class serves as a wrapper for when the C++ side of Node wants access | ||
* to a standard JS stream. For example, TLS or HTTP do not operate on network | ||
* resources conceptually, although that is the common case and what we are | ||
|
@@ -27,12 +36,13 @@ class JSStreamWrap extends Socket { | |
debug('close'); | ||
this.doClose(cb); | ||
}; | ||
handle.isAlive = () => this.isAlive(); | ||
handle.isClosing = () => this.isClosing(); | ||
handle.onreadstart = () => this.readStart(); | ||
handle.onreadstop = () => this.readStop(); | ||
handle.onshutdown = (req) => this.doShutdown(req); | ||
handle.onwrite = (req, bufs) => this.doWrite(req, bufs); | ||
// Inside of the following functions, `this` refers to the handle | ||
// and `this.owner` refers to this JSStreamWrap instance. | ||
handle.isClosing = isClosing; | ||
handle.onreadstart = onreadstart; | ||
handle.onreadstop = onreadstop; | ||
handle.onshutdown = onshutdown; | ||
handle.onwrite = onwrite; | ||
|
||
stream.pause(); | ||
stream.on('error', (err) => this.emit('error', err)); | ||
|
@@ -60,7 +70,10 @@ class JSStreamWrap extends Socket { | |
|
||
super({ handle, manualStart: true }); | ||
this.stream = stream; | ||
this._list = null; | ||
this[kCurrentWriteRequest] = null; | ||
this[kCurrentShutdownRequest] = null; | ||
|
||
// Start reading. | ||
this.read(0); | ||
} | ||
|
||
|
@@ -69,10 +82,6 @@ class JSStreamWrap extends Socket { | |
return JSStreamWrap; | ||
} | ||
|
||
isAlive() { | ||
return true; | ||
} | ||
|
||
isClosing() { | ||
return !this.readable || !this.writable; | ||
} | ||
|
@@ -88,33 +97,56 @@ class JSStreamWrap extends Socket { | |
} | ||
|
||
doShutdown(req) { | ||
assert.strictEqual(this[kCurrentShutdownRequest], null); | ||
this[kCurrentShutdownRequest] = req; | ||
|
||
// TODO(addaleax): It might be nice if we could get into a state where | ||
// DoShutdown() is not called on streams while a write is still pending. | ||
// | ||
// Currently, the only part of the code base where that happens is the | ||
// TLS implementation, which calls both DoWrite() and DoShutdown() on the | ||
// underlying network stream inside of its own DoShutdown() method. | ||
// Working around that on the native side is not quite trivial (yet?), | ||
// so for now that is supported here. | ||
|
||
if (this[kCurrentWriteRequest] !== null) | ||
return this.on('drain', () => this.doShutdown(req)); | ||
assert.strictEqual(this[kCurrentWriteRequest], null); | ||
|
||
const handle = this._handle; | ||
const item = this._enqueue('shutdown', req); | ||
|
||
this.stream.end(() => { | ||
// Ensure that write was dispatched | ||
setImmediate(() => { | ||
if (!this._dequeue(item)) | ||
return; | ||
|
||
handle.finishShutdown(req, 0); | ||
this.finishShutdown(handle, 0); | ||
}); | ||
}); | ||
return 0; | ||
} | ||
|
||
// handle === this._handle except when called from doClose(). | ||
finishShutdown(handle, errCode) { | ||
// The shutdown request might already have been cancelled. | ||
if (this[kCurrentShutdownRequest] === null) | ||
return; | ||
const req = this[kCurrentShutdownRequest]; | ||
this[kCurrentShutdownRequest] = null; | ||
handle.finishShutdown(req, errCode); | ||
} | ||
|
||
doWrite(req, bufs) { | ||
const self = this; | ||
const handle = this._handle; | ||
assert.strictEqual(this[kCurrentWriteRequest], null); | ||
assert.strictEqual(this[kCurrentShutdownRequest], null); | ||
this[kCurrentWriteRequest] = req; | ||
|
||
var pending = bufs.length; | ||
const handle = this._handle; | ||
const self = this; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it still needed ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @billouboq it’s used inside the |
||
|
||
// Queue the request to be able to cancel it | ||
const item = this._enqueue('write', req); | ||
let pending = bufs.length; | ||
|
||
this.stream.cork(); | ||
for (var n = 0; n < bufs.length; n++) | ||
this.stream.write(bufs[n], done); | ||
for (var i = 0; i < bufs.length; ++i) | ||
this.stream.write(bufs[i], done); | ||
this.stream.uncork(); | ||
|
||
function done(err) { | ||
|
@@ -126,93 +158,42 @@ class JSStreamWrap extends Socket { | |
|
||
let errCode = 0; | ||
if (err) { | ||
const code = uv[`UV_${err.code}`]; | ||
errCode = (err.code && code) ? code : uv.UV_EPIPE; | ||
errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE; | ||
} | ||
|
||
// Ensure that write was dispatched | ||
setImmediate(function() { | ||
// Do not invoke callback twice | ||
if (!self._dequeue(item)) | ||
return; | ||
|
||
handle.finishWrite(req, errCode); | ||
setImmediate(() => { | ||
self.finishWrite(handle, errCode); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can avoid this closure completely with |
||
}); | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
_enqueue(type, req) { | ||
const item = new QueueItem(type, req); | ||
if (this._list === null) { | ||
this._list = item; | ||
return item; | ||
} | ||
|
||
item.next = this._list.next; | ||
item.prev = this._list; | ||
item.next.prev = item; | ||
item.prev.next = item; | ||
|
||
return item; | ||
} | ||
|
||
_dequeue(item) { | ||
assert(item instanceof QueueItem); | ||
|
||
var next = item.next; | ||
var prev = item.prev; | ||
|
||
if (next === null && prev === null) | ||
return false; | ||
|
||
item.next = null; | ||
item.prev = null; | ||
|
||
if (next === item) { | ||
prev = null; | ||
next = null; | ||
} else { | ||
prev.next = next; | ||
next.prev = prev; | ||
} | ||
|
||
if (this._list === item) | ||
this._list = next; | ||
// handle === this._handle except when called from doClose(). | ||
finishWrite(handle, errCode) { | ||
// The write request might already have been cancelled. | ||
if (this[kCurrentWriteRequest] === null) | ||
return; | ||
const req = this[kCurrentWriteRequest]; | ||
this[kCurrentWriteRequest] = null; | ||
|
||
return true; | ||
handle.finishWrite(req, errCode); | ||
} | ||
|
||
doClose(cb) { | ||
const handle = this._handle; | ||
|
||
setImmediate(() => { | ||
while (this._list !== null) { | ||
const item = this._list; | ||
const req = item.req; | ||
this._dequeue(item); | ||
|
||
const errCode = uv.UV_ECANCELED; | ||
if (item.type === 'write') { | ||
handle.finishWrite(req, errCode); | ||
} else if (item.type === 'shutdown') { | ||
handle.finishShutdown(req, errCode); | ||
} | ||
} | ||
|
||
// Should be already set by net.js | ||
assert.strictEqual(this._handle, null); | ||
|
||
this.finishWrite(handle, uv.UV_ECANCELED); | ||
this.finishShutdown(handle, uv.UV_ECANCELED); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you remove this closure as well? |
||
cb(); | ||
}); | ||
} | ||
} | ||
|
||
function QueueItem(type, req) { | ||
this.type = type; | ||
this.req = req; | ||
this.prev = this; | ||
this.next = this; | ||
} | ||
|
||
module.exports = JSStreamWrap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The linter would complain about that. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not seeing it, though. Same for the
const handle = ...
on line 112.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It’s passed to the
self.finishWrite()
call down on line 162There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, indeed. Unfortunate fold in the diff.