diff --git a/.gitignore b/.gitignore index 60d562e9..ff43404e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,5 @@ build build/* builderror.log npm-debug.log -test/autobahn/reports/* -test/heapdump/* +test-scripts/autobahn/reports/* +test-scripts/heapdump/* diff --git a/lib/WebSocketFrame.js b/lib/WebSocketFrame.js index d85c7de1..a3626b7c 100644 --- a/lib/WebSocketFrame.js +++ b/lib/WebSocketFrame.js @@ -1,5 +1,5 @@ /************************************************************************ - * Copyright 2010-2011 Worlize Inc. + * Copyright 2010-2014 Brian McKelvey * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,272 +14,23 @@ * limitations under the License. ***********************************************************************/ -var ctio = require('../vendor/node-ctype/ctio-faster'); -var bufferUtil = require('./BufferUtil').BufferUtil; +var stream = require('readable-stream'); +var util = require('util'); +var PassThrough = require('stream').PassThrough; -const DECODE_HEADER = 1; -const WAITING_FOR_16_BIT_LENGTH = 2; -const WAITING_FOR_64_BIT_LENGTH = 3; -const WAITING_FOR_MASK_KEY = 4; -const WAITING_FOR_PAYLOAD = 5; -const COMPLETE = 6; +util.inherits(WebSocketFrame, PassThrough); -// WebSocketConnection will pass shared buffer objects for maskBytes and -// frameHeader into the constructor to avoid tons of small memory allocations -// for each frame we have to parse. This is only used for parsing frames -// we receive off the wire. -function WebSocketFrame(maskBytes, frameHeader, config) { - this.maskBytes = maskBytes; - this.frameHeader = frameHeader; - this.config = config; - this.maxReceivedFrameSize = config.maxReceivedFrameSize; - this.protocolError = false; - this.frameTooLarge = false; - this.invalidCloseFrameLength = false; - this.parseState = DECODE_HEADER; - this.closeStatus = -1; +function WebSocketFrame() { + this.fin = this.rsv1 = this.rsv2 = this.rsv3 = this.mask = false; + this.length = 0; + this.opcode = 0; + this.maskBytes = null; + + PassThrough.call(this); } -WebSocketFrame.prototype.addData = function(bufferList, fragmentationType) { - var temp; - if (this.parseState === DECODE_HEADER) { - if (bufferList.length >= 2) { - bufferList.joinInto(this.frameHeader, 0, 0, 2); - bufferList.advance(2); - var firstByte = this.frameHeader[0]; - var secondByte = this.frameHeader[1]; - - this.fin = Boolean(firstByte & 0x80); - this.rsv1 = Boolean(firstByte & 0x40); - this.rsv2 = Boolean(firstByte & 0x20); - this.rsv3 = Boolean(firstByte & 0x10); - this.mask = Boolean(secondByte & 0x80); - - this.opcode = firstByte & 0x0F; - this.length = secondByte & 0x7F; - - // Control frame sanity check - if (this.opcode >= 0x08) { - if (this.length > 125) { - this.protocolError = true; - this.dropReason = "Illegal control frame longer than 125 bytes."; - return true; - } - if (!this.fin) { - this.protocolError = true; - this.dropReason = "Control frames must not be fragmented."; - return true; - } - } - - if (this.length === 126) { - this.parseState = WAITING_FOR_16_BIT_LENGTH; - } - else if (this.length === 127) { - this.parseState = WAITING_FOR_64_BIT_LENGTH; - } - else { - this.parseState = WAITING_FOR_MASK_KEY; - } - } - } - if (this.parseState === WAITING_FOR_16_BIT_LENGTH) { - if (bufferList.length >= 2) { - bufferList.joinInto(this.frameHeader, 2, 0, 2); - bufferList.advance(2); - this.length = ctio.ruint16(this.frameHeader, 'big', 2); - this.parseState = WAITING_FOR_MASK_KEY; - } - } - else if (this.parseState === WAITING_FOR_64_BIT_LENGTH) { - if (bufferList.length >= 8) { - bufferList.joinInto(this.frameHeader, 2, 0, 8); - bufferList.advance(8); - var lengthPair = ctio.ruint64(this.frameHeader, 'big', 2); - if (lengthPair[0] !== 0) { - this.protocolError = true; - this.dropReason = "Unsupported 64-bit length frame received"; - return true; - } - this.length = lengthPair[1]; - this.parseState = WAITING_FOR_MASK_KEY; - } - } - - if (this.parseState === WAITING_FOR_MASK_KEY) { - if (this.mask) { - if (bufferList.length >= 4) { - bufferList.joinInto(this.maskBytes, 0, 0, 4); - bufferList.advance(4); - this.parseState = WAITING_FOR_PAYLOAD; - } - } - else { - this.parseState = WAITING_FOR_PAYLOAD; - } - } - - if (this.parseState === WAITING_FOR_PAYLOAD) { - if (this.length > this.maxReceivedFrameSize) { - this.frameTooLarge = true; - this.dropReason = "Frame size of " + this.length.toString(10) + - " bytes exceeds maximum accepted frame size"; - return true; - } - - if (this.length === 0) { - this.binaryPayload = new Buffer(0); - this.parseState = COMPLETE; - return true; - } - if (bufferList.length >= this.length) { - this.binaryPayload = bufferList.take(this.length); - bufferList.advance(this.length); - if (this.mask) { - bufferUtil.unmask(this.binaryPayload, this.maskBytes); - // xor(this.binaryPayload, this.maskBytes, 0); - } - - if (this.opcode === 0x08) { // WebSocketOpcode.CONNECTION_CLOSE - if (this.length === 1) { - // Invalid length for a close frame. Must be zero or at least two. - this.binaryPayload = new Buffer(0); - this.invalidCloseFrameLength = true; - } - if (this.length >= 2) { - this.closeStatus = ctio.ruint16(this.binaryPayload, 'big', 0); - this.binaryPayload = this.binaryPayload.slice(2); - } - } - - this.parseState = COMPLETE; - return true; - } - } - return false; -}; - -WebSocketFrame.prototype.throwAwayPayload = function(bufferList) { - if (bufferList.length >= this.length) { - bufferList.advance(this.length); - this.parseState = COMPLETE; - return true; - } - return false; -}; - -WebSocketFrame.prototype.toBuffer = function(nullMask) { - var maskKey; - var headerLength = 2; - var data; - var outputPos; - var firstByte = 0x00; - var secondByte = 0x00; - - if (this.fin) { - firstByte |= 0x80; - } - if (this.rsv1) { - firstByte |= 0x40; - } - if (this.rsv2) { - firstByte |= 0x20; - } - if (this.rsv3) { - firstByte |= 0x10; - } - if (this.mask) { - secondByte |= 0x80; - } - - firstByte |= (this.opcode & 0x0F); - - // the close frame is a special case because the close reason is - // prepended to the payload data. - if (this.opcode === 0x08) { - this.length = 2; - if (this.binaryPayload) { - this.length += this.binaryPayload.length; - } - data = new Buffer(this.length); - ctio.wuint16(this.closeStatus, 'big', data, 0); - if (this.length > 2) { - this.binaryPayload.copy(data, 2); - } - } - else if (this.binaryPayload) { - data = this.binaryPayload; - this.length = data.length; - } - else { - this.length = 0; - } - - if (this.length <= 125) { - // encode the length directly into the two-byte frame header - secondByte |= (this.length & 0x7F); - } - else if (this.length > 125 && this.length <= 0xFFFF) { - // Use 16-bit length - secondByte |= 126; - headerLength += 2; - } - else if (this.length > 0xFFFF) { - // Use 64-bit length - secondByte |= 127; - headerLength += 8; - } - - var output = new Buffer(this.length + headerLength + (this.mask ? 4 : 0)); - - // write the frame header - output[0] = firstByte; - output[1] = secondByte; - - outputPos = 2; - - if (this.length > 125 && this.length <= 0xFFFF) { - // write 16-bit length - ctio.wuint16(this.length, 'big', output, outputPos); - outputPos += 2; - } - else if (this.length > 0xFFFF) { - // write 64-bit length - ctio.wuint64([0x00000000, this.length], 'big', output, outputPos); - outputPos += 8; - } - - if (this.length > 0) { - if (this.mask) { - if (!nullMask) { - // Generate a mask key - maskKey = parseInt(Math.random()*0xFFFFFFFF); - } - else { - maskKey = 0x00000000; - } - ctio.wuint32(maskKey, 'big', this.maskBytes, 0); - - // write the mask key - this.maskBytes.copy(output, outputPos); - outputPos += 4; - - data.copy(output, outputPos); - var dataSegment = output.slice(outputPos); - bufferUtil.mask(dataSegment, this.maskBytes, dataSegment, 0, this.length); - // xor(output.slice(outputPos), this.maskBytes, 0); - } - else { - data.copy(output, outputPos); - } - } - - return output; -}; - WebSocketFrame.prototype.toString = function() { - return "Opcode: " + this.opcode + ", fin: " + this.fin + ", length: " + this.length + ", hasPayload: " + Boolean(this.binaryPayload) + ", masked: " + this.mask; + return "Opcode: " + this.opcode + ", fin: " + this.fin + ", length: " + this.length + ", masked: " + this.mask; }; - module.exports = WebSocketFrame; diff --git a/lib/WebSocketFrameParser.js b/lib/WebSocketFrameParser.js new file mode 100644 index 00000000..bff7ac1c --- /dev/null +++ b/lib/WebSocketFrameParser.js @@ -0,0 +1,227 @@ +/************************************************************************ + * Copyright 2014 Brian McKelvey + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***********************************************************************/ + +var stream = require('readable-stream'); +var BufferList = require('bl'); +var util = require('util'); +var utils = require('./utils'); +var debug = utils.debuglog('websocket_parser'); +var WebSocketFrame = require('./WebSocketFrame'); +var Transform = stream.Transform; + +util.inherits(WebSocketFrameParser, Transform); +module.exports = WebSocketFrameParser; + +var RECEIVE_HEADER = 1; +var RECEIVE_16_BIT_LENGTH = 2; +var RECEIVE_64_BIT_LENGTH = 3; +var RECEIVE_MASK_KEY = 4; +var BEGIN_STREAM = 5; +var STREAMING_PAYLOAD = 6; + +function WebSocketFrameParser(options) { + if (!(this instanceof WebSocketFrameParser)) { + return new WebSocketFrameParser(options); + } + + Transform.call(this); + this._writableState.objectMode = false; + this._readableState.objectMode = true; + + this._resetParserState(); + + this._frameDrainHandler = this._frameDrained.bind(this); +}; + +WebSocketFrameParser.prototype._resetParserState = function() { + debug("_resetParserState"); + this._state = RECEIVE_HEADER; + this._bytesRemaining = 0; + this._bl = new BufferList(); + var frame = this._currentFrame = new WebSocketFrame(); + var originalEmit = frame.emit; + frame.emit = function(event) { + if (event === 'data') { + console.log("emit([\"data\"]) len: " + arguments[1].length); + } + else { + console.log("emit(%j)", Array.prototype.slice.call(arguments)); + } + originalEmit.apply(frame, arguments); + } +}; + +WebSocketFrameParser.prototype._frameDrained = function() { + +}; + +WebSocketFrameParser.prototype._write = function(chunk, encoding, done) { + if (this._state === STREAMING_PAYLOAD) { + debug("_write - streaming to frame."); + + this._streamPayload(chunk, done); + return; + } + + debug("_write - normal transform implementation. Chunk size: %d bytes", chunk.length); + return Transform.prototype._write.call(this, chunk, encoding, done); +}; + +WebSocketFrameParser.prototype._transform = function _transform(chunk, encoding, done) { + var error; + this._bl.append(chunk); + + if (this._state === RECEIVE_HEADER) { + if (!this._parseHeader(done)) { return; } + } + + switch (this._state) { + case RECEIVE_16_BIT_LENGTH: + if (this._bl.length >= 2) { + this._currentFrame.length = this._bl.readUInt16BE(0); + this._bl.consume(2); + this._state = RECEIVE_MASK_KEY; + } + break; + case RECEIVE_64_BIT_LENGTH: + if (this._bl.length >= 8) { + if (this._bl.readInt32BE(0)) { + done(new Error("Unsupported 64-bit length frame received")); + return; + } + this._currentFrame.length = this._bl.readUInt32BE(4); + this._bl.consume(8); + this._state = RECEIVE_MASK_KEY; + } + break; + } + + if (this._state === RECEIVE_MASK_KEY) { + this._parseMaskKey(); + } + + if (this._state === BEGIN_STREAM) { + this._bytesRemaining = this._currentFrame.length; + this.push(this._currentFrame); + this._streamBufferlist(); + + done(); + return; + } + + done(); +}; + +WebSocketFrameParser.prototype._parseHeader = function(done) { + var frame = this._currentFrame; + if (this._bl.length < 2) { return; } + var header = this._bl.readUInt16BE(0); + this._bl.consume(2); + debug('_parseHeader - %d %d', (header & 0xFF00) >> 8, header & 0xFF); + + frame.fin = Boolean(header & 0x8000); + frame.rsv1 = Boolean(header & 0x4000); + frame.rsv2 = Boolean(header & 0x2000); + frame.rsv3 = Boolean(header & 0x1000); + frame.mask = Boolean(header & 0x0080); + + frame.opcode = (header & 0x0F00) >> 8; + frame.length = header & 0x7F; + + // Control frame sanity check + if (frame.opcode >= 0x08) { + if (frame.length > 125) { + done(new Error("Illegal control frame longer than 125 bytes.")); + return false; + } + if (!frame.fin) { + done(new Error("Control frames must not be fragmented.")); + return false; + } + } + + if (frame.length === 126) { + this._state = RECEIVE_16_BIT_LENGTH; + } + else if (frame.length === 127) { + this._state = RECEIVE_64_BIT_LENGTH; + } + else { + this._state = RECEIVE_MASK_KEY; + } + + return true; +}; + +WebSocketFrameParser.prototype._parseMaskKey = function() { + debug("_parseMaskKey"); + var frame = this._currentFrame; + var bufferList = this._bl; + if (frame.mask) { + if (bufferList.length >= 4) { + frame.maskBytes = bufferList.slice(0, 4); + bufferList.consume(4); + this._state = BEGIN_STREAM; + } + } + else { + this._state = BEGIN_STREAM; + } +}; + +WebSocketFrameParser.prototype._streamBufferlist = function() { + if (this._bl.length === 0) { return; } + debug('_streamBufferlist'); + var frame = this._currentFrame; + var chunk = this._bl.slice(0, frame.length); + this._bytesRemaining -= chunk.length; + debug("_streamBufferlist - pushing %d bytes. %d bytes remaining.", + chunk.length, this._bytesRemaining); + this._bl.consume(chunk.length); + this._currentFrame.push(chunk); + this._state = STREAMING_PAYLOAD; + this._checkPayloadEnd(); +}; + +WebSocketFrameParser.prototype._streamPayload = function(chunk, done) { + debug('_streamPayload'); + if (chunk.length > this._bytesRemaining) { + this.unshift(chunk.slice(this._bytesRemaining, chunk.length)); + chunk = chunk.slice(0, this._bytesRemaining); + } + this._bytesRemaining -= chunk.length; + var shouldPause = !this._currentFrame.push(chunk); + debug("_streamPayload - pushed %d bytes. shouldPause: %s Bytes Remaining: %d", + chunk.length, shouldPause, this._bytesRemaining); + this._checkPayloadEnd(); + done(); +}; + +WebSocketFrameParser.prototype._checkPayloadEnd = function() { + debug("_checkPayloadEnd"); + if (this._bytesRemaining === 0) { + debug(" ending frame content stream"); + this._currentFrame.push(null); + this._resetParserState(); + } + else if (this._bytesRemaining < 0) { + throw new Error( + util.format( + "Critical internal error: bytesRemaining = %d Expected %d", + this._bytesRemaining, 0) + ); + } +}; diff --git a/lib/stream_modules/fixed-length-stream.js b/lib/stream_modules/fixed-length-stream.js new file mode 100644 index 00000000..8eb10b97 --- /dev/null +++ b/lib/stream_modules/fixed-length-stream.js @@ -0,0 +1,39 @@ +var Transform = require('readable-stream/transform'); +var util = require('util'); + +util.inherits(FixedLengthStream, Transform); + +function FixedLengthStream(length) { + if ('number' !== typeof length) { throw new Error("invalid length") }; + this._fixedLength = length; + this._bytesCounted = 0; +} + +FixedLengthStream.prototype._transform = function(chunk, encoding, done) { + var extra; + + if (this._bytesCounted + chunk.length > this._fixedLength) { + var neededLength = this._fixedLength - this._bytesCounted; + extra = chunk.slice(neededLength, chunk.length); + chunk = chunk.slice(0, neededLength); + this.bytesCounted += chunk.length; + this.push(chunk); + this.unshift(extra); + this.push(null); + return done(); + } + + this._bytesCounted += chunk.length; + + if (this._bytesCounted === this._fixedLength) { + this._bytesCounted + this.push(chunk); + this.push(null); + return done(); + } + + this.push(chunk); + return done(); +}; + +module.exports = FixedLengthStream; diff --git a/lib/stream_modules/xor-masking-stream.js b/lib/stream_modules/xor-masking-stream.js new file mode 100644 index 00000000..b713f9f1 --- /dev/null +++ b/lib/stream_modules/xor-masking-stream.js @@ -0,0 +1,22 @@ +var Transform = require('readable-stream/transform'); +var util = require('util'); + +util.inherits(XORMaskingStream, Transform); + +function XORMaskingStream(maskBytes, initPosition) { + this._maskBytes = maskBytes || new Buffer(4); + this._pos = initPosition || 0; + this._counter = 0; + Transform.call(this); +} + +XORMaskingStream.prototype._transform = function(chunk, encoding, done) { + for (var i=0, len=chunk.length; i < len; i ++) { + chunk[i] = chunk[i] ^ this._maskBytes[this._pos]; + this._pos = (this._pos + 1) & 0x03; + } + this.push(chunk); + done(); +} + +module.exports = XORMaskingStream; diff --git a/lib/utils.js b/lib/utils.js index 94dbb5ea..8d673e7f 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,9 +1,19 @@ var util = require('util'); -exports.extend = function extend(dest, source) { - for (var prop in source) { - dest[prop] = source[prop]; - } +function isObject(arg) { + return typeof arg === 'object' && arg !== null; +} + +exports.extend = function(origin, add) { + // Don't do anything if add isn't an object + if (!add || !isObject(add)) return origin; + + var keys = Object.keys(add); + var i = keys.length; + while (i--) { + origin[keys[i]] = add[keys[i]]; + } + return origin; }; if ('debuglog' in util) { diff --git a/package.json b/package.json index 1a44b9ef..694686f7 100644 --- a/package.json +++ b/package.json @@ -1,31 +1,53 @@ { - "name": "websocket", - "description": "Websocket Client & Server Library implementing the WebSocket protocol as specified in RFC 6455.", - "keywords": [ - "websocket", "websockets", "socket", "networking", "comet", "push", - "RFC-6455", "realtime", "server", "client" - ], - "author": "Brian McKelvey (https://www.worlize.com/)", - "version": "1.0.10", - "repository": { - "type": "git", - "url": "http://github.com/Worlize/WebSocket-Node.git" - }, - "engines": { - "node": ">=0.8.0" - }, - "dependencies": { - "nan": "~1.0.0" - }, - "config": { - "verbose" : false - }, - "scripts": { - "install": "(node-gyp rebuild 2> builderror.log) || (exit 0)" - }, - "main": "index", - "directories": { - "lib": "./lib" - }, - "browser": "lib/browser.js" + "name": "websocket", + "description": "Websocket Client & Server Library implementing the WebSocket protocol as specified in RFC 6455.", + "keywords": [ + "websocket", + "websockets", + "socket", + "networking", + "comet", + "push", + "RFC-6455", + "realtime", + "server", + "client" + ], + "author": "Brian McKelvey (https://www.worlize.com/)", + "version": "1.0.10", + "repository": { + "type": "git", + "url": "http://github.com/Worlize/WebSocket-Node.git" + }, + "engines": { + "node": ">=0.8.0" + }, + "dependencies": { + "bl": "^0.9.3", + "bytechunker": "^1.0.1", + "duplexer2": "0.0.2", + "nan": "~1.0.0", + "readable-stream": "^1.0.0", + "through2": "^0.6.3" + }, + "config": { + "verbose": false + }, + "scripts": { + "install": "(node-gyp rebuild 2> builderror.log) || (exit 0)", + "test": "faucet" + }, + "main": "index", + "directories": { + "lib": "./lib" + }, + "browser": "lib/browser.js", + "devDependencies": { + "buffertools": "^2.1.2", + "concat-stream": "^1.4.6", + "stream-spigot": "^3.0.4", + "tape": "^3.0.1", + "through2-spy": "^1.2.0", + "websocket": "1.0.10" + } } diff --git a/test/autobahn-test-client.js b/test-scripts/autobahn-test-client.js similarity index 100% rename from test/autobahn-test-client.js rename to test-scripts/autobahn-test-client.js diff --git a/test/autobahn/fuzzingclient.json b/test-scripts/autobahn/fuzzingclient.json similarity index 100% rename from test/autobahn/fuzzingclient.json rename to test-scripts/autobahn/fuzzingclient.json diff --git a/test/certificate.pem b/test-scripts/certificate.pem similarity index 100% rename from test/certificate.pem rename to test-scripts/certificate.pem diff --git a/test/echo-server.js b/test-scripts/echo-server.js similarity index 100% rename from test/echo-server.js rename to test-scripts/echo-server.js diff --git a/test/fragmentation-test-client.js b/test-scripts/fragmentation-test-client.js similarity index 100% rename from test/fragmentation-test-client.js rename to test-scripts/fragmentation-test-client.js diff --git a/test/fragmentation-test-page.html b/test-scripts/fragmentation-test-page.html similarity index 100% rename from test/fragmentation-test-page.html rename to test-scripts/fragmentation-test-page.html diff --git a/test/fragmentation-test-server.js b/test-scripts/fragmentation-test-server.js similarity index 100% rename from test/fragmentation-test-server.js rename to test-scripts/fragmentation-test-server.js diff --git a/test/libwebsockets-test-client.js b/test-scripts/libwebsockets-test-client.js similarity index 100% rename from test/libwebsockets-test-client.js rename to test-scripts/libwebsockets-test-client.js diff --git a/test/libwebsockets-test-server.js b/test-scripts/libwebsockets-test-server.js similarity index 100% rename from test/libwebsockets-test-server.js rename to test-scripts/libwebsockets-test-server.js diff --git a/test/libwebsockets-test.html b/test-scripts/libwebsockets-test.html similarity index 100% rename from test/libwebsockets-test.html rename to test-scripts/libwebsockets-test.html diff --git a/test/memoryleak-client.js b/test-scripts/memoryleak-client.js similarity index 100% rename from test/memoryleak-client.js rename to test-scripts/memoryleak-client.js diff --git a/test/memoryleak-server.js b/test-scripts/memoryleak-server.js similarity index 100% rename from test/memoryleak-server.js rename to test-scripts/memoryleak-server.js diff --git a/test/privatekey.pem b/test-scripts/privatekey.pem similarity index 100% rename from test/privatekey.pem rename to test-scripts/privatekey.pem diff --git a/test/frame-parser.js b/test/frame-parser.js new file mode 100644 index 00000000..e0f31ec4 --- /dev/null +++ b/test/frame-parser.js @@ -0,0 +1,63 @@ +"use strict"; + +var test = require('tape'); +var spigot = require('stream-spigot'); +var concat = require('concat-stream'); +var through2 = require('through2'); +var chunker = require('bytechunker'); +var WebSocketFrame = require('websocket').frame; +var WebSocketFrameParser = require('../lib/WebSocketFrameParser'); + +var maskBytes = new Buffer(4); +var frameHeader = new Buffer(10); +var config = { maxReceivedFrameSize: 0x10000 }; // 64KiB + +function textFrame(payload) { + payload = payload || "Watson, can you hear me?"; + var frame = new WebSocketFrame(maskBytes, frameHeader, config); + frame.opcode = 0x01; // Text frame + frame.fin = true; + frame.binaryPayload = new Buffer(payload, 'utf8'); + return frame; +} + + +test("frameparser", function(t) { + t.plan(9); + + var payloads = [ + (new Array(27000)).join('Watson, can you hear me? '), + 'Well hello there!' + ]; + + var source = spigot.array([ + textFrame(payloads[0]).toBuffer(), + textFrame(payloads[1]).toBuffer() + ]); + + var parser = new WebSocketFrameParser(); + var frameCount = 0; + + var prettyPrinter = through2.obj(function(chunk, encoding, done) { + var frame = chunk; + frameCount ++; + t.ok(true, "Got a new WebSocketFrame(" + frameCount + ") from the parser."); + console.log("Frame length: %d", frame.length); + + var sink = concat(function(data) { + t.ok(true, "Got the frame's data."); + t.equal(data.length, payloads[frameCount-1].length, "Payload should be the right length"); + t.equal(data.toString(), payloads[frameCount-1], "Payload should match"); + }); + + frame.pipe(sink); + + done(); + }); + + source.pipe(chunker(5000)).pipe(parser).pipe(prettyPrinter); + + parser.on('end', function() { + t.equal(frameCount, 2, "There should be two frames emitted."); + }); +}); diff --git a/test/xor-masking-stream.js b/test/xor-masking-stream.js new file mode 100644 index 00000000..1483c3b8 --- /dev/null +++ b/test/xor-masking-stream.js @@ -0,0 +1,179 @@ +"use strict"; + +var buffertools = require('buffertools'); +var test = require('tape'); +var spigot = require('stream-spigot'); +var concat = require('concat-stream'); +var chunker = require('bytechunker'); +var spy = require('through2-spy'); +var XORMaskingStream = require('../lib/stream_modules/xor-masking-stream'); + +function generateRandomMask() { + var result = new Buffer(4); + for (var i=0; i < 4; i++) { + result[i] = (Math.random() * 0xFF) | 0; + } + return result; +} + +function buildSource(text, chunkSize) { + var source = spigot.array([text]); + if ('number' === typeof chunkSize) { + return ; + } + return source; +} + +function generateRandomBinaryData(length) { + length = length || 16384; + var buf = new Buffer(length); + for (var i=0; i < length; i ++) { + buf[0] = Math.floor(Math.random() * 0xFF); + } + return buf; +} + +function setup(mode) { + mode = mode || "string"; + var mask = generateRandomMask(); + var state = { + sourceChunkCount: 0, + sourceByteCount: 0, + outputChunkCount: 0, + outputByteCount: 0, + mask: mask, + masker: new XORMaskingStream(mask), + unmasker: new XORMaskingStream(mask), + sourceCounter: spy(function(chunk) { + state.sourceChunkCount ++; + state.sourceByteCount += chunk.length; + }), + outputCounter: spy(function(chunk) { + state.outputChunkCount ++; + state.outputByteCount += chunk.length; + }) + }; + if (mode === 'string') { + state.text = (new Array(20)).join("This Is A Test Of The Emergency Brodcasting System. "); + state.source = spigot.array([state.text]); + } + else if (mode === 'binary') { + state.binary = generateRandomBinaryData(); + state.source = spigot.array([state.binary]); + } + return state; +} + + +test("XOR-Masking-Stream: Masked output, String Source", function(t) { + var state = setup('string'); + t.plan(2); + + var sink = concat(function(result) { + var resultString = result.toString(); + t.notEqual(resultString, state.text, "masked output should not match input."); + t.equal(state.sourceByteCount, state.outputByteCount, + "masked output should be the same number of bytes as the encoded input string"); + }); + + state.source + .pipe(state.sourceCounter) + .pipe(state.masker) + .pipe(state.outputCounter) + .pipe(sink); +}); + +test("XOR-Masking-Stream: Masked output, Binary Source", function(t) { + var state = setup('binary'); + t.plan(4); + var sink = concat(function(result) { + t.equal(state.outputByteCount, state.sourceByteCount, + "Stream output byte count should match source byte count"); + t.equal(result.length, state.binary.length, + "Final accumulated buffer should be the same length as the source buffer."); + t.equal(result.length, state.sourceByteCount, + "Final accumulated buffer should be the same lenth as the observed source stream byte count."); + t.equal(state.binary.length, state.outputByteCount, + "Original buffer should be the same length as the observed output byte count."); + }); + + state.source + .pipe(state.sourceCounter) + .pipe(state.masker) + .pipe(state.outputCounter) + .pipe(sink); +}); + +test("XOR-Masking-Stream: Masking -> Unmasking Round-Trip. Non-Chunking String Source", function(t) { + var state = setup('string'); + t.plan(3); + + var sink = concat(function(result) { + t.equal(result.toString(), state.text, "input and output strings should match"); + t.equal(state.sourceChunkCount, state.outputChunkCount, "size and number of chunks should match"); + t.equal(state.sourceChunkCount, 1, "there should only be one chunk"); + }); + + state.source + .pipe(state.sourceCounter) + .pipe(state.masker) + .pipe(state.unmasker) + .pipe(state.outputCounter) + .pipe(sink); +}); + +test("XOR-Masking-Stream: Masking -> Unmasking Round-Trip. Non-Chunking Binary Source", function(t) { + var state = setup('binary'); + t.plan(3); + + var sink = concat(function(result) { + t.assert(buffertools.equals(result, state.binary), "input and output buffers should match"); + t.equal(state.sourceChunkCount, state.outputChunkCount, "size and number of chunks should match"); + t.equal(state.sourceChunkCount, 1, "there should only be one chunk"); + }); + + state.source + .pipe(state.sourceCounter) + .pipe(state.masker) + .pipe(state.unmasker) + .pipe(state.outputCounter) + .pipe(sink); +}); + +test("XOR-Masking-Stream: Masking -> Unmasking Round-Trip. Chunking String Source", function(t) { + var state = setup('string'); + t.plan(3); + + var sink = concat(function(result) { + t.equal(result.toString(), state.text, "input and output strings should match"); + t.equal(state.sourceChunkCount, state.outputChunkCount, "size and number of chunks should match"); + t.assert(state.sourceChunkCount > 1, "there should be more than one chunk"); + }); + + state.source + .pipe(chunker(3)) + .pipe(state.sourceCounter) + .pipe(state.masker) + .pipe(state.unmasker) + .pipe(state.outputCounter) + .pipe(sink); +}); + +test("XOR-Masking-Stream: Masking -> Unmasking Round-Trip. Chunking Binary Source", function(t) { + var state = setup('binary'); + t.plan(3); + + var sink = concat(function(result) { + t.assert(buffertools.equals(result, state.binary), "input and output strings should match"); + t.equal(state.sourceChunkCount, state.outputChunkCount, "size and number of chunks should match"); + t.assert(state.sourceChunkCount > 1, "there should be more than one chunk"); + }); + + state.source + .pipe(chunker(3)) + .pipe(state.sourceCounter) + .pipe(state.masker) + .pipe(state.unmasker) + .pipe(state.outputCounter) + .pipe(sink); +});