Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

net: WIP run close callbacks in correct eloop phase #6802

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ function setupChannel(target, channel) {
target.disconnect();
channel.onread = nop;
channel.close();
// TODO(trevnorris): Don't close this until after the close callback
// has fired.
maybeClose(target);
}
};
Expand Down Expand Up @@ -853,26 +855,15 @@ function ChildProcess() {
self.stdin.destroy();
}

self._handle.close();
self._handle = null;

if (exitCode < 0) {
self.emit('error', err);
} else {
self.emit('exit', self.exitCode, self.signalCode);
}

// if any of the stdio streams have not been touched,
// then pull all the data through so that it can get the
// eof and emit a 'close' event.
// Do it on nextTick so that the user has one last chance
// to consume the output, if for example they only want to
// start reading the data once the process exits.
process.nextTick(function() {
self._handle.close(function() {
if (exitCode < 0)
self.emit('error', err);
else
self.emit('exit', self.exitCode, self.signalCode);
flushStdio(self);
maybeClose(self);
});

maybeClose(self);
self._handle = null;
};
}
util.inherits(ChildProcess, EventEmitter);
Expand Down
13 changes: 0 additions & 13 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -520,18 +520,6 @@ function workerInit() {
return 0;
}

function close() {
// lib/net.js treats server._handle.close() as effectively synchronous.
// That means there is a time window between the call to close() and
// the ack by the master process in which we can still receive handles.
// onconnection() below handles that by sending those handles back to
// the master.
if (util.isUndefined(key)) return;
send({ act: 'close', key: key });
delete handles[key];
key = undefined;
}

function getsockname(out) {
if (key) util._extend(out, message.sockname);
return 0;
Expand All @@ -541,7 +529,6 @@ function workerInit() {
// with it. Fools net.Server into thinking that it's backed by a real
// handle.
var handle = {
close: close,
listen: listen
};
if (message.sockname) {
Expand Down
6 changes: 4 additions & 2 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,11 @@ function afterSend(err) {
Socket.prototype.close = function() {
this._healthCheck();
this._stopReceiving();
this._handle.close();
var self = this;
this._handle.close(function() {
self.emit('close');
});
this._handle = null;
this.emit('close');
};


Expand Down
85 changes: 42 additions & 43 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -1078,11 +1078,10 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {

if (err) {
var ex = errnoException(err, 'listen');
self._handle.close();
self._handle = null;
process.nextTick(function() {
self._handle.close(function() {
self.emit('error', ex);
});
self._handle = null;
return;
}

Expand Down Expand Up @@ -1276,58 +1275,58 @@ Server.prototype.getConnections = function(cb) {
};


Server.prototype.close = function(cb) {
function onSlaveClose() {
if (--left !== 0) return;
function emitWhenClosed(self) {
debug('SERVER: emitWhenClosed');

self._connections = 0;
self._emitCloseIfDrained();
if (self._handle || self._connections) {
debug('SERVER handle? %j connections? %d',
!!self._handle, self._connections);
return;
}

if (!this._handle) {
// Throw error. Follows net_legacy behaviour.
throw new Error('Not running');
}
debug('SERVER: emit close');
self.emit('close');
}

if (cb) {
this.once('close', cb);
}
this._handle.close();
this._handle = null;

if (this._usingSlaves) {
var self = this,
left = this._slaves.length;
function closeWhenDrained() {
var self = this.owner;

// Increment connections to be sure that, even if all sockets will be closed
// during polling of slaves, `close` event will be emitted only once.
this._connections++;
// If no slaves then not much more to do.
if (!self._usingSlaves)
return emitWhenClosed(self);

// Poll slaves
this._slaves.forEach(function(slave) {
slave.close(onSlaveClose);
var left = self._slaves.length;

// Increment connections to be sure that, even if all sockets will be closed
// during polling of slaves, `close` event will be emitted only once.
self.connections++;

// Poll slaves
self._slaves.forEach(function(slave) {
slave.close(function callOnSlaveClose() {
if (--left > 0)
return;

self._connections = 0;
emitWhenClosed(self);
});
} else {
this._emitCloseIfDrained();
}
});
}

return this;
};

Server.prototype._emitCloseIfDrained = function() {
debug('SERVER _emitCloseIfDrained');
var self = this;
Server.prototype.close = function(cb) {
// Throw error. Follows net_legacy behaviour.
if (!this._handle)
throw new Error('Not running');

if (self._handle || self._connections) {
debug('SERVER handle? %j connections? %d',
!!self._handle, self._connections);
return;
}
if (cb)
this.once('close', cb);

process.nextTick(function() {
debug('SERVER: emit close');
self.emit('close');
});
this._handle.close(closeWhenDrained);
this._handle = null;

return this;
};


Expand Down
9 changes: 4 additions & 5 deletions lib/zlib.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,9 @@ function zlibBuffer(engine, buffer, callback) {
function onEnd() {
var buf = Buffer.concat(buffers, nread);
buffers = [];
callback(null, buf);
engine.close();
engine.close(function() {
callback(null, buf);
});
}
}

Expand Down Expand Up @@ -421,10 +422,8 @@ Zlib.prototype.close = function(callback) {

this._closed = true;

this._binding.close();

var self = this;
process.nextTick(function() {
this._binding.close(function() {
self.emit('close');
});
};
Expand Down