Skip to content

Commit

Permalink
introduced timeout verification in case of missing open message from …
Browse files Browse the repository at this point in the history
…websocket
  • Loading branch information
massimocandela committed Oct 25, 2021
1 parent e4f19d3 commit 0125b17
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 17 deletions.
1 change: 1 addition & 0 deletions docs/ris-disconnections.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The following causes are possible:

1) **Network issues.** The machine where BGPalerter is running loses connectivity (maybe just for a few seconds).
2) **You are monitoring something that produces too many BGP updates** (e.g., your prefixes are not stable or constantly re-announced). In such cases you may be too slow in consuming the data and the server disconnects you to flush the buffer.
3) **Process termination.** This happens when BGPalerter was killed or crashed for some reason, this is not related to RIPE RIS.

Anyway, unfortunately sometimes this happens without an explanation due to RIPE RIS instabilities.
This has been reported to the RIPE RIS team.
Expand Down
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
"kafkajs": "^1.15.0",
"md5": "^2.3.0",
"moment": "^2.29.1",
"node-cleanup": "^2.1.2",
"nodemailer": "^6.7.0",
"path": "^0.12.7",
"restify": "^8.6.0",
Expand Down
1 change: 0 additions & 1 deletion src/connectors/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,4 @@ export default class Connector {
disconnect = () => {
throw new Error('The method disconnect MUST be implemented');
};

}
22 changes: 13 additions & 9 deletions src/connectors/connectorRIS.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,25 @@ export default class ConnectorRIS extends Connector {
this.agent = env.agent;
this.subscribed = {};
this.canaryBeacons = {};
this.clientId = env.clientId;
this.instanceId = env.instanceId;

this.url = brembo.build(this.params.url, {
params: {
client_version: env.version,
client: env.clientId,
instance: env.instanceId
client: this.clientId,
instance: this.instanceId
}
});

if (this.environment !== "research") { // The canary feature may impact performance if you are planning to get all the possible updates of RIS
this._startCanaryInterval = setInterval(this._startCanary, 60000);
}
};

_openConnect = (resolve) => {
_openConnect = (resolve, data) => {
resolve(true);
this._connect(`${this.name} connector connected`);

this._connect(`${this.name} connector connected (instance:${this.instanceId} connection:${data.connection})`);
if (this.subscription) {
this.subscribe(this.subscription);
}
Expand All @@ -76,7 +78,7 @@ export default class ConnectorRIS extends Connector {
this._message(messageObj);
};

_appendListeners = (resolve, reject) => {
_appendListeners = (resolve, reject) => {
this.ws.on('message', this._messageToJson);
this.ws.on('close', (error) => {

Expand All @@ -87,8 +89,10 @@ export default class ConnectorRIS extends Connector {
reject();
}
});
this.ws.on('error', this._error);
this.ws.on('open', this._openConnect.bind(null, resolve));
this.ws.on('error', error => {
this._error(`${this.name} ${error.message} (instance:${this.instanceId} connection:${error.connection})`);
});
this.ws.on('open', data => this._openConnect(resolve, data));
};

connect = () =>
Expand Down Expand Up @@ -314,7 +318,7 @@ export default class ConnectorRIS extends Connector {
}
this._timeoutFileChange = setTimeout(() => {
this._onInputChange(input);
}, 2000);
}, 5000);
});
};

Expand Down
44 changes: 37 additions & 7 deletions src/utils/WebSocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import _ws from "ws";
import PubSub from "../utils/pubSub";
import brembo from "brembo";
import { v4 as uuidv4 } from 'uuid';
import nodeCleanup from "node-cleanup";

export default class WebSocket {
constructor(host, options) {
Expand All @@ -13,8 +14,16 @@ export default class WebSocket {
this.alive = false;
this.pingInterval = options.pingIntervalSeconds ? options.pingIntervalSeconds * 1000 : 40000;
this.reconnectSeconds = options.reconnectSeconds ? options.reconnectSeconds * 1000 : 30000;
this.connectionDelay = 5000;
this.connectionDelay = 8000;
this.openConnectionTimeoutSeconds = 40000;
this.lastPingReceived = null;

nodeCleanup(() => {
if (this.ws) {
this.pubsub.publish("close", "process termination");
this.disconnect();
}
});
}

_ping = () => {
Expand All @@ -35,7 +44,7 @@ export default class WebSocket {
const nPings = 6;
if (this.ws) {
if (this.lastPingReceived + (this.pingInterval * nPings) < new Date().getTime()) {
this.pubsub.publish("error", `The WebSocket client didn't receive ${nPings} pings. Disconnecting.`);
this._publishError(`The WebSocket client didn't receive ${nPings} pings. Disconnecting.`)
this.disconnect();
this.connect();
} else {
Expand Down Expand Up @@ -64,21 +73,24 @@ export default class WebSocket {
});

this.ws = new _ws(url, this.options);
this.setOpenTimeout(true);

this.ws.on('message', (data) => {
this.pubsub.publish("message", data);
});
this.ws.on('close', (data) => {
this.ws.on('close', data => {
this.alive = false;
this.setOpenTimeout(false);
this.pubsub.publish("close", data);
});
this.ws.on('pong', this._pingReceived);
this.ws.on('error', (data) => {
this.pubsub.publish("error", data);
this.ws.on('error', message => {
this._publishError(message, {connection: connectionId});
});
this.ws.on('open', (data) => {
this.ws.on('open', () => {
this.alive = true;
this.pubsub.publish("open", data);
this.setOpenTimeout(false);
this.pubsub.publish("open", { connection: connectionId });
});

this._startPing();
Expand Down Expand Up @@ -108,6 +120,24 @@ export default class WebSocket {
this.connectionDelay = this.reconnectSeconds;
};

_publishError = (message, extra={}) => {
this.pubsub.publish("error", { type: "error", message, ...extra });
};

setOpenTimeout = (setting) => {
if (setting) {
this.openConnectionTimeout = setTimeout(() => {
this._publishError("connection timed out");
if (this.ws) {
this.disconnect();
this.connect();
}
}, this.openConnectionTimeoutSeconds);
} else {
clearTimeout(this.openConnectionTimeout);
}
};

disconnect = () => {
try {
this.ws.removeAllListeners("message");
Expand Down

0 comments on commit 0125b17

Please sign in to comment.