Skip to content

Commit

Permalink
Emit and Publish Peer Connection Errors (#317)
Browse files Browse the repository at this point in the history
* Emit & Publish Peer Connection Errors

A zetta instance's `pubsub` can be used to react to peer client
disconnect events. Currently, the _peer_client_ module will emit a
"closed" event for both a socket error *and* socket close event. This
means the zetta handle of 'error' events will never be called.

This change adds `emit('error', err)` to the _peer_client_ module, and
updates _zetta_ to publish the '_peer/disconnect' event with the
`error` object as an additional property of the published data.

`publish('_peer/disconnect'), {peer:peerClient, error: error});`

This allows code that subscribes to the peer disconnect events to
react specifically to errors.

* Publish Error

Updated _zetta_ module to publish '_peer/error' instead of disconnect
on a peer client 'error' event. This should not cause an issues with
existing code because the 'error' event was previously never emitted.

* Update _peer_client_ to only emit the "error" event.

#312 (comment)

* Updated PR per @AdamMagaluk; changes to match _event_socket_ in:

6206dc5

* Added tests to ensure PeerClients emits the proper close event

* Remove use of _peer/error and use only _peer/disconnect
AdamMagaluk authored Jun 14, 2016
1 parent 70334a8 commit 70312be
Showing 4 changed files with 67 additions and 16 deletions.
15 changes: 10 additions & 5 deletions lib/event_socket.js
Original file line number Diff line number Diff line change
@@ -79,11 +79,11 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) {
self._queryCache[topic.streamQuery] = compiled;
} catch(err) {
var msg = {
type: 'error',
type: 'error',
code: 400,
timestamp: new Date().getTime(),
topic: msg.topic,
message: err.message
message: err.message
}
self.ws.send(JSON.stringify(msg));
return;
@@ -105,7 +105,7 @@ var EventSocket = module.exports = function(ws, query, streamEnabled) {

this._parser.on('unsubscribe', function(msg) {
self._unsubscribe(msg.subscriptionId, function(err, subscription) {
if (subscription) {
if (subscription) {
self.emit('unsubscribe', subscription);
}
});
@@ -180,10 +180,15 @@ EventSocket.prototype.send = function(topic, data) {

// used for _peer/connect _peer/disconnect
if (topic.indexOf('_peer/') === 0 && typeof tmpData.peer === 'object') {
var properties = tmpData.peer.properties();
if (tmpData.error) {
properties.error = tmpData.error;
}

if (this.streamEnabled) {
data.data = tmpData.peer.properties();
data.data = properties;
} else {
data = ObjectStream.format(topic, tmpData.peer.properties());
data = ObjectStream.format(topic, properties);
}
}

16 changes: 8 additions & 8 deletions lib/peer_client.js
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ function calculatePeerUrl(url, name){
wsUrl = wsUrl + peerPath;
} else {
wsUrl = wsUrl.slice(0, wsUrl.length - 1) + peerPath;
}
}
return wsUrl;
}

@@ -45,7 +45,7 @@ var PeerClient = module.exports = function(url, server) {
this._zetta = server;

this.updateURL(url);

// create a unique connection id peer connection, used to associate initiaion request
this.connectionId = null;
this.ws = new WebSocket(this._createNewUrl(), {});
@@ -57,7 +57,7 @@ util.inherits(PeerClient, EventEmitter);
PeerClient.calculatePeerUrl = calculatePeerUrl;

PeerClient.prototype.updateURL = function(httpUrl) {
var wsUrl = calculatePeerUrl(httpUrl, this._zetta._name);
var wsUrl = calculatePeerUrl(httpUrl, this._zetta._name);
this.url = wsUrl;
};

@@ -116,7 +116,7 @@ PeerClient.prototype._createSocket = function() {

// stop ping timer until backoff finishes
self._stopPingTimeout();

var backoff = this.generateBackoff(this.retryCount);
this._backoffTimer = setTimeout(function(){

@@ -128,7 +128,7 @@ PeerClient.prototype._createSocket = function() {
if (self.retryCount === 0) {
self.ws.on('open', function onOpen(socket) {
self.checkServerReq();
self.emit('connecting');
self.emit('connecting');
self.server.emit('connection', socket);
socket.on('spdyPing', function(connection) {
// reset ping timer on a spdy ping from the peer
@@ -140,7 +140,7 @@ PeerClient.prototype._createSocket = function() {
self.ws.on('error', function onError(err) {
self.connected = false;
self.log.emit('log', 'peer-client', 'Peer connection error (' + self.url + '): ' + err);
self.emit('closed');
self.emit('error', err);
reconnect(err);
});

@@ -177,7 +177,7 @@ PeerClient.prototype.checkServerReq = function() {
self.retryCount = 0;
self.emit('connected');
self.log.emit('log', 'peer-client', 'Peer connection established (' + self.url + ')');

res.statusCode = 200;
res.end();

@@ -195,7 +195,7 @@ PeerClient.prototype.generateBackoff = function(attempt) {
if (attempt === 0) {
return 0;
}

var random = parseInt(Math.random() * this.reconnect.maxRandomOffset);
var backoff = (Math.pow(2, attempt) * this.reconnect.min);
if (backoff > this.reconnect.max) {
50 changes: 48 additions & 2 deletions test/test_peer_client.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var PeerClient = require('../lib/peer_client');
var assert = require('assert');


var MockServer = { _name: '1234', httpServer: { spdyServer: {}}, log: {}};
var MockServer = { _name: '1234', httpServer: { spdyServer: {}}, log: {
emit: function() {}
}};
var MockSocket = function() {
EventEmitter.call(this);
this.setAddress = function() {};
this.start = function() {};
};
util.inherits(MockSocket, EventEmitter);

var urlEndingWithSlash = 'http://cloud.zettajs.io/';
var urlEndingWithNoSlash = 'http://cloud.zettajs.io';

@@ -16,6 +27,41 @@ describe('Peer Client', function() {
it('should calculate the proper url without a trailing slash', function() {
var client = new PeerClient(urlEndingWithNoSlash, MockServer);
assert.equal(client.url, 'ws://cloud.zettajs.io/peers/1234');
});
});
});

it('should emit error when underlying ws does', function(done) {
var client = new PeerClient(urlEndingWithNoSlash, MockServer);
client.ws = new MockSocket();
client._createSocket();
client.once('error', function(err) {
assert.equal(err.message, 'some message');
done();
});

client.once('closed', function() {
done(new Error('Should not have emitted closed'));
});

setTimeout(function() {
client.ws.emit('error', new Error('some message'));
}, 2);
})

it('should emit closed when underlying ws does', function(done) {
var client = new PeerClient(urlEndingWithNoSlash, MockServer);
client.ws = new MockSocket();
client._createSocket();
client.once('error', function(err) {
done(new Error('Should not have emitted error'));
});

client.once('closed', function() {
done();
});

setTimeout(function() {
client.ws.emit('close');
}, 2);
})
});
2 changes: 1 addition & 1 deletion zetta.js
Original file line number Diff line number Diff line change
@@ -450,7 +450,7 @@ Zetta.prototype._runPeer = function(peer) {
self.peerRegistry.save(result);

// peer-event
self.pubsub.publish('_peer/disconnect', { peer: peerClient });
self.pubsub.publish('_peer/disconnect', { peer: peerClient, error: error });
});
});

0 comments on commit 70312be

Please sign in to comment.