Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade the WebSocket libraries #1301

Merged
merged 37 commits into from
Jun 3, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0fa800a
Dont load disabled protocols
Aschen May 9, 2019
ae349dd
Dont load disabled protocols
Aschen May 10, 2019
f3fb1f6
Merge branch '1-dev' into fix-disabled-protocol-init
Aschen May 10, 2019
9ae4365
[network] HTTP and WebSocket layers refactoring
scottinet May 10, 2019
f65d216
Merge remote-tracking branch 'origin/fix-disabled-protocol-init' into…
scottinet May 10, 2019
9534801
[config] re-disable access logs
scottinet May 10, 2019
5d2e4ed
[wip] precalculating websocket frame
scottinet May 10, 2019
b2fad69
[ws] precompute websocket frames for efficient broadcasting
scottinet May 14, 2019
6f43019
[config] default to silent mode
scottinet May 14, 2019
6e67380
[websocket] clean up
scottinet May 14, 2019
f800dca
[ws] optimize last activity tracker performances
scottinet May 14, 2019
4677e1d
[ws] adapt tests to ws
scottinet May 14, 2019
5817e18
fix tests
Aschen May 14, 2019
61e3425
Merge branch '1-dev' into fix-disabled-protocol-init
scottinet May 14, 2019
6ccfdd5
Merge remote-tracking branch 'origin/fix-disabled-protocol-init' into…
scottinet May 14, 2019
9fc69c2
Merge branch 'fix-disabled-protocol-init' of github.com:kuzzleio/kuzz…
Aschen May 14, 2019
b33a569
Merge remote-tracking branch 'origin/fix-disabled-protocol-init' into…
scottinet May 14, 2019
af5f411
[websocket] emit a PING only if inactive
scottinet May 15, 2019
782fc5f
Merge branch '1-dev' into fix-disabled-protocol-init
benoitvidis May 15, 2019
24e058c
[tests] update tests + bugfixes
scottinet May 15, 2019
be00d61
[package.json] dependencies update
scottinet May 15, 2019
2c3a48a
[tests] moar tests and bugfixes
scottinet May 15, 2019
0c5cd77
[ws] last minute cleanup
scottinet May 16, 2019
18fa313
[bug] fix inverted assert logic
scottinet May 16, 2019
7ca1556
Merge remote-tracking branch 'origin/fix-disabled-protocol-init' into…
scottinet May 16, 2019
e3a1571
[ws] in hindsight, idle sweeps should be made more often
scottinet May 16, 2019
2bd3bcf
Update lib/api/core/entrypoints/embedded/protocols/http.js
scottinet May 16, 2019
1efa47c
[protocols] DRYify configuration
scottinet May 17, 2019
a782f61
Merge branch '1-dev' into migrate-to-ws
scottinet May 20, 2019
70066aa
[compat] downgrade ws to v6.2.1 to make it work with Node.js 6
scottinet May 20, 2019
087d134
Merge branch 'migrate-to-ws' of github.com:kuzzleio/kuzzle into migra…
scottinet May 20, 2019
f266ff6
Merge remote-tracking branch 'origin/1-dev' into migrate-to-ws
scottinet May 20, 2019
e3eed22
[bug] fix doubled assert declaration (bad merge)
scottinet May 20, 2019
fadbf09
Merge branch '1-dev' into migrate-to-ws
scottinet May 23, 2019
4d1af27
Merge branch '1-dev' into migrate-to-ws
scottinet May 28, 2019
eee6882
Merge remote-tracking branch 'origin/1-dev' into migrate-to-ws
scottinet May 28, 2019
4ec0b96
[merge] fix bad merge state
scottinet May 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions lib/api/core/entrypoints/embedded/protocols/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,14 @@ class HttpProtocol extends Protocol {
init(entryPoint) {
return super.init('http', entryPoint)
.then(() => {
const config = entryPoint.config.protocols.http;

if (config.enabled === false) {
if (this.config.enabled === false) {
return false;
}

debug('initializing http Server with config: %a', entryPoint.config);
debug('initializing http Server with config: %a', this.config);

this.maxFormFileSize = bytes.parse(config.maxFormFileSize);
this.maxEncodingLayers = config.maxEncodingLayers;
this.maxFormFileSize = bytes.parse(this.config.maxFormFileSize);
this.maxEncodingLayers = this.config.maxEncodingLayers;
this.server = entryPoint.httpServer;

for (const value of ['maxFormFileSize', 'maxEncodingLayers']) {
Expand Down Expand Up @@ -400,7 +398,7 @@ class HttpProtocol extends Protocol {
*/
_setDecoders() {
const
allowCompression = this.entryPoint.config.protocols.http.allowCompression,
allowCompression = this.config.allowCompression,
disabledfn = () => {
throw new BadRequestError('Compression support is disabled');
};
Expand Down
22 changes: 11 additions & 11 deletions lib/api/core/entrypoints/embedded/protocols/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ class MqttProtocol extends Protocol {
init (entryPoint) {
return super.init('mqtt', entryPoint)
.then(() => {
if (entryPoint.config.protocols.mqtt.enabled === false) {
if (this.config.enabled === false) {
return false;
}

debug('initializing MQTT Server with config: %a', entryPoint.config.protocols.mqtt);
debug('initializing MQTT Server with config: %a', this.config);

this.entryPoint = entryPoint;
this.kuzzle = this.entryPoint.kuzzle;

this.config = Object.assign({
allowPubSub: false,
developmentMode: false,
Expand All @@ -69,10 +69,10 @@ class MqttProtocol extends Protocol {
server: {
port: 1883
}
}, entryPoint.config.protocols.mqtt || {});
}, this.config);

this.server = new mosca.Server(this.config.server);

/*
To avoid ill-use of our topics, we need to configure authorizations:
* "requestTopic": should be publish-only, so no one but this plugin can listen to this topic
Expand All @@ -89,25 +89,25 @@ class MqttProtocol extends Protocol {
callback(null, topic === this.config.requestTopic);
}
};

this.server.authorizeSubscribe = (client, topic, callback) => {
const isAllowed = topic !== this.config.requestTopic
&& topic.indexOf('#') === -1
&& topic.indexOf('+') === -1;

callback(null, isAllowed);
};

return new Bluebird(resolve => {
this.server.on('ready', () => {
this.server.on('clientConnected', client => this.onConnection(client));
this.server.on('clientDisconnecting', client => this.onDisconnection(client));
this.server.on('clientDisconnected', client => this.onDisconnection(client));
this.server.on('published', (packet, client) => this.onMessage(packet, client));

resolve(true);
});
});
});
});
}

Expand Down
5 changes: 5 additions & 0 deletions lib/api/core/entrypoints/embedded/protocols/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Protocol {
this.maxRequestSize = null;
this.entryPoint = null;
this.name = null;
this.config = {};
}

/**
Expand All @@ -48,6 +49,10 @@ class Protocol {
typeof this.name === 'string' && this.name.length > 0,
'Invalid "name" parameter value: expected a non empty string value');

if (entryPoint.config.protocols && entryPoint.config.protocols[name]) {
this.config = entryPoint.config.protocols[name];
}

assert(
Number.isInteger(this.maxRequestSize),
'Invalid "maxRequestSize" parameter value: expected a numeric value');
Expand Down
18 changes: 9 additions & 9 deletions lib/api/core/entrypoints/embedded/protocols/socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,28 @@ class SocketIoProtocol extends Protocol {
init(entryPoint) {
return super.init('socketio', entryPoint)
.then(() => {
if (entryPoint.config.protocols.socketio.enabled === false) {
if (this.config.enabled === false) {
return false;
}
debug('initializing socketIo Server with config: %a', entryPoint.config.socketio);

debug('initializing socketIo Server with config: %a', this.config);

this.kuzzle = this.entryPoint.kuzzle;

// SocketIo server listens by default to "/socket.io" path
// (see (http://socket.io/docs/server-api/#server#path(v:string):server))
this.io = require('socket.io')(entryPoint.httpServer, {
// maxHttpBufferSize is passed to the ws library as the maxPayload option
// see https://github.com/socketio/socket.io/issues/2326#issuecomment-292146468
maxHttpBufferSize: this.maxRequestSize
});
this.io.set('origins', entryPoint.config.protocols.socketio.origins);

this.io.set('origins', this.config.origins);

this.io.on('connection', socket => this.onConnection(socket));
this.io.on('error', error => this.onServerError(error));

return true;
return true;
});
}

Expand Down
38 changes: 16 additions & 22 deletions lib/api/core/entrypoints/embedded/protocols/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ class WebSocketProtocol extends Protocol {
this.kuzzle = null;
this.heartbeatInterval = null;
this.idleSweepInterval = null;
this.heartbeatTimeout = 0;
this.idleTimeout = 0;

// Prevents thousands of "Date.now()" per second: it's far more efficient
// to have a timestamp property refreshed every second or so, since we
Expand All @@ -97,16 +95,14 @@ class WebSocketProtocol extends Protocol {
init (entryPoint) {
return super.init('websocket', entryPoint)
.then(() => {
const config = entryPoint.config.protocols.websocket;

if (config.enabled === false) {
if (this.config.enabled === false) {
return false;
}

debug('initializing WebSocket Server with config: %a', config);
debug('initializing WebSocket Server with config: %a', this.config);

this._startHeartbeat(config.heartbeat);
this._startIdleSweeps(config.idleTimeout);
this._startHeartbeat();
this._startIdleSweeps();

this.kuzzle = this.entryPoint.kuzzle;

Expand Down Expand Up @@ -387,36 +383,34 @@ class WebSocketProtocol extends Protocol {
}
}

_startHeartbeat(heartbeat) {
_startHeartbeat() {
assert(
Number.isInteger(heartbeat) && heartbeat >= 0,
`WebSocket: invalid heartbeat value ${heartbeat}`);
Number.isInteger(this.config.heartbeat) && this.config.heartbeat >= 0,
`WebSocket: invalid heartbeat value ${this.config.heartbeat}`);

if (heartbeat > 0) {
this.heartbeatTimeout = heartbeat;
if (this.config.heartbeat > 0) {
this.heartbeatInterval = setInterval(
this._doHeartbeat.bind(this),
heartbeat);
this.config.heartbeat);
}
}

_startIdleSweeps(idleTimeout) {
_startIdleSweeps() {
assert(
Number.isInteger(idleTimeout) && idleTimeout >= 0,
`WebSocket: invalid idleTimeout value ${idleTimeout}`);
Number.isInteger(this.config.idleTimeout) && this.config.idleTimeout >= 0,
`WebSocket: invalid idleTimeout value ${this.config.idleTimeout}`);

if (idleTimeout > 0) {
this.idleTimeout = idleTimeout;
if (this.config.idleTimeout > 0) {
this.idleSweepInterval = setInterval(
this._sweepIdleSockets.bind(this),
this.idleTimeout);
this.config.idleTimeout);
}
}

_doHeartbeat() {
debug('Heartbeat');
const
lastActivityThreshold = this.activityTimestamp - this.heartbeatTimeout;
lastActivityThreshold = this.activityTimestamp - this.config.heartbeat;

for (const connection of this.connectionPool.values()) {
if (connection.alive === false) {
Expand All @@ -439,7 +433,7 @@ class WebSocketProtocol extends Protocol {
const now = Date.now();
Aschen marked this conversation as resolved.
Show resolved Hide resolved

for (const connection of this.connectionPool.values()) {
if ((now - connection.lastActivity) > this.idleTimeout) {
if ((now - connection.lastActivity) > this.config.idleTimeout) {
// correctly triggers the 'close' event handler on that socket
connection.socket.terminate();
}
Expand Down
41 changes: 22 additions & 19 deletions test/api/core/entrypoints/embedded/index.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';

const root = '../../../../../';

const
{
Request,
Expand All @@ -11,18 +13,18 @@ const
}
} = require('kuzzle-common-objects'),
path = require('path'),
KuzzleMock = require('../../../../mocks/kuzzle.mock'),
KuzzleMock = require(`${root}/test/mocks/kuzzle.mock`),
mockrequire = require('mock-require'),
rewire = require('rewire'),
should = require('should'),
sinon = require('sinon'),
Bluebird = require('bluebird');

class FakeProtocol {
constructor (name) {
this.name = name;
class FakeProtocol {
constructor (name) {
this.name = name;
}
}
}

class FakeHttpProtocol extends FakeProtocol {
constructor () { super('http'); }
Expand Down Expand Up @@ -83,10 +85,11 @@ describe('lib/core/api/core/entrypoints/embedded/index', () => {
winstonTransportFile = sinon.spy();
winstonTransportSyslog = sinon.spy();

mockrequire('../../../../../lib/api/core/entrypoints/embedded/protocols/http', HttpMock);
mockrequire('../../../../../lib/api/core/entrypoints/embedded/protocols/websocket', WebSocketMock);
mockrequire('../../../../../lib/api/core/entrypoints/embedded/protocols/socketio', SocketIOMock);
mockrequire('../../../../../lib/api/core/entrypoints/embedded/protocols/mqtt', MqttMock);
const embeddedPath = `${root}/lib/api/core/entrypoints/embedded`;
mockrequire(`${embeddedPath}/protocols/http`, HttpMock);
mockrequire(`${embeddedPath}/protocols/websocket`, WebSocketMock);
mockrequire(`${embeddedPath}/protocols/socketio`, SocketIOMock);
mockrequire(`${embeddedPath}/protocols/mqtt`, MqttMock);

mockrequire('http', httpMock);
mockrequire('winston', {
Expand All @@ -101,15 +104,16 @@ describe('lib/core/api/core/entrypoints/embedded/index', () => {
mockrequire('winston-syslog', winstonTransportSyslog);

// Disables unnecessary console warnings
AbstractManifest = rewire('../../../../../lib/api/core/abstractManifest');
AbstractManifest = rewire(`${root}/lib/api/core/abstractManifest`);
AbstractManifest.__set__({ console: { warn: sinon.stub() }});
mockrequire('../../../../../lib/api/core/abstractManifest', AbstractManifest);
mockrequire(`${root}/lib/api/core/abstractManifest`, AbstractManifest);

Manifest = rewire('../../../../../lib/api/core/entrypoints/embedded/manifest');
Manifest = rewire(`${embeddedPath}/manifest`);
Manifest.__set__({ console: { warn: sinon.stub() }});
mockrequire('../../../../../lib/api/core/entrypoints/embedded/manifest', Manifest);
mockrequire(`${embeddedPath}/manifest`, Manifest);

// Bluebird.map forces a different context, preventing rewire to mock "require"
// Bluebird.map forces a different context, preventing rewire to mock
// "require"
mockrequire('bluebird', {
map: (arr, fn) => Promise.all(arr.map(e => {
let result;
Expand All @@ -127,7 +131,7 @@ describe('lib/core/api/core/entrypoints/embedded/index', () => {
all: Bluebird.all
});

EntryPoint = mockrequire.reRequire('../../../../../lib/api/core/entrypoints/embedded');
EntryPoint = mockrequire.reRequire(embeddedPath);

entrypoint = new EntryPoint(kuzzle);

Expand All @@ -140,10 +144,9 @@ describe('lib/core/api/core/entrypoints/embedded/index', () => {
}
});

HttpMock.prototype.init = sinon.stub().resolves(true);
WebSocketMock.prototype.init = sinon.stub().resolves(true);
SocketIOMock.prototype.init = sinon.stub().resolves(true);
MqttMock.prototype.init = sinon.stub().resolves(true);
for (const Class of [HttpMock, WebSocketMock, SocketIOMock, MqttMock]) {
Class.prototype.init = sinon.stub().resolves(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
});

afterEach(() => {
Expand Down
14 changes: 9 additions & 5 deletions test/api/core/entrypoints/embedded/protocols/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => {
['active', new Connection(true, Date.now())]
]);

protocol.config.heartbeat = 1000;
protocol._doHeartbeat();

// inactive sockets are pinged
Expand Down Expand Up @@ -667,11 +668,11 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => {
should(protocol.idleTimeoutInterval).not.be.null();
should(idleTimeoutSpy).not.be.called();

clock.tick(protocol.idleTimeout);
clock.tick(protocol.config.idleTimeout);

should(idleTimeoutSpy).be.calledOnce();

clock.tick(protocol.idleTimeout);
clock.tick(protocol.config.idleTimeout);

should(idleTimeoutSpy).be.calledTwice();

Expand All @@ -695,12 +696,15 @@ describe('/lib/api/core/entrypoints/embedded/protocols/websocket', () => {
};
};

protocol.idleTimeout = 1000;
protocol.config.idleTimeout = 1000;

protocol.connectionPool = new Map([
['ahAhAhAhStayinAliveStayinAlive', new Connection(now)],
['dead', new Connection(now - protocol.idleTimeout - 1)],
['ahAhAhAhStayinAliiiiiiiiive', new Connection(now - protocol.idleTimeout + 100)]
['dead', new Connection(now - protocol.config.idleTimeout - 1)],
[
'ahAhAhAhStayinAliiiiiiiiive',
new Connection(now - protocol.config.idleTimeout + 100)
]
]);

protocol._sweepIdleSockets();
Expand Down