diff --git a/.kuzzlerc.sample b/.kuzzlerc.sample index 4bc848a540..dafa992f00 100644 --- a/.kuzzlerc.sample +++ b/.kuzzlerc.sample @@ -380,10 +380,25 @@ "websocket": { // * enabled: // Set to true to enable WebSocket support + // * idleTimeout: + // The maximum time (in milliseconds) without sending or receiving a + // message from a client. Once reached, the client's socket is + // forcibly closed. + // Contrary to heartbeats (see below), this is a passive check, + // forcing all clients to actively send either PINGs or messages to + // maintain their connection active. + // Set the value to 0 to disable this feature (should only be + // activated if heartbeat is disabled) // * heartbeat: - // The time, in milliseconds, between the server's PING requests. - // Setting this value to 0 disables PING/PONG requests entirely. + // The time, in milliseconds, between the server's PING requests to + // clients, to make sure they are still active. + // Setting this value to 0 disables PING requests from the server + // (it will still respond with a PONG to PING requests from clients). + // If heartbeat is deactivated, then setting a non-zero value to + // idleTimeout is strongly recommended to detect and remove + // dead sockets. "enabled": true, + "idleTimeout": 0, "heartbeat": 60000 } } diff --git a/default.config.js b/default.config.js index 1463136ddd..c75bdafde5 100644 --- a/default.config.js +++ b/default.config.js @@ -203,6 +203,7 @@ module.exports = { }, websocket: { enabled: true, + idleTimeout: 0, heartbeat: 60000 } } diff --git a/features/support/api/websocket.js b/features/support/api/websocket.js index 5624244dec..b16d634b4b 100644 --- a/features/support/api/websocket.js +++ b/features/support/api/websocket.js @@ -1,7 +1,7 @@ const Bluebird = require('bluebird'), WsApiBase = require('./websocketBase'), - Ws = require('uws'); + Ws = require('ws'); class WebSocketApi extends WsApiBase { diff --git a/lib/api/core/entrypoints/embedded/index.js b/lib/api/core/entrypoints/embedded/index.js index ed1adb8a8d..395e6b4158 100644 --- a/lib/api/core/entrypoints/embedded/index.js +++ b/lib/api/core/entrypoints/embedded/index.js @@ -479,7 +479,7 @@ class EmbeddedEntryPoint extends EntryPoint { protocol.broadcast(this.constructor._removeErrorStack(data)); } catch (e) { - this.kuzzle.emit('log:error', `[broadcast] protocol ${protoKey} failed: ${e.message}`); + this.kuzzle.emit('log:error', `[broadcast] protocol ${protoKey} failed: ${e.message}\n${e.stack}`); } } } diff --git a/lib/api/core/entrypoints/embedded/protocols/http.js b/lib/api/core/entrypoints/embedded/protocols/http.js index a04d4a59bd..2d96b06456 100644 --- a/lib/api/core/entrypoints/embedded/protocols/http.js +++ b/lib/api/core/entrypoints/embedded/protocols/http.js @@ -22,23 +22,36 @@ 'use strict'; const - debug = require('../../../../../kuzzleDebug')('kuzzle:entry-point:protocols:http'), assert = require('assert'), + debug = require('../../../../../kuzzleDebug')('kuzzle:entry-point:protocols:http'), bytes = require('bytes'), url = require('url'), ClientConnection = require('../clientConnection'), Protocol = require('./protocol'), - Writable = require('stream').Writable, + { Writable } = require('stream'), HttpFormDataStream = require('../service/httpFormDataStream'), { - KuzzleError, - BadRequestError, - SizeLimitError, - InternalError: KuzzleInternalError - } = require('kuzzle-common-objects').errors, - Request = require('kuzzle-common-objects').Request, + Request, + errors: { + KuzzleError, + BadRequestError, + SizeLimitError + } + } = require('kuzzle-common-objects'), zlib = require('zlib'); +const + defaultAllowedMethods = 'GET,POST,PUT,PATCH,DELETE,HEAD,OPTIONS', + defaultAllowedHeaders = [ + 'Content-Type', + 'Access-Control-Allow-Headers', + 'Authorization', + 'X-Requested-With', + 'Content-Length', + 'Content-Encoding', + 'X-Kuzzle-Volatile' + ].join(','); + /** * @class HttpProtocol */ @@ -60,152 +73,137 @@ class HttpProtocol extends Protocol { init(entryPoint) { return super.init('http', entryPoint) .then(() => { - if (entryPoint.config.protocols.http.enabled === false) { + if (this.config.enabled === false) { return false; } - debug('initializing http Server with config: %a', entryPoint.config); + debug('initializing http Server with config: %a', this.config); - this.maxFormFileSize = bytes.parse(entryPoint.config.protocols.http.maxFormFileSize); - this.maxEncodingLayers = entryPoint.config.protocols.http.maxEncodingLayers; + this.maxFormFileSize = bytes.parse(this.config.maxFormFileSize); + this.maxEncodingLayers = this.config.maxEncodingLayers; this.server = entryPoint.httpServer; - for (const numericParameter of ['maxFormFileSize', 'maxEncodingLayers']) { - if (this[numericParameter] === null || isNaN(this[numericParameter])) { - throw new KuzzleInternalError(`Invalid HTTP "${numericParameter}" parameter value: expected a numeric value`); - } + for (const value of ['maxFormFileSize', 'maxEncodingLayers']) { + assert( + Number.isInteger(this[value]), + `Invalid HTTP "${value}" parameter value: expected a numeric value`); } this.decoders = this._setDecoders(); - this.server.on('request', (request, response) => { - const ips = [request.socket.remoteAddress]; - - if (request.headers['x-forwarded-for']) { - request.headers['x-forwarded-for'].split(',').forEach(s => ips.push(s.trim())); - } - - const - connection = new ClientConnection('HTTP/' + request.httpVersion, ips, request.headers), - payload = { - ips, - requestId: connection.id, - url: request.url, - method: request.method, - headers: request.headers, - content: '' - }; - - debug('[%s] receiving HTTP request: %a', connection.id, payload); - this.entryPoint.newConnection(connection); - - if (request.headers['content-length'] > this.maxRequestSize) { - request.resume(); - return this._replyWithError(connection.id, payload, response, new SizeLimitError('Maximum HTTP request size exceeded')); - } - - let stream; - - if (!request.headers['content-type'] || request.headers['content-type'].startsWith('application/json')) { - stream = this._createWriteStream(connection.id, payload); - } else { - try { - stream = new HttpFormDataStream({ - headers: request.headers, - limits: {fileSize: this.maxFormFileSize} - }, payload, request); - } catch (error) { - request.resume(); - return this._replyWithError(connection.id, payload, response, new BadRequestError(error)); - } - } - - let pipes; - - try { - pipes = this._uncompress(request); - } catch(err) { - return this._replyWithError(connection.id, payload, response, err); - } - - // We attach our writable stream to the last pipe of the chain - if (pipes.length > 0) { - pipes[pipes.length-1].pipe(stream); - } else { - request.pipe(stream); - } - - // We forwarded all pipe errors to the request's event handler - request.on('error', err => { - const kerr = err instanceof KuzzleError ? err : new BadRequestError(err); - // remove all pipes before flushing the stream - request.unpipe(); - request.removeAllListeners().resume(); - stream.removeAllListeners().end(); - - // When an error occurs on a Readable Stream, the - // registered pipes are NOT freed automatically - pipes.forEach(pipe => pipe.close()); - - return this._replyWithError(connection.id, payload, response, kerr); - }); - - stream.on('finish', () => { - debug('[%s] End Request', connection.id); - payload.headers['content-type'] = 'application/json'; - this._sendRequest(connection.id, response, payload); - }); - }); + this.server.on('request', this.onMessage.bind(this)); return true; }); } /** - * Send a request to Kuzzle and forwards the response back to the client + * Invoked whenever a HTTP request is received * - * @param {string} connectionId - connection Id - * @param {ServerResponse} response - * @param {Object} payload + * @param {http.IncomingMessage} request + * @param {http.ServerResponse} response */ - _sendRequest (connectionId, response, payload) { - debug('[%s] sendRequest: %a', connectionId, payload); + onMessage(request, response) { + const + ips = this._getIps(request), + connection = new ClientConnection( + `HTTP/${request.httpVersion}`, + ips, + request.headers), + // emulates a request coming from the (deprecated) Kuzzle Proxy server + proxyRequest = { + ips, + requestId: connection.id, + url: request.url, + method: request.method, + headers: request.headers, + content: '' + }; + + debug('[%s] receiving HTTP request: %a', connection.id, proxyRequest); + this.entryPoint.newConnection(connection); + + if (request.headers['content-length'] > this.maxRequestSize) { + request.resume(); + this._replyWithError( + connection, + proxyRequest, + response, + new SizeLimitError('Maximum HTTP request size exceeded')); + return; + } + + let stream, pipes; - if (payload.json) { - payload.content = JSON.stringify(payload.json); + try { + stream = this._createWritableStream(request, proxyRequest); + pipes = this._uncompress(request); + } catch(err) { + request.resume(); + this._replyWithError(connection, proxyRequest, response, err); + return; } - this.entryPoint.kuzzle.router.http.route(payload, result => { - this.entryPoint.logAccess(result, payload); - const resp = result.response.toJSON(); + // We attach our writable stream to the last pipe of the chain + if (pipes.length > 0) { + pipes[pipes.length-1].pipe(stream); + } else { + request.pipe(stream); + } - if (payload.requestId !== resp.requestId) { - resp.requestId = payload.requestId; + // We forwarded all pipe errors to the request's event handler + request.on('error', err => { + const kerr = err instanceof KuzzleError ? err : new BadRequestError(err); + // remove all pipes before flushing the stream + request.unpipe(); + request.removeAllListeners().resume(); + stream.removeAllListeners().end(); - if (!resp.raw) { - resp.content.requestId = payload.requestId; - } - } + // When an error occurs on a Readable Stream, the + // registered pipes are NOT freed automatically + pipes.forEach(pipe => pipe.close()); - debug('sending HTTP request response to client: %a', resp); + this._replyWithError(connection, proxyRequest, response, kerr); + }); - if (resp.headers) { - for (const header of Object.keys(resp.headers)) { - response.setHeader(header, resp.headers[header]); - } - } + stream.on('finish', () => { + debug('[%s] End Request', connection.id); + proxyRequest.headers['content-type'] = 'application/json'; + this._sendRequest(connection, response, proxyRequest); + }); + } - const data = this._contentToPayload(resp, payload.url); + /** + * Send a request to Kuzzle and forwards the response back to the client + * + * @param {ClientConnection} connection + * @param {http.ServerResponse} response + * @param {Object} proxyRequest + */ + _sendRequest (connection, response, proxyRequest) { + debug('[%s] sendRequest: %a', connection.id, proxyRequest); + + if (proxyRequest.json) { + proxyRequest.content = JSON.stringify(proxyRequest.json); + } + + this.entryPoint.kuzzle.router.http.route(proxyRequest, request => { + this.entryPoint.logAccess(request, proxyRequest); + + const data = this._getResponseData(request, proxyRequest); - this._compress(data, response, payload.headers, (err, buf) => { + this._compress(data, response, proxyRequest.headers, (err, deflated) => { if (err) { - const kuzerr = err instanceof KuzzleError ? err : new BadRequestError(err); - return this._replyWithError(connectionId, payload, response, kuzerr); + const kuzerr = err instanceof KuzzleError ? + err : new BadRequestError(err); + this._replyWithError(connection, proxyRequest, response, kuzerr); + return; } - response.writeHead(resp.status); - response.end(buf); - this.entryPoint.removeConnection(connectionId); + response.writeHead(request.response.status, request.response.headers); + response.end(deflated); + + this.entryPoint.removeConnection(connection.id); }); }); } @@ -213,75 +211,86 @@ class HttpProtocol extends Protocol { /** * Forward an error response to the client * - * @param {string} connectionId - * @param {Object} payload - * @param {Object} response - * @param {Object} error + * @param {ClientConnection} connection + * @param {Object} proxyRequest + * @param {http.ServerResponse} response + * @param {Error} error */ - _replyWithError(connectionId, payload, response, error) { + _replyWithError(connection, proxyRequest, response, error) { const result = { raw: true, content: JSON.stringify(error) }; - debug('[%s] replyWithError: %a', connectionId, error); + debug('[%s] replyWithError: %a', connection.id, error); - this.entryPoint.logAccess(new Request(payload, {error, connectionId}), payload); + this.entryPoint.logAccess( + new Request(proxyRequest, {error, connectionId: connection.id}), + proxyRequest); response.writeHead(error.status, { 'Content-Type': 'application/json', 'Access-Control-Allow-Origin': '*', - 'Access-Control-Allow-Methods' : 'GET,POST,PUT,PATCH,DELETE,HEAD,OPTIONS', - 'Access-Control-Allow-Headers': 'Content-Type, Access-Control-Allow-Headers, Authorization, X-Requested-With, Content-Length, Content-Encoding, X-Kuzzle-Volatile' + 'Access-Control-Allow-Methods': defaultAllowedMethods, + 'Access-Control-Allow-Headers': defaultAllowedHeaders }); response.end(result.content); - this.entryPoint.removeConnection(connectionId); + this.entryPoint.removeConnection(connection.id); } /** * Convert a Kuzzle query result into an appropriate payload format * to send back to the client * - * @param {Object} result - * @param {String} invokedUrl - invoked URL. Used to check if the ?pretty - * argument was passed by the client, changing - * the payload format + * @param {Request} request + * @param {Object} proxyRequest * @return {Buffer} */ - _contentToPayload(result, invokedUrl) { - let data; + _getResponseData(request, proxyRequest) { + let data = request.response.toJSON(); + + if (proxyRequest.requestId !== data.requestId) { + data.requestId = proxyRequest.requestId; - if (result.raw) { - if (typeof result.content === 'object') { + if (!data.raw) { + data.content.requestId = proxyRequest.requestId; + } + } + + debug('HTTP request response: %a', data); + + if (data.raw) { + if (typeof data.content === 'object') { /* This object can be either a Buffer object, a stringified Buffer object, or anything else. - In the former two cases, we create a new Buffer object, and in the latter, - we stringify the content. + In the former two cases, we create a new Buffer object, and in the + latter, we stringify the content. */ - if (result.content instanceof Buffer || (result.content.type === 'Buffer' && Array.isArray(result.content.data))) { - data = result.content; + if (data.content instanceof Buffer || + (data.content.type === 'Buffer' && Array.isArray(data.content.data)) + ) { + data = data.content; } else { - data = JSON.stringify(result.content); + data = JSON.stringify(data.content); } } else { // scalars are sent as-is - data = result.content; + data = data.content; } - } - else { + } else { let indent = 0; - const parsedUrl = url.parse(invokedUrl, true); + const parsedUrl = url.parse(proxyRequest.url, true); if (parsedUrl.query && parsedUrl.query.pretty !== undefined) { indent = 2; } - data = JSON.stringify(result.content, undefined, indent); + data = JSON.stringify(data.content, undefined, indent); } return Buffer.from(data); @@ -339,12 +348,26 @@ class HttpProtocol extends Protocol { } /** - * Create a new Writable stream configured to receive a HTTP request - * @param {string} connectionId - * @param {Object} payload + * Either get a HTTP form stream, or create a new Writable stream from + * scratch configured to receive a HTTP request + * + * @param {http.IncomingMessage} request + * @param {Object} proxyRequest * @return {stream.Writable} */ - _createWriteStream(connectionId, payload) { + _createWritableStream(request, proxyRequest) { + if (request.headers['content-type'] && + !request.headers['content-type'].startsWith('application/json') + ) { + return new HttpFormDataStream( + { + headers: request.headers, + limits: { fileSize: this.maxFormFileSize } + }, + proxyRequest, + request); + } + const maxRequestSize = this.maxRequestSize; // prevent context mismatch let streamLength = 0; @@ -358,13 +381,11 @@ class HttpProtocol extends Protocol { streamLength += chunk.length; if (streamLength > maxRequestSize) { - return callback(new SizeLimitError('Maximum HTTP request size exceeded')); + callback(new SizeLimitError('Maximum HTTP request size exceeded')); + return; } - const str = chunk.toString(); - - debug('[%s] writing chunk: %a', connectionId, str); - payload.content += str; + proxyRequest.content += chunk.toString(); callback(); } }); @@ -377,7 +398,7 @@ class HttpProtocol extends Protocol { */ _setDecoders() { const - allowCompression = this.entryPoint.config.protocols.http.allowCompression, + allowCompression = this.config.allowCompression, disabledfn = () => { throw new BadRequestError('Compression support is disabled'); }; diff --git a/lib/api/core/entrypoints/embedded/protocols/mqtt.js b/lib/api/core/entrypoints/embedded/protocols/mqtt.js index 9244a7aeb7..2e38c17162 100644 --- a/lib/api/core/entrypoints/embedded/protocols/mqtt.js +++ b/lib/api/core/entrypoints/embedded/protocols/mqtt.js @@ -51,11 +51,11 @@ class MqttProtocol extends Protocol { init (entryPoint) { return super.init('mqtt', entryPoint) .then(() => { - if (entryPoint.config.protocols.mqtt.enabled === false) { + if (this.config.enabled === false) { return false; } - debug('initializing MQTT Server with config: %a', entryPoint.config.protocols.mqtt); + debug('initializing MQTT Server with config: %a', this.config); this.entryPoint = entryPoint; this.kuzzle = this.entryPoint.kuzzle; @@ -69,7 +69,7 @@ class MqttProtocol extends Protocol { server: { port: 1883 } - }, entryPoint.config.protocols.mqtt || {}); + }, this.config); this.server = new mosca.Server(this.config.server); diff --git a/lib/api/core/entrypoints/embedded/protocols/protocol.js b/lib/api/core/entrypoints/embedded/protocols/protocol.js index 322ed1ccd9..2e8195dc5b 100644 --- a/lib/api/core/entrypoints/embedded/protocols/protocol.js +++ b/lib/api/core/entrypoints/embedded/protocols/protocol.js @@ -18,23 +18,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -const - { - InternalError: KuzzleInternalError - } = require('kuzzle-common-objects').errors, - bytes = require('bytes'); +const + assert = require('assert'), + bytes = require('bytes'), + http = require('http'); class Protocol { constructor() { this.maxRequestSize = null; this.entryPoint = null; this.name = null; + this.config = {}; } /** * @param {string} name - Protocol name (used for accessor) * @param {EmbeddedEntryPoint} entryPoint - * + * * @return {Promise} */ init(name, entryPoint) { @@ -43,18 +43,21 @@ class Protocol { return Promise.resolve() .then(() => { this.name = name; + this.maxRequestSize = bytes.parse(entryPoint.config.maxRequestSize); + + assert( + typeof this.name === 'string' && this.name.length > 0, + 'Invalid "name" parameter value: expected a non empty string value'); - if (typeof this.name !== 'string' || this.name.length <= 0) { - throw new KuzzleInternalError('Invalid "name" parameter value: expected a non empty string value'); + if (entryPoint.config.protocols && entryPoint.config.protocols[name]) { + this.config = entryPoint.config.protocols[name]; } - this.maxRequestSize = bytes.parse(entryPoint.config.maxRequestSize); + assert( + Number.isInteger(this.maxRequestSize), + 'Invalid "maxRequestSize" parameter value: expected a numeric value'); - if (this.maxRequestSize === null || isNaN(this.maxRequestSize)) { - throw new KuzzleInternalError('Invalid "maxRequestSize" parameter value: expected a numeric value'); - } - - return true; + return true; }); } @@ -76,6 +79,33 @@ class Protocol { // do nothing by default } + + /** + * Extract and return the list of IP addresses from a request made to a + * protocol. + * + * @param {*} request + * @return {Array.} List of IP addresses + */ + _getIps(request) { + const ips = []; + + if (request instanceof http.IncomingMessage) { + ips.push(request.socket.remoteAddress); + + if (request.headers['x-forwarded-for']) { + for (const header of request.headers['x-forwarded-for'].split(',')) { + const trimmed = header.trim(); + + if (trimmed.length > 0) { + ips.push(trimmed); + } + } + } + } + + return ips; + } } module.exports = Protocol; diff --git a/lib/api/core/entrypoints/embedded/protocols/socketio.js b/lib/api/core/entrypoints/embedded/protocols/socketio.js index 1d33f44b49..c675df233e 100644 --- a/lib/api/core/entrypoints/embedded/protocols/socketio.js +++ b/lib/api/core/entrypoints/embedded/protocols/socketio.js @@ -47,15 +47,15 @@ class SocketIoProtocol extends Protocol { init(entryPoint) { return super.init('socketio', entryPoint) .then(() => { - if (entryPoint.config.protocols.socketio.enabled === false) { + if (this.config.enabled === false) { return false; } - debug('initializing socketIo Server with config: %a', entryPoint.config.socketio); + debug('initializing socket.io Server with config: %a', this.config); this.kuzzle = this.entryPoint.kuzzle; - // SocketIo server listens by default to "/socket.io" path + // Socket.io server listens by default to "/socket.io" path // (see (http://socket.io/docs/server-api/#server#path(v:string):server)) this.io = require('socket.io')(entryPoint.httpServer, { // maxHttpBufferSize is passed to the ws library as the maxPayload option @@ -63,7 +63,7 @@ class SocketIoProtocol extends Protocol { maxHttpBufferSize: this.maxRequestSize }); - this.io.set('origins', entryPoint.config.protocols.socketio.origins); + this.io.set('origins', this.config.origins); this.io.on('connection', socket => this.onConnection(socket)); this.io.on('error', error => this.onServerError(error)); @@ -73,7 +73,8 @@ class SocketIoProtocol extends Protocol { } onServerError(error) { - this.kuzzle.emit('log:error', '[socketio] An error has occured:\n' + error.stack); + this.kuzzle.emit( + 'log:error', '[socketio] An error has occured:\n' + error.stack); } onConnection(socket) { diff --git a/lib/api/core/entrypoints/embedded/protocols/websocket.js b/lib/api/core/entrypoints/embedded/protocols/websocket.js index 7cd8901653..169d25db55 100644 --- a/lib/api/core/entrypoints/embedded/protocols/websocket.js +++ b/lib/api/core/entrypoints/embedded/protocols/websocket.js @@ -22,28 +22,70 @@ 'use strict'; const + assert = require('assert'), debug = require('../../../../../kuzzleDebug')('kuzzle:entry-point:protocols:websocket'), Protocol = require('./protocol'), - Request = require('kuzzle-common-objects').Request, - WebSocketServer = require('uws').Server, + { Server: WebSocketServer, Sender } = require('ws'), ClientConnection = require('../clientConnection'), { - BadRequestError, - InternalError: KuzzleInternalError - } = require('kuzzle-common-objects').errors; + Request, + errors: { + BadRequestError, + InternalError: KuzzleInternalError + } + } = require('kuzzle-common-objects'); + +// Used by the broadcast method to build JSON payloads while limiting the +// number of JSON serializations +const + jsonRoomProp = ',"room":"', + jsonEnder = '"}'; + +// Passed to ws.Sender to build a RFC-6455 frame +const wsFrameOptions = { + fin: true, + rsv1: false, + opcode: 1, + mask: false, + readOnly: true +}; + +/** + * @class WebSocketConnection + */ +class WebSocketConnection { + constructor(socket, lastActivity) { + this.alive = true; + this.socket = socket; + this.channels = new Set(); + this.lastActivity = lastActivity; + } +} /** * @class WebsocketProtocol */ -class WebsocketProtocol extends Protocol { +class WebSocketProtocol extends Protocol { constructor () { super(); - - this.channels = {}; - this.connectionPool = {}; this.server = null; this.kuzzle = null; this.heartbeatInterval = null; + this.idleSweepInterval = null; + + // Prevents thousands of "Date.now()" per second: it's far more efficient + // to have a timestamp property refreshed every second or so, since we + // don't need to be more precise than that + this.activityTimestamp = Date.now(); + this.activityInterval = setInterval(() => { + this.activityTimestamp = Date.now(); + }, 1000); + + // Map> + this.channels = new Map(); + + // Map + this.connectionPool = new Map(); } /** @@ -53,25 +95,14 @@ class WebsocketProtocol extends Protocol { init (entryPoint) { return super.init('websocket', entryPoint) .then(() => { - if (entryPoint.config.protocols.websocket.enabled === false) { + if (this.config.enabled === false) { return false; } - debug( - 'initializing WebSocket Server with config: %a', - entryPoint.config.protocols.websocket); + debug('initializing WebSocket Server with config: %a', this.config); - const heartbeat = entryPoint.config.protocols.websocket.heartbeat; - - if (!Number.isInteger(heartbeat) || heartbeat < 0) { - throw new KuzzleInternalError(`WebSocket: invalid heartbeat value ${heartbeat}`); - } - - if (heartbeat > 0) { - this.heartbeatInterval = setInterval( - () => this._doHeartbeat(), - heartbeat); - } + this._startHeartbeat(); + this._startIdleSweeps(); this.kuzzle = this.entryPoint.kuzzle; @@ -81,11 +112,8 @@ class WebsocketProtocol extends Protocol { perMessageDeflate: false }); - this.server.on( - 'connection', - (socket, request) => this.onConnection(socket, request)); - - this.server.on('error', error => this.onServerError(error)); + this.server.on('connection', this.onConnection.bind(this)); + this.server.on('error', this.onServerError.bind(this)); return true; }); @@ -97,93 +125,87 @@ class WebsocketProtocol extends Protocol { `[websocket] An error has occured "${error.message}":\n${error.stack}`); } - onConnection(clientSocket, request) { + onConnection(socket, request) { if (request.url && request.url.startsWith('/socket.io/')) { // Discarding request management here: let socket.io protocol manage // this connection - return false; + return; } - let ips = [clientSocket._socket.remoteAddress]; - if (request.headers && request.headers['x-forwarded-for']) { - ips = request.headers['x-forwarded-for'] - .split(',') - .map(s => s.trim()) - .concat(ips); - } + const + ips = this._getIps(request), + connection = new ClientConnection(this.name, ips, request.headers), + wsConnection = new WebSocketConnection(socket, this.activityTimestamp); - const connection = new ClientConnection(this.name, ips, request.headers); debug('[%s] creating Websocket connection', connection.id); - try { - this.entryPoint.newConnection(connection); - } - catch (err) { - this.entryPoint.log.warn('[websocket] Client connection refused with message "%s": initialization still underway', err.message); - clientSocket.close(1013, err.message); - return false; - } - - this.connectionPool[connection.id] = { - alive: true, - socket: clientSocket, - channels: [] - }; + this.entryPoint.newConnection(connection); + this.connectionPool.set(connection.id, wsConnection); - clientSocket.on('close', () => { + socket.on('close', () => { debug('[%s] received a `close` event', connection.id); this.onClientDisconnection(connection.id); }); - clientSocket.on('error', () => { + socket.on('error', () => { debug('[%s] received an `error` event', connection.id); this.onClientDisconnection(connection.id); }); - clientSocket.on('message', data => { + socket.on('message', data => { debug('[%s] received a `message` event', connection.id); this.onClientMessage(connection, data); }); - clientSocket.on('pong', () => { - debug('[%s] received a `pong` event', connection.id); - this.connectionPool[connection.id].alive = true; + socket.on('ping', () => { + debug('[%s] received a `ping` event', connection.id); + wsConnection.alive = true; + wsConnection.lastActivity = this.activityTimestamp; }); - return true; + socket.on('pong', () => { + debug('[%s] received a `pong` event', connection.id); + wsConnection.alive = true; + wsConnection.lastActivity = this.activityTimestamp; + }); } onClientDisconnection(clientId) { - debug('[%s] onClientDisconnection', clientId); + debug('[%s] Client disconnected', clientId); this.entryPoint.removeConnection(clientId); - const connection = this.connectionPool[clientId]; + const connection = this.connectionPool.get(clientId); + if (!connection) { return; } connection.alive = false; - delete this.entryPoint.clients[connection.id]; for (const channel of connection.channels) { - if (this.channels[channel] && this.channels[channel].count > 1) { - delete this.channels[channel][clientId]; - this.channels[channel].count--; - } - else { - delete this.channels[channel]; + const ids = this.channels.get(channel); + + if (ids) { + ids.delete(clientId); + if (ids.size === 0) { + this.channels.delete(channel); + } } } - delete this.connectionPool[clientId]; + this.connectionPool.delete(clientId); } onClientMessage(connection, data) { - if (!data || !this.connectionPool[connection.id]) { + const wsConnection = this.connectionPool.get(connection.id); + + if (!data || !wsConnection) { return; } + wsConnection.lastActivity = this.activityTimestamp; + let parsed; debug('[%s] onClientMessage: %a', connection.id, data); @@ -196,10 +218,11 @@ class WebsocketProtocol extends Protocol { we cannot add a "room" information since we need to extract a request ID from the incoming data, which is apparently not a valid JSON - So... the error is forwarded to the client, hoping he know + So... the error is forwarded to the client, hoping they know what to do with it. */ - return this._send(connection.id, JSON.stringify(new BadRequestError(e.message))); + return this._send( + connection.id, JSON.stringify(new BadRequestError(e.message))); } try { @@ -219,34 +242,68 @@ class WebsocketProtocol extends Protocol { } }; - this.entryPoint.kuzzle.emit('log:error', new KuzzleInternalError(e.message)); + this.entryPoint.kuzzle.emit( + 'log:error', new KuzzleInternalError(e.message)); return this._send(connection.id, JSON.stringify(errobj)); } } + /** + * /!\ WARNING: CRITICAL CODE SECTION AHEAD /!\ + * (this means performance over maintenability or readability... sorry) + * + * Do not change without reason: this method is used by the real-time engine + * to send very similar notifications to A LOT of different sockets + * + * This function precomputes RFC-6455 WebSocket frames and send them raw to + * destination sockets. This prevents the recomputing of the same frame + * over and over again for each broadcast. + * + * @param {Object} data + */ broadcast(data) { - /* - Avoids stringifying the payload multiple times just to update the room: - - we start deleting the last character, which is the closing JSON bracket ('}') - - we then only have to inject the following string to each channel: - ,"room":""} + const + stringified = JSON.stringify(data.payload), + // 255 bytes should be enough to hold the following: + // ,"room":"" + // (with current channel encoding, this is less than 100 bytes) + payload = Buffer.allocUnsafe(stringified.length + 255); + + let offset = stringified.length - 1; + + payload.write(stringified, 0); + payload.write(jsonRoomProp, offset); + + offset += jsonRoomProp.length; - So, instead of stringifying the payload for each channel, we only concat - a new substring to the original payload. - */ - const payload = JSON.stringify(data.payload).slice(0, -1) + ',"room":"'; + for (const channel of data.channels) { + const connectionIds = this.channels.get(channel); - debug('broadcast: %a', data); + if (connectionIds) { + // Adds the room property to the message + payload.write(channel, offset); + payload.write(jsonEnder, offset + channel.length); - for (const channelId of data.channels) { - if (this.channels[channelId]) { + // prevent buffer overwrites due to socket.send being an + // async method (race condition) const - channel = this.channels[channelId], - channelPayload = payload + channelId + '"}'; + payloadLength = offset + channel.length + jsonEnder.length, + payloadSafeCopy = Buffer.allocUnsafe(payloadLength); - for (const connectionId of Object.keys(channel)) { - this._send(connectionId, channelPayload); + payload.copy(payloadSafeCopy, 0, 0, payloadLength); + + const frame = Sender.frame(payloadSafeCopy, wsFrameOptions); + + for (const connectionId of connectionIds) { + const connection = this.connectionPool.get(connectionId); + + if (connection && + connection.alive && + connection.socket.readyState === connection.socket.OPEN + ) { + connection.socket._sender.sendFrame(frame); + } } } } @@ -266,79 +323,122 @@ class WebsocketProtocol extends Protocol { joinChannel(channel, connectionId) { debug('joinChannel: %s %s', channel, connectionId); - const connection = this.connectionPool[connectionId]; + const connection = this.connectionPool.get(connectionId); if (!connection || !connection.alive) { return; } - if (!this.channels[channel]) { - this.channels[channel] = { - count: 0 - }; + let ids = this.channels.get(channel); + + if (!ids) { + ids = new Set([connectionId]); + this.channels.set(channel, ids); + } else { + ids.add(connectionId); } - this.channels[channel][connectionId] = true; - this.channels[channel].count++; - connection.channels.push(channel); + + connection.channels.add(channel); } leaveChannel(channel, connectionId) { debug('leaveChannel: %s %s', channel, connectionId); - const connection = this.connectionPool[connectionId]; + const + connection = this.connectionPool.get(connectionId), + ids = this.channels.get(channel); - if (!connection - || !connection.alive - || !this.channels[channel] - || !this.channels[channel][connectionId]) { + if (!connection || !ids || !ids.has(connectionId)) { return; } - if (this.channels[channel].count > 1) { - delete this.channels[channel][connectionId]; - this.channels[channel].count--; - } - else { - delete this.channels[channel]; - } + ids.delete(connectionId); - const index = connection.channels.indexOf(channel); - if (index !== -1) { - connection.channels.splice(index, 1); + if (ids.size === 0) { + this.channels.delete(channel); } + + connection.channels.delete(channel); } disconnect(clientId, message = 'Connection closed by remote host') { debug('[%s] disconnect', clientId); - if (this.connectionPool[clientId]) { - this.connectionPool[clientId].socket.close(1011, message); + const connection = this.connectionPool.get(clientId); + if (connection) { + connection.alive = false; + connection.socket.close(1011, message); } } _send (id, data) { debug('[%s] send: %a', id, data); - const connection = this.connectionPool[id]; + const connection = this.connectionPool.get(id); - if (connection && connection.alive && connection.socket.readyState === connection.socket.OPEN) { + if (connection && connection.alive && + connection.socket.readyState === connection.socket.OPEN + ) { connection.socket.send(data); } } + _startHeartbeat() { + assert( + Number.isInteger(this.config.heartbeat) && this.config.heartbeat >= 0, + `WebSocket: invalid heartbeat value ${this.config.heartbeat}`); + + if (this.config.heartbeat > 0) { + this.heartbeatInterval = setInterval( + this._doHeartbeat.bind(this), + this.config.heartbeat); + } + } + + _startIdleSweeps() { + assert( + Number.isInteger(this.config.idleTimeout) && this.config.idleTimeout >= 0, + `WebSocket: invalid idleTimeout value ${this.config.idleTimeout}`); + + if (this.config.idleTimeout > 0) { + this.idleSweepInterval = setInterval( + this._sweepIdleSockets.bind(this), + this.config.idleTimeout); + } + } + _doHeartbeat() { - debug('[WebSocket] Heartbeat'); - for (const id of Object.keys(this.connectionPool)) { - // did not respond since we last sent a PING request - if (this.connectionPool[id].alive === false) { + debug('Heartbeat'); + const + lastActivityThreshold = this.activityTimestamp - this.config.heartbeat; + + for (const connection of this.connectionPool.values()) { + if (connection.alive === false) { + // if still marked as "not alive", then the socket did not respond + // to the last emitted PING request + // (this correctly triggers the 'close' event handler on that socket) + connection.socket.terminate(); + } else if (connection.lastActivity < lastActivityThreshold) { + // emit a PING request only if the socket has been inactive for longer + // than the heartbeat value + connection.alive = false; + connection.socket.ping(); + } + } + } + + _sweepIdleSockets() { + debug('Cleaning dead sockets'); + + const now = Date.now(); + + for (const connection of this.connectionPool.values()) { + if ((now - connection.lastActivity) > this.config.idleTimeout) { // correctly triggers the 'close' event handler on that socket - this.connectionPool[id].socket.terminate(); - } else { - this.connectionPool[id].alive = false; - this.connectionPool[id].socket.ping(); + connection.socket.terminate(); } } } } -module.exports = WebsocketProtocol; +module.exports = WebSocketProtocol; diff --git a/lib/services/broker/wsBrokerClient.js b/lib/services/broker/wsBrokerClient.js index 694ad5d73c..10790f3982 100644 --- a/lib/services/broker/wsBrokerClient.js +++ b/lib/services/broker/wsBrokerClient.js @@ -25,7 +25,7 @@ const debug = require('../../kuzzleDebug')('kuzzle:broker:client'), Bluebird = require('bluebird'), KuzzleInternalError = require('kuzzle-common-objects').errors.InternalError, - WS = require('uws'); + WS = require('ws'); /** * Websocket client implementation for Kuzzle internal broker diff --git a/lib/services/broker/wsBrokerServer.js b/lib/services/broker/wsBrokerServer.js index 7f2ed98b3d..84a1dee997 100644 --- a/lib/services/broker/wsBrokerServer.js +++ b/lib/services/broker/wsBrokerServer.js @@ -27,7 +27,7 @@ const CircularList = require('easy-circular-list'), KuzzleInternalError = require('kuzzle-common-objects').errors.InternalError, http = require('http'), - WS = require('uws').Server; + WS = require('ws').Server; /** * Web Socket server implementation of Kuzzle's internal broker. diff --git a/package-lock.json b/package-lock.json index 7bced8c1ce..bc143381f6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1162,6 +1162,14 @@ "requires": { "ms": "^2.1.1" } + }, + "ws": { + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-6.2.1.tgz", + "integrity": "sha512-GIyAXC2cB7LjvpgMt9EKS2ldqr0MTrORaleiOno6TweZ6r3TKtoFQWay/2PceJ3RuBasOHzXNn5Lrw1X0bEjqA==", + "requires": { + "async-limiter": "~1.0.0" + } } } }, @@ -1610,7 +1618,8 @@ "ansi": { "version": "0.3.1", "resolved": "https://registry.npmjs.org/ansi/-/ansi-0.3.1.tgz", - "integrity": "sha1-DELU+xcWDVqa8eSEus4cZpIsGyE=" + "integrity": "sha1-DELU+xcWDVqa8eSEus4cZpIsGyE=", + "optional": true }, "ansi-colors": { "version": "3.2.3", @@ -2222,6 +2231,7 @@ "version": "0.0.9", "resolved": "https://registry.npmjs.org/block-stream/-/block-stream-0.0.9.tgz", "integrity": "sha1-E+v+d4oDIFz+A3UUgeu0szAMEmo=", + "optional": true, "requires": { "inherits": "~2.0.0" } @@ -2403,6 +2413,7 @@ "version": "1.2.0", "resolved": "https://registry.npmjs.org/buffer-alloc/-/buffer-alloc-1.2.0.tgz", "integrity": "sha512-CFsHQgjtW1UChdXgbyJGtnm+O/uLQeZdtbDo8mfUgYXCHSM1wgrVxXm6bSyrUuErEb+4sYVGCzASBRot7zyrow==", + "optional": true, "requires": { "buffer-alloc-unsafe": "^1.1.0", "buffer-fill": "^1.0.0" @@ -2411,7 +2422,8 @@ "buffer-alloc-unsafe": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/buffer-alloc-unsafe/-/buffer-alloc-unsafe-1.1.0.tgz", - "integrity": "sha512-TEM2iMIEQdJ2yjPJoSIsldnleVaAk1oW3DBVUykyOLsEsFmEc9kn+SFFPz+gl54KQNxlDnAwCXosOS9Okx2xAg==" + "integrity": "sha512-TEM2iMIEQdJ2yjPJoSIsldnleVaAk1oW3DBVUykyOLsEsFmEc9kn+SFFPz+gl54KQNxlDnAwCXosOS9Okx2xAg==", + "optional": true }, "buffer-crc32": { "version": "0.2.13", @@ -2432,7 +2444,8 @@ "buffer-fill": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/buffer-fill/-/buffer-fill-1.0.0.tgz", - "integrity": "sha1-+PeLdniYiO858gXNY39o5wISKyw=" + "integrity": "sha1-+PeLdniYiO858gXNY39o5wISKyw=", + "optional": true }, "buffer-from": { "version": "1.1.1", @@ -2442,7 +2455,8 @@ "buffer-more-ints": { "version": "0.0.2", "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-0.0.2.tgz", - "integrity": "sha1-JrOIXRD6E9t/wBquOquHAZngEkw=" + "integrity": "sha1-JrOIXRD6E9t/wBquOquHAZngEkw=", + "optional": true }, "buffer-xor": { "version": "1.0.3", @@ -2464,6 +2478,14 @@ "integrity": "sha1-skV5w77U1tOWru5tmorn9Ugqt7s=", "optional": true }, + "bufferutil": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.1.tgz", + "integrity": "sha512-xowrxvpxojqkagPcWRQVXZl0YXhRhAtBEIq3VoER1NH5Mw1n1o0ojdspp+GS2J//2gCVyrzQDApQ4unGF+QOoA==", + "requires": { + "node-gyp-build": "~3.7.0" + } + }, "builtin-status-codes": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/builtin-status-codes/-/builtin-status-codes-3.0.0.tgz", @@ -4493,7 +4515,8 @@ "fs-constants": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz", - "integrity": "sha512-y6OAwoSIf7FyjMIv94u+b5rdheZEjzR63GTyZJm5qh4Bi+2YgwLCcI/fPFZkL5PSixOt6ZNKm+w+Hfp/Bciwow==" + "integrity": "sha512-y6OAwoSIf7FyjMIv94u+b5rdheZEjzR63GTyZJm5qh4Bi+2YgwLCcI/fPFZkL5PSixOt6ZNKm+w+Hfp/Bciwow==", + "optional": true }, "fs-ext": { "version": "0.5.0", @@ -4555,7 +4578,8 @@ }, "ansi-regex": { "version": "2.1.1", - "bundled": true + "bundled": true, + "optional": true }, "aproba": { "version": "1.2.0", @@ -4592,7 +4616,8 @@ }, "code-point-at": { "version": "1.1.0", - "bundled": true + "bundled": true, + "optional": true }, "concat-map": { "version": "0.0.1", @@ -4601,7 +4626,8 @@ }, "console-control-strings": { "version": "1.1.0", - "bundled": true + "bundled": true, + "optional": true }, "core-util-is": { "version": "1.0.2", @@ -4704,7 +4730,8 @@ }, "inherits": { "version": "2.0.3", - "bundled": true + "bundled": true, + "optional": true }, "ini": { "version": "1.3.5", @@ -4714,6 +4741,7 @@ "is-fullwidth-code-point": { "version": "1.0.0", "bundled": true, + "optional": true, "requires": { "number-is-nan": "^1.0.0" } @@ -4733,11 +4761,13 @@ }, "minimist": { "version": "0.0.8", - "bundled": true + "bundled": true, + "optional": true }, "minipass": { "version": "2.3.5", "bundled": true, + "optional": true, "requires": { "safe-buffer": "^5.1.2", "yallist": "^3.0.0" @@ -4754,6 +4784,7 @@ "mkdirp": { "version": "0.5.1", "bundled": true, + "optional": true, "requires": { "minimist": "0.0.8" } @@ -4826,7 +4857,8 @@ }, "number-is-nan": { "version": "1.0.1", - "bundled": true + "bundled": true, + "optional": true }, "object-assign": { "version": "4.1.1", @@ -4836,6 +4868,7 @@ "once": { "version": "1.4.0", "bundled": true, + "optional": true, "requires": { "wrappy": "1" } @@ -4911,7 +4944,8 @@ }, "safe-buffer": { "version": "5.1.2", - "bundled": true + "bundled": true, + "optional": true }, "safer-buffer": { "version": "2.1.2", @@ -4941,6 +4975,7 @@ "string-width": { "version": "1.0.2", "bundled": true, + "optional": true, "requires": { "code-point-at": "^1.0.0", "is-fullwidth-code-point": "^1.0.0", @@ -4958,6 +4993,7 @@ "strip-ansi": { "version": "3.0.1", "bundled": true, + "optional": true, "requires": { "ansi-regex": "^2.0.0" } @@ -4996,25 +5032,16 @@ }, "wrappy": { "version": "1.0.2", - "bundled": true + "bundled": true, + "optional": true }, "yallist": { "version": "3.0.3", - "bundled": true + "bundled": true, + "optional": true } } }, - "fstream": { - "version": "1.0.11", - "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.11.tgz", - "integrity": "sha1-XB+x8RdHcRTwYyoOtLcbPLD9MXE=", - "requires": { - "graceful-fs": "^4.1.2", - "inherits": "~2.0.0", - "mkdirp": ">=0.5 0", - "rimraf": "2" - } - }, "ftp": { "version": "0.3.10", "resolved": "https://registry.npmjs.org/ftp/-/ftp-0.3.10.tgz", @@ -5158,6 +5185,7 @@ "version": "3.2.6", "resolved": "https://registry.npmjs.org/ghutils/-/ghutils-3.2.6.tgz", "integrity": "sha512-WpYHgLQkqU7Cv147wKUEThyj6qKHCdnAG2CL9RRsRQImVdLGdVqblJ3JUnj3ToQwgm1ALPS+FXgR0448AgGPUg==", + "optional": true, "requires": { "jsonist": "~2.1.0", "xtend": "~4.0.1" @@ -5486,6 +5514,7 @@ "version": "2.1.3", "resolved": "https://registry.npmjs.org/hyperquest/-/hyperquest-2.1.3.tgz", "integrity": "sha512-fUuDOrB47PqNK/BAMOS13v41UoaqIxqSLHX6CAbOD7OfT+/GCWO1/vPLfTNutOeXrv1ikuaZ3yux+33Z9vh+rw==", + "optional": true, "requires": { "buffer-from": "^0.1.1", "duplexer2": "~0.0.2", @@ -5495,12 +5524,14 @@ "buffer-from": { "version": "0.1.2", "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-0.1.2.tgz", - "integrity": "sha512-RiWIenusJsmI2KcvqQABB83tLxCByE3upSP8QU3rJDMVFGPWLvPQJt/O1Su9moRWeH7d+Q2HYb68f6+v+tw2vg==" + "integrity": "sha512-RiWIenusJsmI2KcvqQABB83tLxCByE3upSP8QU3rJDMVFGPWLvPQJt/O1Su9moRWeH7d+Q2HYb68f6+v+tw2vg==", + "optional": true }, "duplexer2": { "version": "0.0.2", "resolved": "https://registry.npmjs.org/duplexer2/-/duplexer2-0.0.2.tgz", "integrity": "sha1-xhTc9n4vsUmVqRcR5aYX6KYKMds=", + "optional": true, "requires": { "readable-stream": "~1.1.9" } @@ -5508,12 +5539,14 @@ "isarray": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", - "integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=" + "integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=", + "optional": true }, "readable-stream": { "version": "1.1.14", "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", "integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=", + "optional": true, "requires": { "core-util-is": "~1.0.0", "inherits": "~2.0.1", @@ -5524,12 +5557,14 @@ "string_decoder": { "version": "0.10.31", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", - "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=" + "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=", + "optional": true }, "through2": { "version": "0.6.5", "resolved": "https://registry.npmjs.org/through2/-/through2-0.6.5.tgz", "integrity": "sha1-QaucZ7KdVyCQcUEOHXp6lozTrUg=", + "optional": true, "requires": { "readable-stream": ">=1.0.33-1 <1.1.0-0", "xtend": ">=4.0.0 <4.1.0-0" @@ -5539,6 +5574,7 @@ "version": "1.0.34", "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.0.34.tgz", "integrity": "sha1-Elgg40vIQtLyqq+v5MKRbuMsFXw=", + "optional": true, "requires": { "core-util-is": "~1.0.0", "inherits": "~2.0.1", @@ -6192,6 +6228,7 @@ "version": "2.1.0", "resolved": "https://registry.npmjs.org/jsonist/-/jsonist-2.1.0.tgz", "integrity": "sha1-RHek0WzTd/rsWNjPhwt+OS9tf+k=", + "optional": true, "requires": { "bl": "~1.2.0", "hyperquest": "~2.1.2", @@ -6390,6 +6427,16 @@ "ora": "^3.2.0", "webpack": "^4.29.6", "ws": "^6.2.0" + }, + "dependencies": { + "ws": { + "version": "6.2.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-6.2.1.tgz", + "integrity": "sha512-GIyAXC2cB7LjvpgMt9EKS2ldqr0MTrORaleiOno6TweZ6r3TKtoFQWay/2PceJ3RuBasOHzXNn5Lrw1X0bEjqA==", + "requires": { + "async-limiter": "~1.0.0" + } + } } }, "lazy": { @@ -6732,17 +6779,20 @@ "lodash.pad": { "version": "4.5.1", "resolved": "https://registry.npmjs.org/lodash.pad/-/lodash.pad-4.5.1.tgz", - "integrity": "sha1-QzCUmoM6fI2iLMIPaibE1Z3runA=" + "integrity": "sha1-QzCUmoM6fI2iLMIPaibE1Z3runA=", + "optional": true }, "lodash.padend": { "version": "4.6.1", "resolved": "https://registry.npmjs.org/lodash.padend/-/lodash.padend-4.6.1.tgz", - "integrity": "sha1-U8y6BH0G4VjTEfRdpiX05J5vFm4=" + "integrity": "sha1-U8y6BH0G4VjTEfRdpiX05J5vFm4=", + "optional": true }, "lodash.padstart": { "version": "4.6.1", "resolved": "https://registry.npmjs.org/lodash.padstart/-/lodash.padstart-4.6.1.tgz", - "integrity": "sha1-0uPuv/DZ05rVD1y9G1KnvOa7YRs=" + "integrity": "sha1-0uPuv/DZ05rVD1y9G1KnvOa7YRs=", + "optional": true }, "log-driver": { "version": "1.2.7", @@ -7597,12 +7647,12 @@ } }, "murmurhash-native": { - "version": "3.3.0", - "resolved": "https://registry.npmjs.org/murmurhash-native/-/murmurhash-native-3.3.0.tgz", - "integrity": "sha512-4MKcYAQLCFBmiXBZc+8hs5VUrlKwmMeyXzGLunhHOs+AVGAkEboNt0shoZayYfmd/kMrs6dkDVts5hoePoUGbA==", + "version": "3.4.1", + "resolved": "https://registry.npmjs.org/murmurhash-native/-/murmurhash-native-3.4.1.tgz", + "integrity": "sha512-uQVthrPc6ChkQSkwJ6Ro1kzVnJXX3Ks95w06rZP7Ath2MXTmK9C/nQL0GY8AM/mi1vdX8be3QcyP3/mShHYXNQ==", "requires": { - "nan": "^2.11.1", - "node-pre-gyp": "^0.11.0" + "nan": "^2.13.2", + "node-pre-gyp": "^0.13.0" } }, "mute-stream": { @@ -7771,6 +7821,18 @@ "which": "1" }, "dependencies": { + "fstream": { + "version": "1.0.12", + "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.12.tgz", + "integrity": "sha512-WvJ193OHa0GHPEL+AycEJgxvBEwyfRkN1vhjca23OaPVMCaLCXTd5qAu82AjTcgP1UJmytkOKb63Ypde7raDIg==", + "optional": true, + "requires": { + "graceful-fs": "^4.1.2", + "inherits": "~2.0.0", + "mkdirp": ">=0.5 0", + "rimraf": "2" + } + }, "nopt": { "version": "3.0.6", "resolved": "https://registry.npmjs.org/nopt/-/nopt-3.0.6.tgz", @@ -7787,18 +7849,23 @@ "optional": true }, "tar": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/tar/-/tar-2.2.1.tgz", - "integrity": "sha1-jk0qJWwOIYXGsYrWlK7JaLg8sdE=", + "version": "2.2.2", + "resolved": "https://registry.npmjs.org/tar/-/tar-2.2.2.tgz", + "integrity": "sha512-FCEhQ/4rE1zYv9rYXJw/msRqsnmlje5jHP6huWeBZ704jUTy02c5AZyWujpMR1ax6mVw9NyJMfuK2CMDWVIfgA==", "optional": true, "requires": { "block-stream": "*", - "fstream": "^1.0.2", + "fstream": "^1.0.12", "inherits": "2" } } } }, + "node-gyp-build": { + "version": "3.7.0", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-3.7.0.tgz", + "integrity": "sha512-L/Eg02Epx6Si2NXmedx+Okg+4UHqmaf3TNcxd50SF9NQGcJaON3AtU++kax69XV7YWz4tUspqZSAsVofhFKG2w==" + }, "node-interval-tree": { "version": "1.3.3", "resolved": "https://registry.npmjs.org/node-interval-tree/-/node-interval-tree-1.3.3.tgz", @@ -7866,6 +7933,18 @@ "which": "1" }, "dependencies": { + "fstream": { + "version": "1.0.12", + "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.12.tgz", + "integrity": "sha512-WvJ193OHa0GHPEL+AycEJgxvBEwyfRkN1vhjca23OaPVMCaLCXTd5qAu82AjTcgP1UJmytkOKb63Ypde7raDIg==", + "optional": true, + "requires": { + "graceful-fs": "^4.1.2", + "inherits": "~2.0.0", + "mkdirp": ">=0.5 0", + "rimraf": "2" + } + }, "gauge": { "version": "1.2.7", "resolved": "https://registry.npmjs.org/gauge/-/gauge-1.2.7.tgz", @@ -7900,22 +7979,22 @@ } }, "tar": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/tar/-/tar-2.2.1.tgz", - "integrity": "sha1-jk0qJWwOIYXGsYrWlK7JaLg8sdE=", + "version": "2.2.2", + "resolved": "https://registry.npmjs.org/tar/-/tar-2.2.2.tgz", + "integrity": "sha512-FCEhQ/4rE1zYv9rYXJw/msRqsnmlje5jHP6huWeBZ704jUTy02c5AZyWujpMR1ax6mVw9NyJMfuK2CMDWVIfgA==", "optional": true, "requires": { "block-stream": "*", - "fstream": "^1.0.2", + "fstream": "^1.0.12", "inherits": "2" } } } }, "node-pre-gyp": { - "version": "0.11.0", - "resolved": "https://registry.npmjs.org/node-pre-gyp/-/node-pre-gyp-0.11.0.tgz", - "integrity": "sha512-TwWAOZb0j7e9eGaf9esRx3ZcLaE5tQ2lvYy1pb5IAaG1a2e2Kv5Lms1Y4hpj+ciXJRofIxxlt5haeQ/2ANeE0Q==", + "version": "0.13.0", + "resolved": "https://registry.npmjs.org/node-pre-gyp/-/node-pre-gyp-0.13.0.tgz", + "integrity": "sha512-Md1D3xnEne8b/HGVQkZZwV27WUi1ZRuZBij24TNaZwUPU3ZAFtvT6xxJGaUVillfmMKnn5oD1HoGsp2Ftik7SQ==", "requires": { "detect-libc": "^1.0.2", "mkdirp": "^0.5.1", @@ -11331,6 +11410,20 @@ "semver": "2.x || 3.x || 4 || 5", "tar": "^2.0.0", "which": "1" + }, + "dependencies": { + "fstream": { + "version": "1.0.12", + "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.12.tgz", + "integrity": "sha512-WvJ193OHa0GHPEL+AycEJgxvBEwyfRkN1vhjca23OaPVMCaLCXTd5qAu82AjTcgP1UJmytkOKb63Ypde7raDIg==", + "optional": true, + "requires": { + "graceful-fs": "^4.1.2", + "inherits": "~2.0.0", + "mkdirp": ">=0.5 0", + "rimraf": "2" + } + } } }, "nopt": { @@ -11355,14 +11448,28 @@ } }, "tar": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/tar/-/tar-2.2.1.tgz", - "integrity": "sha1-jk0qJWwOIYXGsYrWlK7JaLg8sdE=", + "version": "2.2.2", + "resolved": "https://registry.npmjs.org/tar/-/tar-2.2.2.tgz", + "integrity": "sha512-FCEhQ/4rE1zYv9rYXJw/msRqsnmlje5jHP6huWeBZ704jUTy02c5AZyWujpMR1ax6mVw9NyJMfuK2CMDWVIfgA==", "optional": true, "requires": { "block-stream": "*", - "fstream": "^1.0.2", + "fstream": "^1.0.12", "inherits": "2" + }, + "dependencies": { + "fstream": { + "version": "1.0.12", + "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.12.tgz", + "integrity": "sha512-WvJ193OHa0GHPEL+AycEJgxvBEwyfRkN1vhjca23OaPVMCaLCXTd5qAu82AjTcgP1UJmytkOKb63Ypde7raDIg==", + "optional": true, + "requires": { + "graceful-fs": "^4.1.2", + "inherits": "~2.0.0", + "mkdirp": ">=0.5 0", + "rimraf": "2" + } + } } } } @@ -11913,6 +12020,7 @@ "version": "1.6.2", "resolved": "https://registry.npmjs.org/tar-stream/-/tar-stream-1.6.2.tgz", "integrity": "sha512-rzS0heiNf8Xn7/mpdSVVSMAWAoy9bfb1WOTYC78Z0UQKeKa/CWS8FOq0lKGNa8DWKAn9gxjCvMLYc5PGXYlK2A==", + "optional": true, "requires": { "bl": "^1.0.0", "buffer-alloc": "^1.2.0", @@ -12088,7 +12196,8 @@ "to-buffer": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/to-buffer/-/to-buffer-1.1.1.tgz", - "integrity": "sha512-lx9B5iv7msuFYE3dytT+KE5tap+rNYw+K4jVkb9R/asAb+pbBSM17jtunHplhBe6RRJdZx3Pn2Jph24O32mOVg==" + "integrity": "sha512-lx9B5iv7msuFYE3dytT+KE5tap+rNYw+K4jVkb9R/asAb+pbBSM17jtunHplhBe6RRJdZx3Pn2Jph24O32mOVg==", + "optional": true }, "to-fast-properties": { "version": "2.0.0", @@ -12451,6 +12560,14 @@ "resolved": "https://registry.npmjs.org/use/-/use-3.1.1.tgz", "integrity": "sha512-cwESVXlO3url9YWlFW/TA9cshCEhtu7IKJ/p5soJ/gGpj7vbvFrAY/eIioQ6Dw23KjZhYgiIo8HOs1nQ2vr/oQ==" }, + "utf-8-validate": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.2.tgz", + "integrity": "sha512-SwV++i2gTD5qh2XqaPzBnNX88N6HdyhQrNNRykvcS0QKvItV9u3vPEJr+X5Hhfb1JC0r0e1alL0iB09rY8+nmw==", + "requires": { + "node-gyp-build": "~3.7.0" + } + }, "util": { "version": "0.11.1", "resolved": "https://registry.npmjs.org/util/-/util-0.11.1.tgz", @@ -12490,11 +12607,6 @@ "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.3.2.tgz", "integrity": "sha512-yXJmeNaw3DnnKAOKJE51sL/ZaYfWJRl1pK9dr19YFCu0ObS231AB1/LbqTKRAQ5kw8A90rA6fr4riOUpTZvQZA==" }, - "uws": { - "version": "10.148.0", - "resolved": "https://registry.npmjs.org/uws/-/uws-10.148.0.tgz", - "integrity": "sha512-aJpFgMMyxubiE/ll4nj9nWoQbv0HzZZDWXfwyu78nuFObX0Zoyv3TWjkqKPQ1vb2sMPZoz67tri7QNE6dybNmQ==" - }, "v8-compile-cache": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/v8-compile-cache/-/v8-compile-cache-2.0.2.tgz", diff --git a/package.json b/package.json index 14b7daa20b..f5f2b47fa9 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "dependencies": { "async": "^2.6.2", "bluebird": "^3.5.3", + "bufferutil": "^4.0.1", "busboy": "^0.3.0", "bytes": "^3.1.0", "cli-color": "^1.4.0", @@ -44,7 +45,7 @@ "moment": "^2.24.0", "mosca": "^2.8.3", "ms": "^2.1.1", - "murmurhash-native": "^3.3.0", + "murmurhash-native": "^3.4.1", "passport": "^0.4.0", "pm2": "^3.4.0", "rc": "1.2.8", @@ -53,13 +54,14 @@ "socket.io": "^2.2.0", "sorted-array": "^2.0.4", "triple-beam": "^1.3.0", + "utf-8-validate": "^5.0.2", "uuid": "^3.3.2", - "uws": "10.148.0", "validator": "^10.11.0", "winston": "^3.2.1", "winston-elasticsearch": "0.7.8", "winston-syslog": "^2.1.0", - "winston-transport": "^4.3.0" + "winston-transport": "^4.3.0", + "ws": "^6.2.1" }, "repository": { "type": "git", diff --git a/test/api/core/entrypoints/embedded/index.test.js b/test/api/core/entrypoints/embedded/index.test.js index 3c9c433ca8..0cc5c318cd 100644 --- a/test/api/core/entrypoints/embedded/index.test.js +++ b/test/api/core/entrypoints/embedded/index.test.js @@ -1,5 +1,7 @@ 'use strict'; +const root = '../../../../../'; + const { Request, @@ -11,7 +13,7 @@ const } } = require('kuzzle-common-objects'), path = require('path'), - KuzzleMock = require('../../../../mocks/kuzzle.mock'), + KuzzleMock = require(`${root}/test/mocks/kuzzle.mock`), mockrequire = require('mock-require'), rewire = require('rewire'), should = require('should'), @@ -83,10 +85,11 @@ describe('lib/core/api/core/entrypoints/embedded/index', () => { winstonTransportFile = sinon.spy(); winstonTransportSyslog = sinon.spy(); - mockrequire('../../../../../lib/api/core/entrypoints/embedded/protocols/http', HttpMock); - mockrequire('../../../../../lib/api/core/entrypoints/embedded/protocols/websocket', WebSocketMock); - mockrequire('../../../../../lib/api/core/entrypoints/embedded/protocols/socketio', SocketIOMock); - mockrequire('../../../../../lib/api/core/entrypoints/embedded/protocols/mqtt', MqttMock); + const embeddedPath = `${root}/lib/api/core/entrypoints/embedded`; + mockrequire(`${embeddedPath}/protocols/http`, HttpMock); + mockrequire(`${embeddedPath}/protocols/websocket`, WebSocketMock); + mockrequire(`${embeddedPath}/protocols/socketio`, SocketIOMock); + mockrequire(`${embeddedPath}/protocols/mqtt`, MqttMock); mockrequire('http', httpMock); mockrequire('winston', { @@ -101,15 +104,16 @@ describe('lib/core/api/core/entrypoints/embedded/index', () => { mockrequire('winston-syslog', winstonTransportSyslog); // Disables unnecessary console warnings - AbstractManifest = rewire('../../../../../lib/api/core/abstractManifest'); + AbstractManifest = rewire(`${root}/lib/api/core/abstractManifest`); AbstractManifest.__set__({ console: { warn: sinon.stub() }}); - mockrequire('../../../../../lib/api/core/abstractManifest', AbstractManifest); + mockrequire(`${root}/lib/api/core/abstractManifest`, AbstractManifest); - Manifest = rewire('../../../../../lib/api/core/entrypoints/embedded/manifest'); + Manifest = rewire(`${embeddedPath}/manifest`); Manifest.__set__({ console: { warn: sinon.stub() }}); - mockrequire('../../../../../lib/api/core/entrypoints/embedded/manifest', Manifest); + mockrequire(`${embeddedPath}/manifest`, Manifest); - // Bluebird.map forces a different context, preventing rewire to mock "require" + // Bluebird.map forces a different context, preventing rewire to mock + // "require" mockrequire('bluebird', { map: (arr, fn) => Promise.all(arr.map(e => { let result; @@ -127,7 +131,7 @@ describe('lib/core/api/core/entrypoints/embedded/index', () => { all: Bluebird.all }); - EntryPoint = mockrequire.reRequire('../../../../../lib/api/core/entrypoints/embedded'); + EntryPoint = mockrequire.reRequire(embeddedPath); entrypoint = new EntryPoint(kuzzle); @@ -140,10 +144,9 @@ describe('lib/core/api/core/entrypoints/embedded/index', () => { } }); - HttpMock.prototype.init = sinon.stub().resolves(true); - WebSocketMock.prototype.init = sinon.stub().resolves(true); - SocketIOMock.prototype.init = sinon.stub().resolves(true); - MqttMock.prototype.init = sinon.stub().resolves(true); + for (const Class of [HttpMock, WebSocketMock, SocketIOMock, MqttMock]) { + Class.prototype.init = sinon.stub().resolves(true); + } }); afterEach(() => { diff --git a/test/api/core/entrypoints/embedded/protocols/http.test.js b/test/api/core/entrypoints/embedded/protocols/http.test.js index d8e1394b58..1af5a3bef2 100644 --- a/test/api/core/entrypoints/embedded/protocols/http.test.js +++ b/test/api/core/entrypoints/embedded/protocols/http.test.js @@ -1,17 +1,22 @@ +const root = '../../../../../..'; + const mockrequire = require('mock-require'), - HttpFormDataStream = require('../../../../../../lib/api/core/entrypoints/embedded/service/httpFormDataStream'), - EntryPoint = require('../../../../../../lib/api/core/entrypoints/embedded'), - KuzzleMock = require('../../../../../mocks/kuzzle.mock'), - Request = require('kuzzle-common-objects').Request, + HttpFormDataStream = require(`${root}/lib/api/core/entrypoints/embedded/service/httpFormDataStream`), + EntryPoint = require(`${root}/lib/api/core/entrypoints/embedded`), + ClientConnection = require(`${root}/lib/api/core/entrypoints/embedded/clientConnection`), + KuzzleMock = require(`${root}/test/mocks/kuzzle.mock`), { - KuzzleError, - SizeLimitError, - BadRequestError - } = require('kuzzle-common-objects').errors, + Request, + errors: { + KuzzleError, + SizeLimitError, + BadRequestError + } + } = require('kuzzle-common-objects'), should = require('should'), sinon = require('sinon'), - Writable = require('stream').Writable; + { Writable } = require('stream'); describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { const @@ -116,7 +121,7 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { should(protocol.decoders.gzip).eql(zlibstub.createGunzip); should(protocol.decoders.deflate).eql(zlibstub.createInflate); should(protocol.decoders.identity).be.a.Function(); - should(protocol.decoders.identity('foobar')).eql(null); + should(protocol.decoders.identity('foobar')).eql(null); }); }); @@ -130,10 +135,10 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { should(protocol.decoders.gzip).Function().and.not.eql(gunzipMock); should(protocol.decoders.deflate).Function().and.not.eql(inflateMock); should(protocol.decoders.identity).be.a.Function(); - + should(() => protocol.decoders.gzip()).throw(BadRequestError, {message}); should(() => protocol.decoders.deflate()).throw(BadRequestError, {message}); - should(protocol.decoders.identity('foobar')).eql(null); + should(protocol.decoders.identity('foobar')).eql(null); }); }); @@ -194,19 +199,19 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { should(protocol._replyWithError) .be.calledOnce() - .be.calledWithMatch(/^[0-9a-w-]+$/, { - url: request.url, - method: request.method - }, - response, - {message: 'Maximum HTTP request size exceeded'}); + .be.calledWithMatch( + sinon.match.instanceOf(ClientConnection), + { url: request.url, method: request.method }, + response, + {message: 'Maximum HTTP request size exceeded'}); }); it('should handle json content', done => { request.headers['content-type'] = 'application/json charset=utf-8'; - protocol._sendRequest = (connectionId, resp, payload) => { + protocol._sendRequest = (connection, resp, payload) => { try { + should(connection).be.instanceOf(ClientConnection); should(payload.content).eql('chunk1chunk2chunk3'); done(); } @@ -248,12 +253,11 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { should(request.pipe).not.called(); should(protocol._replyWithError) .be.calledOnce() - .be.calledWithMatch(/^[0-9a-w-]+$/, { - url: request.url, - method: request.method - }, - response, - {message: 'Too many encodings'}); + .be.calledWithMatch( + sinon.match.instanceOf(ClientConnection), + { url: request.url, method: request.method }, + response, + {message: 'Too many encodings'}); }); it('should reject if an unknown compression algorithm is provided', () => { @@ -263,12 +267,11 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { should(protocol._replyWithError) .be.calledOnce() - .be.calledWithMatch(/^[0-9a-w-]+$/, { - url: request.url, - method: request.method - }, - response, - {message: 'Unsupported compression algorithm "foobar"'}); + .be.calledWithMatch( + sinon.match.instanceOf(ClientConnection), + { url: request.url, method: request.method }, + response, + {message: 'Unsupported compression algorithm "foobar"'}); }); it('should handle chain pipes properly to match multi-layered compression', () => { @@ -292,7 +295,8 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { }); it('should handle valid x-www-form-urlencoded request', done => { - protocol._sendRequest = (connectionId, resp, payload) => { + protocol._sendRequest = (connection, resp, payload) => { + should(connection).instanceOf(ClientConnection); should(payload.content).be.empty(''); should(payload.json.foo).be.exactly('bar'); should(payload.json.baz).be.exactly('1234'); @@ -311,7 +315,8 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { }); it('should handle valid multipart/form-data request', done => { - protocol._sendRequest = (connectionId, resp, payload) => { + protocol._sendRequest = (connection, resp, payload) => { + should(connection).instanceOf(ClientConnection); should(payload.content).be.empty(''); should(payload.json.foo).be.exactly('bar'); should(payload.json.baz.filename).be.exactly('test-multipart.txt'); @@ -337,7 +342,9 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { onRequest(request, response); should(request.pipe).calledOnce().calledWith(gunzipMock); - should(gunzipMock.pipe).calledOnce().calledWith(sinon.match.instanceOf(Writable)); + should(gunzipMock.pipe) + .calledOnce() + .calledWith(sinon.match.instanceOf(Writable)); const writable = gunzipMock.pipe.firstCall.args[0]; sinon.spy(writable, 'removeAllListeners'); @@ -348,7 +355,9 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { writable.on('error', error => { try { - should(error).instanceOf(SizeLimitError).match({message: 'Maximum HTTP request size exceeded'}); + should(error) + .instanceOf(SizeLimitError) + .match({message: 'Maximum HTTP request size exceeded'}); // called automatically when a pipe rejects a callback, but not // by our mock obviously @@ -357,25 +366,25 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { should(request.unpipe).calledOnce(); should(request.removeAllListeners).calledOnce(); // should-sinon is outdated, so we cannot use it with calledAfter :-( - should(request.removeAllListeners.calledAfter(request.unpipe)).be.true(); + should(request.removeAllListeners.calledAfter(request.unpipe)) + .be.true(); should(request.resume).calledOnce(); - should(request.resume.calledAfter(request.removeAllListeners)).be.true(); + should(request.resume.calledAfter(request.removeAllListeners)) + .be.true(); should(writable.removeAllListeners).calledOnce(); should(writable.end).calledOnce(); - should(writable.end.calledAfter(writable.removeAllListeners)).be.true(); + should(writable.end.calledAfter(writable.removeAllListeners)) + .be.true(); // pipes should be closed manually should(gunzipMock.close).calledOnce(); should(protocol._replyWithError) - .be.calledWithMatch(/^[0-9a-z-]+$/, { - url: request.url, - method: request.method - }, - response, - { - message: 'Maximum HTTP request size exceeded' - }); + .be.calledWithMatch( + sinon.match.instanceOf(ClientConnection), + { url: request.url, method: request.method }, + response, + { message: 'Maximum HTTP request size exceeded' }); done(); } catch (e) { done(e); @@ -393,14 +402,11 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { should(request.resume).be.calledOnce(); should(protocol._replyWithError) .be.calledOnce() - .be.calledWithMatch(/^[0-9a-w-]+$/, { - url: request.url, - method: request.method - }, - response, - { - message: 'Unsupported content type: foo/bar' - }); + .be.calledWithMatch( + sinon.match.instanceOf(ClientConnection), + { url: request.url, method: request.method }, + response, + { message: 'Unsupported content type: foo/bar' }); }); it('should reply with error if the binary file size sent exceeds the maxFormFileSize', () => { @@ -442,29 +448,26 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { }); it('should call kuzzle http router and send the client the response back', () => { - protocol._sendRequest('connectionId', response, payload); + protocol._sendRequest({id: 'connectionId'}, response, payload); - should(kuzzle.router.http.route) - .be.calledWith(payload); + should(kuzzle.router.http.route).be.calledWith(payload); - const cb = kuzzle.router.http.route.firstCall.args[1]; - const result = new Request({}); - result.setResult('content', { - status: 444, - headers: { - 'x-foo': 'bar' - } - }); + const + cb = kuzzle.router.http.route.firstCall.args[1], + result = new Request({}); - cb(result); + result.setResult( + 'content', + { + status: 444, + headers: { 'x-foo': 'bar' } + }); - should(response.setHeader) - .be.calledOnce() - .be.calledWith('x-foo', 'bar'); + cb(result); should(response.writeHead) .be.calledOnce() - .be.calledWith(444); + .be.calledWithMatch(444, { 'x-foo': 'bar' }); const expected = { requestId: 'requestId', @@ -484,11 +487,13 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { }); it('should output buffer raw result', () => { - protocol._sendRequest('connectionId', response, payload); + protocol._sendRequest({id: 'connectionId'}, response, payload); + + const + cb = kuzzle.router.http.route.firstCall.args[1], + content = Buffer.from('test'), + result = new Request({}); - const cb = kuzzle.router.http.route.firstCall.args[1]; - const content = Buffer.from('test'); - const result = new Request({}); result.setResult(content, { raw: true, status: 444 @@ -499,16 +504,17 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { // Buffer.from(content) !== content const sent = response.end.firstCall.args[0]; - should(content.toString()) - .eql(sent.toString()); + should(content.toString()).eql(sent.toString()); }); it('should output a stringified buffer as a raw buffer result', () => { - protocol._sendRequest('connectionId', response, payload); - const cb = kuzzle.router.http.route.firstCall.args[1]; + protocol._sendRequest({id: 'connectionId'}, response, payload); + + const + cb = kuzzle.router.http.route.firstCall.args[1], + content = JSON.parse(JSON.stringify(Buffer.from('test'))), + result = new Request({}); - const content = JSON.parse(JSON.stringify(Buffer.from('test'))); - const result = new Request({}); result.setResult(content, { raw: true, status: 444 @@ -518,17 +524,17 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { const sent = response.end.firstCall.args[0]; - should(sent) - .be.an.instanceof(Buffer); - should(sent.toString()) - .eql('test'); + should(sent).be.an.instanceof(Buffer); + should(sent.toString()).eql('test'); }); it('should output serialized JS objects marked as raw', () => { - protocol._sendRequest('connectionId', response, payload); - const cb = kuzzle.router.http.route.firstCall.args[1]; + protocol._sendRequest({id: 'connectionId'}, response, payload); + + const + cb = kuzzle.router.http.route.firstCall.args[1], + result = new Request({}); - const result = new Request({}); result.setResult([{foo: 'bar'}], { raw: true }); @@ -540,10 +546,12 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { }); it('should output scalar content as-is if marked as raw', () => { - protocol._sendRequest('connectionId', response, payload); - const cb = kuzzle.router.http.route.firstCall.args[1]; + protocol._sendRequest({id: 'connectionId'}, response, payload); + + const + cb = kuzzle.router.http.route.firstCall.args[1], + result = new Request({}); - const result = new Request({}); result.setResult('content', { raw: true }); @@ -557,11 +565,12 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { it('should compress the outgoing message with deflate if asked to', () => { payload.headers = {'accept-encoding': 'identity, foo, bar, identity, qux, deflate, baz'}; - protocol._sendRequest('connectionId', response, payload); + protocol._sendRequest({id: 'connectionId'}, response, payload); - const cb = kuzzle.router.http.route.firstCall.args[1]; + const + cb = kuzzle.router.http.route.firstCall.args[1], + result = new Request({}); - const result = new Request({}); result.setResult('content', {}); cb(result); @@ -572,12 +581,15 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { }); it('should compress the outgoing message with gzip if asked to', () => { - payload.headers = {'accept-encoding': 'identity, foo, bar, identity, qux, gzip, baz'}; - protocol._sendRequest('connectionId', response, payload); + payload.headers = { + 'accept-encoding': 'identity, foo, bar, identity, qux, gzip, baz' + }; + protocol._sendRequest({id: 'connectionId'}, response, payload); - const cb = kuzzle.router.http.route.firstCall.args[1]; + const + cb = kuzzle.router.http.route.firstCall.args[1], + result = new Request({}); - const result = new Request({}); result.setResult('content', {}); cb(result); @@ -588,12 +600,15 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { }); it('should not compress if no suitable algorithm is found within the accept-encoding list', () => { - payload.headers = {'accept-encoding': 'identity, foo, bar, identity, qux, baz'}; - protocol._sendRequest('connectionId', response, payload); + payload.headers = { + 'accept-encoding': 'identity, foo, bar, identity, qux, baz' + }; + protocol._sendRequest({id: 'connectionId'}, response, payload); - const cb = kuzzle.router.http.route.firstCall.args[1]; + const + cb = kuzzle.router.http.route.firstCall.args[1], + result = new Request({}); - const result = new Request({}); result.setResult('content', {}); cb(result); @@ -604,12 +619,15 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { }); it('should prefer gzip over deflate if both algorithm are accepted', () => { - payload.headers = {'accept-encoding': 'deflate,deflate,DEFLATE,dEfLaTe, GZiP, DEFLATE,deflate'}; - protocol._sendRequest('connectionId', response, payload); + payload.headers = { + 'accept-encoding': 'deflate,deflate,DEFLATE,dEfLaTe, GZiP, DEFLATE,deflate' + }; + protocol._sendRequest({id: 'connectionId'}, response, payload); - const cb = kuzzle.router.http.route.firstCall.args[1]; + const + cb = kuzzle.router.http.route.firstCall.args[1], + result = new Request({}); - const result = new Request({}); result.setResult('content', {}); cb(result); @@ -623,23 +641,26 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { payload.headers = {'accept-encoding': 'gzip'}; zlibstub.gzip.yields(new Error('foobar')); sinon.stub(protocol, '_replyWithError'); - protocol._sendRequest('connectionId', response, payload); + protocol._sendRequest({id: 'connectionId'}, response, payload); - const cb = kuzzle.router.http.route.firstCall.args[1]; + const + cb = kuzzle.router.http.route.firstCall.args[1], + result = new Request({}); - const result = new Request({}); result.setResult('content', {}); cb(result); should(protocol._replyWithError) .be.calledOnce() - .be.calledWithMatch('connectionId', + .be.calledWithMatch( + {id: 'connectionId'}, payload, response, {message: 'foobar'}); - should(protocol._replyWithError.firstCall.args[3]).be.instanceOf(BadRequestError); + should(protocol._replyWithError.firstCall.args[3]) + .be.instanceOf(BadRequestError); should(response.setHeader).calledWith('Content-Encoding', 'gzip'); should(zlibstub.deflate).not.called(); should(zlibstub.gzip).calledOnce(); @@ -656,20 +677,20 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { it('should log the access and reply with error', () => { const - error = new KuzzleError('test'), + error = new KuzzleError('test', 123), connectionId = 'connectionId', payload = {requestId: 'foobar'}; - error.status = 123; const expected = (new Request(payload, {connectionId, error})).serialize(); // likely to be different, and we do not care about it delete expected.data.timestamp; - protocol._replyWithError(connectionId, payload, response, error); + protocol._replyWithError({id: connectionId}, payload, response, error); should(entrypoint.logAccess).be.calledOnce(); - should(entrypoint.logAccess.firstCall.args[0].serialize()).match(expected); + should(entrypoint.logAccess.firstCall.args[0].serialize()) + .match(expected); should(response.writeHead) .be.calledOnce() @@ -677,7 +698,7 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { 'Content-Type': 'application/json', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods' : 'GET,POST,PUT,PATCH,DELETE,HEAD,OPTIONS', - 'Access-Control-Allow-Headers': 'Content-Type, Access-Control-Allow-Headers, Authorization, X-Requested-With, Content-Length, Content-Encoding, X-Kuzzle-Volatile' + 'Access-Control-Allow-Headers': 'Content-Type,Access-Control-Allow-Headers,Authorization,X-Requested-With,Content-Length,Content-Encoding,X-Kuzzle-Volatile' }); }); @@ -687,7 +708,7 @@ describe('/lib/api/core/entrypoints/embedded/protocols/http', () => { entrypoint.clients.connectionId = {}; - protocol._replyWithError('connectionId', {}, response, error); + protocol._replyWithError({id: 'connectionId'}, {}, response, error); should(entrypoint.clients).be.empty(); }); diff --git a/test/api/core/entrypoints/embedded/protocols/websocket.test.js b/test/api/core/entrypoints/embedded/protocols/websocket.test.js index 2a7749fe7a..07d34d67e0 100644 --- a/test/api/core/entrypoints/embedded/protocols/websocket.test.js +++ b/test/api/core/entrypoints/embedded/protocols/websocket.test.js @@ -1,14 +1,12 @@ +const root = '../../../../../..'; + const mockrequire = require('mock-require'), should = require('should'), sinon = require('sinon'), - EntryPoint = require('../../../../../../lib/api/core/entrypoints/embedded'), - KuzzleMock = require('../../../../../mocks/kuzzle.mock'), - { - errors: { - InternalError: KuzzleInternalError - } - } = require('kuzzle-common-objects'); + { IncomingMessage } = require('http'), + EntryPoint = require(`${root}/lib/api/core/entrypoints/embedded`), + KuzzleMock = require(`${root}/test/mocks/kuzzle.mock`); describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { let @@ -16,6 +14,7 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { entrypoint, WebSocketProtocol, WebSocketServer, + WebSocketSender, protocol; beforeEach(() => { @@ -25,17 +24,26 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { WebSocketServer = sinon.spy(function () { this.on = sinon.spy(); }); - mockrequire('uws', { - Server: WebSocketServer + + WebSocketSender = { + frame: sinon.stub().returnsArg(0) + }; + + mockrequire('ws', { + Server: WebSocketServer, + Sender: WebSocketSender }); - WebSocketProtocol = mockrequire.reRequire('../../../../../../lib/api/core/entrypoints/embedded/protocols/websocket'); + WebSocketProtocol = mockrequire.reRequire( + `${root}/lib/api/core/entrypoints/embedded/protocols/websocket`); protocol = new WebSocketProtocol(); }); afterEach(() => { clearInterval(protocol.heartbeatInterval); + clearInterval(protocol.idleSweepInterval); + clearInterval(protocol.activityInterval); mockrequire.stopAll(); }); @@ -50,42 +58,6 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { }); }); - it('should throw if the heartbeat value is not set to a valid value', () => { - for (const heartbeat of [null, 'foo', {}, [], 3.14159, true, -42, undefined]) { - const - ep = new EntryPoint(kuzzle), - wsp = new WebSocketProtocol(); - - ep.config.protocols.websocket.heartbeat = heartbeat; - - should(wsp.init(ep)).be.rejectedWith(KuzzleInternalError); - } - }); - - it('should start a heartbeat if asked to', () => { - const - clock = sinon.useFakeTimers(), - heartbeatSpy = sinon.stub(protocol, '_doHeartbeat'); - - entrypoint.config.protocols.websocket.heartbeat = 1000; - - return protocol.init(entrypoint) - .then(() => { - should(protocol.heartbeatInterval).not.be.null(); - should(heartbeatSpy).not.be.called(); - - clock.tick(1000); - - should(heartbeatSpy).be.calledOnce(); - - clock.tick(1000); - - should(heartbeatSpy).be.calledTwice(); - - clock.restore(); - }); - }); - it('should disable heartbeats if set to 0', () => { entrypoint.config.protocols.websocket.heartbeat = 0; @@ -150,18 +122,14 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { socket = { on: onClientSpy, - close: sinon.stub(), - _socket: { - remoteAddress: 'ip' - } + close: sinon.stub() }; - request = { - headers: { - 'X-Foo': 'bar', - 'x-forwarded-for': '1.1.1.1,2.2.2.2' - } - }; + request = new IncomingMessage(); + Object.assign(request, { + socket: { remoteAddress: 'ip' }, + headers: { 'X-Foo': 'bar', 'x-forwarded-for': '1.1.1.1,2.2.2.2' } + }); return protocol.init(entrypoint) .then(() => { @@ -186,23 +154,27 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { .be.calledOnce() .be.calledWithMatch({ protocol: protocol.name, - ips: ['1.1.1.1', '2.2.2.2', 'ip'], + ips: ['ip', '1.1.1.1', '2.2.2.2'], headers: request.headers }); - const connection = protocol.entryPoint.newConnection.firstCall.args[0]; - should(protocol.connectionPool[connection.id]) + const + connection = protocol.entryPoint.newConnection.firstCall.args[0], + pooledConnection = protocol.connectionPool.get(connection.id); + + should(pooledConnection) .match({ - alive: true, socket, - channels: [] + alive: true, + channels: new Set() }); - should(onClientSpy.callCount).eql(4); + should(onClientSpy.callCount).eql(5); should(onClientSpy) .be.calledWith('close') .be.calledWith('error') .be.calledWith('message') + .be.calledWith('ping') .be.calledWith('pong'); { @@ -232,11 +204,23 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { } { - const onPongH = onClientSpy.getCall(3).args[1]; + const onPingH = onClientSpy.getCall(3).args[1]; - protocol.connectionPool[connection.id].alive = false; + pooledConnection.alive = false; + pooledConnection.lastActivity = -1; + onPingH(); + should(pooledConnection.alive).be.true(); + should(pooledConnection.lastActivity).eql(protocol.activityTimestamp); + } + + { + const onPongH = onClientSpy.getCall(4).args[1]; + + pooledConnection.alive = false; + pooledConnection.lastActivity = -1; onPongH(); - should(protocol.connectionPool[connection.id].alive).be.true(); + should(pooledConnection.alive).be.true(); + should(pooledConnection.lastActivity).eql(protocol.activityTimestamp); } }); @@ -246,24 +230,18 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { beforeEach(() => { return protocol.init(entrypoint) .then(() => { - protocol.connectionPool = { - connectionId: { + protocol.connectionPool = new Map([ + ['connectionId', { id: 'connectionId', - channels: ['c1', 'c2', 'c3'] - } - }; - - protocol.channels = { - c1: { - count: 3 - }, - c2: { - count: 1 - }, - c3: { - count: 2 - } - }; + channels: new Set(['c1', 'c2', 'c3']) + }] + ]); + + protocol.channels = new Map([ + ['c1', new Set(['foo', 'connectionId', 'bar'])], + ['c2', new Set(['connectionId'])], + ['c3', new Set(['connectionId'])] + ]); }); }); @@ -271,15 +249,11 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { entrypoint.clients.connectionId = {}; protocol.onClientDisconnection('connectionId'); - should(entrypoint.clients) - .be.empty(); + should(entrypoint.clients).be.empty(); - should(protocol.channels) - .match({ - c1: { - count: 2 - } - }); + should(protocol.channels).deepEqual(new Map([ + ['c1', new Set(['foo', 'bar'])] + ])); }); }); @@ -300,20 +274,16 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { return protocol.init(entrypoint) .then(() => { - protocol.connectionPool = { - connectionId: { - id: 'connectionId' - } - }; + protocol.connectionPool = new Map([ + ['connectionId', { id: 'connectionId' } ] + ]); protocol._send = sinon.spy(); }); }); it('should do nothing if no data is given or if the connection is unknown', () => { protocol.onClientMessage({id: 'foo'}); - - should(entrypoint.execute) - .have.callCount(0); + should(entrypoint.execute).have.callCount(0); }); it('should call entrypoint execute', () => { @@ -321,11 +291,9 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { protocol.onClientMessage(connection, data); - should(entrypoint.execute) - .be.calledOnce(); + should(entrypoint.execute).be.calledOnce(); - const request = entrypoint.execute.firstCall.args[0]; - const cb = entrypoint.execute.firstCall.args[1]; + const [request, cb] = entrypoint.execute.firstCall.args; should(request.serialize()) .match({ @@ -345,8 +313,7 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { cb(result); - should(result.content.room) - .eql('test'); + should(result.content.room).eql('test'); should(protocol._send) .be.calledOnce() @@ -360,28 +327,32 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { beforeEach(() => { return protocol.init(entrypoint) .then(() => { - protocol._send = sinon.spy(); + const socketMock = function () { + return { + _sender: { + sendFrame: sinon.stub() + } + }; + }; + + protocol.connectionPool = new Map([ + ['cx1', {alive: true, socket: new socketMock()}], + ['cx2', {alive: true, socket: new socketMock()}], + ['cx3', {alive: true, socket: new socketMock()}], + ['cx4', {alive: true, socket: new socketMock()}], + ]); + + protocol.channels = new Map([ + ['c1', new Set(['cx1', 'cx2', 'cx3'])], + ['c2', new Set(['cx1', 'cx3'])], + ['c3', new Set(['cx2', 'cx3', 'cx4'])] + ]); + }); }); it('should send the message to all clients registered to the message channels', () => { - protocol.channels = { - c1: { - cx1: true, - cx2: true, - cx3: true - }, - c2: { - cx1: true, - cx3: true - }, - c3: { - cx2: true, - cx3: true, - cx4: true - } - }; - + let frame; const data = { channels: ['c1', 'c3', 'c4'], payload: {foo: 'bar'} @@ -389,13 +360,23 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { protocol.broadcast(data); - should(protocol._send) - .be.calledWith('cx1', JSON.stringify(Object.assign(data.payload, {room: 'c1'}))) - .be.calledWith('cx2', JSON.stringify(Object.assign(data.payload, {room: 'c1'}))) - .be.calledWith('cx3', JSON.stringify(Object.assign(data.payload, {room: 'c1'}))) - .be.calledWith('cx2', JSON.stringify(Object.assign(data.payload, {room: 'c3'}))) - .be.calledWith('cx3', JSON.stringify(Object.assign(data.payload, {room: 'c3'}))) - .be.calledWith('cx4', JSON.stringify(Object.assign(data.payload, {room: 'c3'}))); + // channel: c1 + data.payload.room = 'c1'; + frame = Buffer.from(JSON.stringify(data.payload)); + + for (const connId of ['cx1', 'cx2', 'cx3']) { + should(protocol.connectionPool.get(connId).socket._sender.sendFrame) + .calledWith(frame); + } + + // channel: c3 + data.payload.room = 'c3'; + frame = Buffer.from(JSON.stringify(data.payload)); + + for (const connId of ['cx2', 'cx3', 'cx4']) { + should(protocol.connectionPool.get(connId).socket._sender.sendFrame) + .calledWith(frame); + } }); }); @@ -412,9 +393,15 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { protocol.notify(data); should(protocol._send) - .be.calledWith('connectionId', JSON.stringify(Object.assign({}, data.payload, {room: 'c1'}))) - .be.calledWith('connectionId', JSON.stringify(Object.assign({}, data.payload, {room: 'c3'}))) - .be.calledWith('connectionId', JSON.stringify(Object.assign({}, data.payload, {room: 'c4'}))); + .be.calledWith( + 'connectionId', + JSON.stringify(Object.assign({}, data.payload, {room: 'c1'}))) + .be.calledWith( + 'connectionId', + JSON.stringify(Object.assign({}, data.payload, {room: 'c3'}))) + .be.calledWith( + 'connectionId', + JSON.stringify(Object.assign({}, data.payload, {room: 'c4'}))); }); }); @@ -422,131 +409,316 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => { it('should do nothing if the connection is unknonwn', () => { protocol.joinChannel('channel', 'foo'); - should(protocol.channels) - .be.empty(); + should(protocol.channels).be.empty(); }); - it('should link the connection to the channel', () => { + it('should link the connection to a new channel', () => { const connection = { id: 'connectionId', alive: true, - channels: [] + channels: new Set() }; - protocol.connectionPool[connection.id] = connection; + protocol.connectionPool.set(connection.id, connection); protocol.joinChannel('channel', connection.id); should(protocol.channels) - .match({ - channel: { - connectionId: true, - count: 1 - } - }); + .deepEqual(new Map([['channel', new Set(['connectionId'])]])); + should(connection.channels).deepEqual(new Set(['channel'])); + }); - should(connection.channels) - .eql(['channel']); + it('should add a connection to an existing channel', () => { + const + connection1 = { + id: 'connectionId', + alive: true, + channels: new Set() + }, + connection2 = { + id: 'connectionId2', + alive: true, + channels: new Set(['foo']) + }; + + protocol.connectionPool.set(connection1.id, connection1); + protocol.connectionPool.set(connection2.id, connection2); + + protocol.joinChannel('channel', connection1.id); + protocol.joinChannel('channel', connection2.id); + + should(protocol.channels) + .deepEqual(new Map([ + ['channel', new Set(['connectionId', 'connectionId2'])] + ])); + + should(connection1.channels).deepEqual(new Set(['channel'])); + should(connection2.channels).deepEqual(new Set(['foo', 'channel'])); }); }); describe('#leaveChannel', () => { beforeEach(() => { - protocol.connectionPool = { - connectionId: { - alive: true, - channels: ['foo', 'channel'] - } - }; - protocol.channels = { - channel: { - count: 5, - connectionId: true - } - }; + protocol.connectionPool = new Map([ + ['connectionId', {alive: true, channels: new Set(['foo', 'channel'])}] + ]); + + protocol.channels = new Map([ + ['channel', new Set(['foo', 'bar', 'baz', 'connectionId', 'qux'])] + ]); }); it('should do nothing if the connection is unknonw', () => { - protocol.leaveChannel('channel', 'foo'); + protocol.leaveChannel('channel', 'unknown'); - should(protocol.channels.channel.count) - .eql(5); + should(protocol.channels.get('channel')).have.size(5); }); it('should unsubscribe the client', () => { protocol.leaveChannel('channel', 'connectionId'); - should(protocol.channels.channel.count) - .eql(4); - should(protocol.connectionPool.connectionId.channels) - .eql(['foo']); + should(protocol.channels.get('channel')) + .have.size(4) + .and.deepEqual(new Set(['foo', 'bar', 'baz', 'qux'])); + should(protocol.connectionPool.get('connectionId').channels) + .deepEqual(new Set(['foo'])); }); + it('should remove an unused channel entry', () => { + protocol.channels = new Map([ + ['channel', new Set(['connectionId'])] + ]); + + protocol.leaveChannel('channel', 'connectionId'); + + should(protocol.channels.has('channel')).be.false(); + should(protocol.connectionPool.get('connectionId').channels) + .deepEqual(new Set(['foo'])); + }); }); describe('#disconnect', () => { it('should close the connection', () => { - protocol.connectionPool = { - connectionId: { - socket: { - close: sinon.spy() - } - } - }; + protocol.connectionPool = new Map([ + ['connectionId', { socket: { close: sinon.spy() } } ] + ]); protocol.disconnect('connectionId', 'test'); - should(protocol.connectionPool.connectionId.socket.close) + should(protocol.connectionPool.get('connectionId').socket.close) .be.calledWith(1011, 'test'); }); }); describe('#_send', () => { it('should send the message', () => { - protocol.connectionPool = { - connectionId: { - alive: true, - socket: { - OPEN: 'open', - readyState: 'open', - send: sinon.spy() + protocol.connectionPool = new Map([ + [ + 'connectionId', + { + alive: true, + socket: { + OPEN: 'open', + readyState: 'open', + send: sinon.spy() + } } - } - }; + ] + ]); protocol._send('connectionId', 'test'); - should(protocol.connectionPool.connectionId.socket.send) + should(protocol.connectionPool.get('connectionId').socket.send) .be.calledWith('test'); }); }); - describe('#_doHeartbeat', () => { + describe('#Heartbeat', () => { + it('should throw if the heartbeat value is not set to a valid value', () => { + const + values = [null, 'foo', {}, [], 3.14159, true, -42, undefined], + promises = []; + + for (const heartbeat of values) { + const + ep = new EntryPoint(kuzzle), + wsp = new WebSocketProtocol(); + + ep.config.protocols.websocket.heartbeat = heartbeat; + promises.push( + should(wsp.init(ep)).rejectedWith( + {message: /WebSocket: invalid heartbeat value /}) + .then(() => { + clearInterval(wsp.heartbeatInterval); + clearInterval(wsp.idleSweepInterval); + clearInterval(wsp.activityInterval); + })); + } + + return Promise.all(promises); + }); + + it('should start a heartbeat if asked to', () => { + const + clock = sinon.useFakeTimers(), + heartbeatSpy = sinon.stub(protocol, '_doHeartbeat'); + + entrypoint.config.protocols.websocket.heartbeat = 1000; + + return protocol.init(entrypoint) + .then(() => { + should(protocol.heartbeatInterval).not.be.null(); + should(heartbeatSpy).not.be.called(); + + clock.tick(1000); + + should(heartbeatSpy).be.calledOnce(); + + clock.tick(1000); + + should(heartbeatSpy).be.calledTwice(); + + clock.restore(); + }); + }); + it('should terminate dead sockets, and mark others as dead', () => { - protocol.connectionPool = { - ahAhAhAhStayinAliveStayinAlive: { - alive: true, - socket: { terminate: sinon.spy(), ping: sinon.spy() } - }, - dead: { - alive: false, - socket: { terminate: sinon.spy(), ping: sinon.spy() } - }, - ahAhAhAhStayinAliiiiiiiiive: { - alive: true, - socket: { terminate: sinon.spy(), ping: sinon.spy() } - } + const Connection = function (alive, lastActivity) { + return { + alive, + lastActivity, + socket: { + terminate: sinon.stub(), + ping: sinon.stub() + } + }; }; + protocol.connectionPool = new Map([ + ['ahAhAhAhStayinAliveStayinAlive', new Connection(true, 0)], + ['dead', new Connection(false, 0)], + ['ahAhAhAhStayinAliiiiiiiiive', new Connection(true, 0)], + ['active', new Connection(true, Date.now())] + ]); + + protocol.config.heartbeat = 1000; protocol._doHeartbeat(); - for (const id of ['ahAhAhAhStayinAliveStayinAlive', 'ahAhAhAhStayinAliiiiiiiiive']) { - should(protocol.connectionPool[id].alive).be.false(); - should(protocol.connectionPool[id].socket.terminate).not.be.called(); - should(protocol.connectionPool[id].socket.ping).be.calledOnce(); + // inactive sockets are pinged + for (const id of [ + 'ahAhAhAhStayinAliveStayinAlive', + 'ahAhAhAhStayinAliiiiiiiiive' + ]) { + const connection = protocol.connectionPool.get(id); + + should(connection.alive).be.false(); + should(connection.socket.terminate).not.be.called(); + should(connection.socket.ping).be.calledOnce(); + } + + // dead sockets are terminated + const deadConnection = protocol.connectionPool.get('dead'); + + should(deadConnection.alive).be.false(); + should(deadConnection.socket.terminate).be.calledOnce(); + should(deadConnection.socket.ping).not.be.called(); + + // active sockets are unaffected + const activeConnection = protocol.connectionPool.get('active'); + + should(activeConnection.alive).be.true(); + should(activeConnection.socket.terminate).not.be.called(); + should(activeConnection.socket.ping).not.be.called(); + }); + }); + + describe('#IdleTimeout', () => { + it('should throw if the idleTimeout value is not set to a valid value', () => { + const + values = [null, 'foo', {}, [], 3.14159, true, -42, undefined], + promises = []; + + for (const idleTimeout of values) { + const + ep = new EntryPoint(kuzzle), + wsp = new WebSocketProtocol(); + + ep.config.protocols.websocket.idleTimeout = idleTimeout; + + promises.push( + should(wsp.init(ep)).rejectedWith( + {message: /WebSocket: invalid idleTimeout value /}) + .then(() => { + clearInterval(wsp.heartbeatInterval); + clearInterval(wsp.idleSweepInterval); + clearInterval(wsp.activityInterval); + })); + } + + return Promise.all(promises); + }); + + it('should start an idleTimeout sweep if asked to', () => { + const + clock = sinon.useFakeTimers(), + idleTimeoutSpy = sinon.stub(protocol, '_sweepIdleSockets'); + + entrypoint.config.protocols.websocket.idleTimeout = 1000; + + return protocol.init(entrypoint) + .then(() => { + should(protocol.idleTimeoutInterval).not.be.null(); + should(idleTimeoutSpy).not.be.called(); + + clock.tick(protocol.config.idleTimeout); + + should(idleTimeoutSpy).be.calledOnce(); + + clock.tick(protocol.config.idleTimeout); + + should(idleTimeoutSpy).be.calledTwice(); + + clock.restore(); + }) + .catch(e => { + clock.restore(); + throw e; + }); + }); + + it('should terminate inactive sockets', () => { + const + now = Date.now(), + Connection = function (lastActivity) { + return { + lastActivity, + socket: { + terminate: sinon.stub() + } + }; + }; + + protocol.config.idleTimeout = 1000; + + protocol.connectionPool = new Map([ + ['ahAhAhAhStayinAliveStayinAlive', new Connection(now)], + ['dead', new Connection(now - protocol.config.idleTimeout - 1)], + [ + 'ahAhAhAhStayinAliiiiiiiiive', + new Connection(now - protocol.config.idleTimeout + 100) + ] + ]); + + protocol._sweepIdleSockets(); + + // active sockets are unaffected + for (const id of [ + 'ahAhAhAhStayinAliveStayinAlive', + 'ahAhAhAhStayinAliiiiiiiiive' + ]) { + should(protocol.connectionPool.get(id).socket.terminate).not.called(); } - should(protocol.connectionPool.dead.alive).be.false(); - should(protocol.connectionPool.dead.socket.terminate).be.calledOnce(); - should(protocol.connectionPool.dead.socket.ping).not.be.called(); + // inactive sockets are terminated + should(protocol.connectionPool.get('dead').socket.terminate).calledOnce(); }); }); }); diff --git a/test/services/implementations/broker.test.js b/test/services/implementations/broker.test.js index 6d18098db4..6fd3c8a93c 100644 --- a/test/services/implementations/broker.test.js +++ b/test/services/implementations/broker.test.js @@ -7,7 +7,7 @@ const sinon = require('sinon'), sandbox = sinon.createSandbox(), should = require('should'), - WS = require('uws'), + WS = require('ws'), CircularList = require('easy-circular-list'), KuzzleMock = require('../../mocks/kuzzle.mock'), WSClientMock = require('../../mocks/services/ws.mock'), @@ -129,7 +129,7 @@ describe('Test: Internal broker', () => { it('should construct a WS client', () => { const WSStub = sandbox.stub(); - mockrequire('uws', WSStub); + mockrequire('ws', WSStub); mockrequire.reRequire('../../../lib/services/broker/wsBrokerClient'); const WSClient = rewire('../../../lib/services/broker/wsBrokerClient'); @@ -731,7 +731,7 @@ describe('Test: Internal broker', () => { }) }); - mockrequire('uws', { + mockrequire('ws', { Server: sinon.spy(WSServerMock) });