Skip to content

Commit

Permalink
[perf] Use messageHandlers only if permessage-deflate is in use (#853)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Atkinson authored and lpinca committed Oct 13, 2016
1 parent 73298bf commit 98a9121
Showing 1 changed file with 94 additions and 42 deletions.
136 changes: 94 additions & 42 deletions lib/Sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,24 @@ class Sender extends EventEmitter {
dataBuffer.writeUInt16BE(code, 0);
if (dataBuffer.length > 2) dataBuffer.write(data, 2);

var self = this;
this.messageHandlers.push(function (callback) {
self.frameAndSend(0x8, dataBuffer, true, mask);
callback();
if (typeof cb === 'function') cb();
});
this.flush();
if (this.extensions[PerMessageDeflate.extensionName]) {
this.enqueue([this.doClose, [dataBuffer, mask, cb]]);
} else {
this.doClose(dataBuffer, mask, cb);
}
}

/**
* Sends a close frame.
*
* @api private
*/
doClose (data, mask, cb) {
this.frameAndSend(0x8, data, true, mask);
if (this.extensions[PerMessageDeflate.extensionName]) {
this.messageHandlerCallback();
}
if (cb) cb();
}

/**
Expand All @@ -56,13 +67,24 @@ class Sender extends EventEmitter {
* @api public
*/
ping (data, options) {
if (this.extensions[PerMessageDeflate.extensionName]) {
this.enqueue([this.doPing, [data, options]]);
} else {
this.doPing(data, options);
}
}

/**
* Sends a ping frame.
*
* @api private
*/
doPing (data, options) {
var mask = options && options.mask;
var self = this;
this.messageHandlers.push(function (callback) {
self.frameAndSend(0x9, data || '', true, mask);
callback();
});
this.flush();
this.frameAndSend(0x9, data || '', true, mask);
if (this.extensions[PerMessageDeflate.extensionName]) {
this.messageHandlerCallback();
}
}

/**
Expand All @@ -71,14 +93,24 @@ class Sender extends EventEmitter {
* @api public
*/
pong (data, options) {
var mask = options && options.mask;
var self = this;
this.messageHandlers.push(function (callback) {
self.frameAndSend(0xa, data || '', true, mask);
callback();
});
if (this.extensions[PerMessageDeflate.extensionName]) {
this.enqueue([this.doPong, [data, options]]);
} else {
this.doPong(data, options);
}
}

this.flush();
/**
* Sends a pong frame.
*
* @api private
*/
doPong (data, options) {
var mask = options && options.mask;
this.frameAndSend(0xa, data || '', true, mask);
if (this.extensions[PerMessageDeflate.extensionName]) {
this.messageHandlerCallback();
}
}

/**
Expand All @@ -100,21 +132,28 @@ class Sender extends EventEmitter {
}
if (finalFragment) this.firstFragment = true;

var compressFragment = this.compress;

var self = this;
this.messageHandlers.push(function (callback) {
self.applyExtensions(data, finalFragment, compressFragment, function (err, data) {
if (err) {
if (typeof cb === 'function') cb(err);
else self.emit('error', err);
return;
}
self.frameAndSend(opcode, data, finalFragment, mask, compress, cb);
callback();
});
if (this.extensions[PerMessageDeflate.extensionName]) {
this.enqueue([this.sendCompressed, [opcode, data, finalFragment, mask, compress, cb]]);
} else {
this.frameAndSend(opcode, data, finalFragment, mask, false, cb);
}
}

/**
* Sends compressed data.
*
* @api private
*/
sendCompressed (opcode, data, finalFragment, mask, compress, cb) {
this.applyExtensions(data, finalFragment, this.compress, (err, data) => {

This comment has been minimized.

Copy link
@lpinca

lpinca Oct 21, 2016

Member

@Nibbler999 shouldn't this.compress be just compress here? I think I missed this when reviewing.

Edit: nvm, it's correct.

This comment has been minimized.

Copy link
@Nibbler999

Nibbler999 Oct 21, 2016

Member

this.compress is correct.

if (err) {
if (cb) cb(err);
else this.emit('error', err);
return;
}
this.frameAndSend(opcode, data, finalFragment, mask, compress, cb);
this.messageHandlerCallback();
});
this.flush();
}

/**
Expand Down Expand Up @@ -210,14 +249,27 @@ class Sender extends EventEmitter {

this.processing = true;

var self = this;
handler[0].apply(this, handler[1]);
}

handler(function () {
self.processing = false;
process.nextTick(function () {
self.flush();
});
});
/**
* Callback to indicate message handler completion.
*
* @api private
*/
messageHandlerCallback () {
this.processing = false;
process.nextTick(() => this.flush());
}

/**
* Enqueues a send frame operation.
*
* @api private
*/
enqueue (params) {
this.messageHandlers.push(params);
this.flush();
}

/**
Expand Down Expand Up @@ -264,7 +316,7 @@ function sendFramedData (outputBuffer, data, cb) {
this._socket.write(outputBuffer, 'binary', cb);
}
} catch (e) {
if (typeof cb === 'function') cb(e);
if (cb) cb(e);
else this.emit('error', e);
}
}

0 comments on commit 98a9121

Please sign in to comment.