Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peering updates #125

Merged
merged 32 commits into from
Jan 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9dc4c0e
Removed mention of runtime from the README.md it was buggin' me
mdobson Jan 22, 2015
f9c9333
Making progress on adding close logic.
mdobson Jan 23, 2015
17b77b3
Got disconnect logic working.
mdobson Jan 23, 2015
3a614a4
Got API disconnects working.
mdobson Jan 23, 2015
547780e
Proxied disconnects working now.
mdobson Jan 23, 2015
04840c9
Updates to peering to use the old id style.
mdobson Jan 23, 2015
dd6db55
Refactoring for the update API action.
mdobson Jan 23, 2015
3c33c9e
Semi-significant update. You can now proxy changes to the Z2Z protocol.
mdobson Jan 26, 2015
7f22f73
Allow CAQL queries on peers.
mdobson Jan 26, 2015
36ad6a2
Removed mention of runtime from the README.md it was buggin' me
mdobson Jan 22, 2015
8b77006
Making progress on adding close logic.
mdobson Jan 23, 2015
d331aa1
Got disconnect logic working.
mdobson Jan 23, 2015
8b78242
Got API disconnects working.
mdobson Jan 23, 2015
62fb32b
Proxied disconnects working now.
mdobson Jan 23, 2015
75574e7
Updates to peering to use the old id style.
mdobson Jan 23, 2015
5282e9b
Refactoring for the update API action.
mdobson Jan 23, 2015
edd3ee1
Semi-significant update. You can now proxy changes to the Z2Z protocol.
mdobson Jan 26, 2015
e7e3812
Allow CAQL queries on peers.
mdobson Jan 26, 2015
713c049
Added peer management tests and actions to the embedded peer entities.
mdobson Jan 27, 2015
220e7b8
Merge issue from the rebase of master.
mdobson Jan 27, 2015
dc66091
Rebase from master.
mdobson Jan 27, 2015
97de06a
More bug fixes from the rebase merge process.
mdobson Jan 27, 2015
c534123
Remnants of the rebase.
mdobson Jan 27, 2015
e3e178e
Merge branch 'master' of https://github.com/zettajs/zetta into peerin…
mdobson Jan 27, 2015
052306b
Porxied deletion of connection tests completed.
mdobson Jan 28, 2015
90087a0
Connection update tests in place.
mdobson Jan 28, 2015
61c5422
Fixing inconsistencies between peer establishment using javascript an…
mdobson Jan 28, 2015
8bcd16b
Fixing typos within tests causing them to fail.
mdobson Jan 28, 2015
40782a8
Updates to tests for path change, changing resource path to include c…
mdobson Jan 29, 2015
148e12b
Added failing tests for peer api when connection id does not exist
AdamMagaluk Jan 29, 2015
5f608f0
Fixed test content type, peer_managment handles 404
AdamMagaluk Jan 29, 2015
de46638
Commented out the reconnect action on the initiator side
AdamMagaluk Jan 29, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#The Zetta Runtime
#Zetta

[![Build Status](https://travis-ci.org/zettajs/zetta.svg?branch=master)](https://travis-ci.org/zettajs/zetta)

Expand Down
192 changes: 157 additions & 35 deletions lib/api_resources/peer_management.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ var PeerClient = require('../peer_client');
var rels = require('zetta-rels');
var MediaType = require('api-media-type');
var Query = require('calypso').Query;
var http = require('http');

var PeerManagementResource = module.exports = function(server) {
this.server = server;
Expand All @@ -16,11 +17,17 @@ PeerManagementResource.prototype.init = function(config) {
.consumes(MediaType.FORM_URLENCODED)
.get('/', this.list)
.post('/', this.link)
.get('/{id}', this.show);
.get('/{id}', this.show)
.del('/{id}', this.deletePeer)
.put('/{id}', this.updatePeer);
};

PeerManagementResource.prototype.list = function(env, next) {
var params = env.route.query;
var allQuery = Query.of('peers');
if(params) {
allQuery.ql(params.ql);
}

this.registry.find(allQuery, function(err, results) {
var builder = new PeerManagementBuilder(results, env.helpers.url);
Expand Down Expand Up @@ -55,7 +62,7 @@ PeerManagementResource.prototype.link = function(env, next) {
return next(env);
}

self._connect(peer);
self.server._runPeer(peer);

env.response.statusCode = 202;
env.response.setHeader('Location', env.helpers.url.join(peer.id));
Expand All @@ -65,47 +72,133 @@ PeerManagementResource.prototype.link = function(env, next) {
});
};

PeerManagementResource.prototype._connect = function(peer) {
//This is conditonal based on where the request is coming from.
//From acceptor we'll add additonal info to the request, and proxy to initiator
//From intiator we'll perform the disconnection
PeerManagementResource.prototype.deletePeer = function(env, next) {
var id = env.route.params.id;
var self = this;
var client = new PeerClient(peer.url, this.server);
var query = Query.of('peers').where({connectionId: id});
this.registry.find(query, function(err, results) {
if(results.length) {
var peer = results[0];
if(peer.direction === 'initiator') {
self._disconnect(peer);
env.response.statusCode = 200;
next(env);
} else if(peer.direction === 'acceptor') {
self._proxyDisconnect(env, next, id);
} else {
env.response.statusCode = 500;
next(env);
}
} else {
env.response.statusCode = 404;
next(env);
}
});
};

client.on('connected', function() {
self.registry.get(peer.id, function(err, result) {
result = JSON.parse(result);
result.status = 'connected';
self.registry.save(result);
//Updating a peer is a two part process.
//First we'll determine whether or not the API call should be proxied
//Then we'll connect to the new peer.
//Then we'll disconnect from the old peer.
PeerManagementResource.prototype.updatePeer = function(env, next) {
var self = this;
env.request.getBody(function(err, body) {
var params = querystring.parse(body.toString());
var id = env.route.params.id;
var url = params.url;
var query = Query.of('peers').where({connectionId: id});
self.registry.find(query, function(err, results) {
if(results.length) {
var peer = results[0];
if(peer.direction === 'initiator') {

self.server._peers.push(url);

var newPeer = {
url: url,
direction: 'initiator',
status: 'connecting'
};


self.registry.add(newPeer, function(err, newPeer) {
if (err) {
env.response.statusCode = 500;
return next(env);
}

self.server._runPeer(newPeer);
self._disconnect(peer);
env.response.statusCode = 200;
next(env);
});

} else if(peer.direction === 'acceptor') {
self._proxyDisconnect(env, next, id);
} else {
env.response.statusCode = 500;
next(env);
}
} else {
env.response.statusCode = 404;
next(env);
}
});
});
};

client.on('error', function(error) {
self.registry.get(peer.id, function(err, result) {
result = JSON.parse(result);
result.status = 'failed';
result.error = error;
self.registry.save(result);
});
PeerManagementResource.prototype._disconnect = function(peer) {
var wsUrl = PeerClient.calculatePeerUrl(peer.url, this.server._name);
var peers = this.server._peerClients.filter(function(peer) {
return peer.url === wsUrl;
});
var client = peers[0];
client.close();
};

client.on('closed', function() {
self.registry.get(peer.id, function(err, result) {
result = JSON.parse(result);
result.status = 'connecting';
self.registry.save(result, function() {
client.start();
});
});
PeerManagementResource.prototype._proxyDisconnect = function(env, next, id) {
var self = this;
var peerSocket;
var sockets = Object.keys(this.server.httpServer.peers).forEach(function(socketId){
var peers = self.server.httpServer.peers;
var socket = peers[socketId];
if(socket.connectionId === id) {
peerSocket = socket;
}
});

peer.status = 'connecting';
self.registry.save(peer, function() {
client.start();
if (!peerSocket) {
env.response.statusCode = 404;
return next(env);
}

peerSocket.on('end', function() {
env.response.statusCode = 200;
next(env);
});
// setTimeout to change status to failed if connection not made.

this._proxyToPeer(peerSocket, env);

};

PeerManagementResource.prototype._proxyToPeer = function(peer, env) {
var req = env.request;
var res = env.response;
var agent = env.zettaAgent || peer.agent;
var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent };
var request = http.request(opts);
if(req.body) {
request.end(req.body);
} else {
req.pipe(request);
}
};

PeerManagementResource.prototype.show = function(env, next) {
var id = env.route.params.id;

this.registry.get(id, function(err, result) {
if (err) {
env.response.statusCode = 404;
Expand Down Expand Up @@ -149,18 +242,33 @@ PeerManagementBuilder.prototype.entities = function() {
direction: peer.direction,
status: peer.status,
error: peer.error,
updated: peer.updated
updated: peer.updated,
connectionId: peer.connectionId
}
};

entity.actions = [];
entity.actions.push({
name: 'disconnect',
method: 'DELETE',
href: self.urlHelper.path('/peer-management/'+peer.connectionId)
},{
name: 'update',
method: 'PUT',
href: self.urlHelper.path('/peer-management/'+peer.connectionId),
fields: [{name: 'url', type: 'url'}]
});

/* API action does not work wait till we fix it
if (peer.direction === 'initiator') {
entity.actions = [{
entity.actions.push({
name: 'reconnect',
method: 'POST',
href: self.urlHelper.current(),
fields: [{ name: 'url', type: 'url', value: peerUrl }]
}];
});
}
*/

var peerUrlRel = peer.direction === 'initiator' ? rels.root : rels.server;
entity.links = [
Expand Down Expand Up @@ -215,20 +323,34 @@ PeerItemBuilder.prototype.properties = function() {
direction: this.data.direction,
status: this.data.status,
error: this.data.error,
updated: this.data.updated
updated: this.data.updated,
connectionId: this.data.connectionId
};

return this;
};

PeerItemBuilder.prototype.actions = function() {
this.base.actions = [];

this.base.actions.push({
name: 'disconnect',
method: 'DELETE',
href: this.urlHelper.path('/peer-management/'+this.data.connectionId),
},{
name: 'update',
method: 'PUT',
href: this.urlHelper.path('/peer-management/'+this.data.connectionId),
fields: [{name: 'url', type: 'url'}]
});

if (this.data.direction === 'initiator') {
this.base.actions = [{
this.base.actions.push({
name: 'reconnect',
method: 'POST',
href: this.urlHelper.path('/peer-management'),
fields: [{ name: 'url', type: 'url', value: this.data.url }]
}];
});
}

return this;
Expand Down
26 changes: 12 additions & 14 deletions lib/peer_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,28 @@ var uuid = require('node-uuid');
var Logger = require('./logger');
var WebSocket = require('./web_socket');

var RETRY_INTERVAL = 3000;
var RETRY_MAX = 9;
var CONNECTION_BACKOFF_MAX = 50;

function backoffTime() {
return Math.floor(0 + CONNECTION_BACKOFF_MAX * Math.random());
}

var PeerClient = module.exports = function(url, server) {
var wsUrl = url.replace(/^http/, 'ws');
var peerPath = '/peers/' + server._name;
function calculatePeerUrl(url, name){
var wsUrl = url.replace(/^http/, 'ws');
var peerPath = '/peers/' + name;
if(wsUrl.indexOf('/', wsUrl.length - 1) === -1) {
wsUrl = wsUrl + peerPath;
} else {
wsUrl = wsUrl.slice(0, wsUrl.length - 1) + peerPath;
}
}
return wsUrl;
}


var PeerClient = module.exports = function(url, server) {
var wsUrl = calculatePeerUrl(url, server._name);
this.reconnect = {
min: 100,
max: 30000, // max amount of time allowed to backoff
maxRandomOffset: 1000, // max amount of time
};

this.url = wsUrl;

this.server = server.httpServer.spdyServer;
this.connected = false;
this.retryCount = 0;
Expand All @@ -44,6 +42,7 @@ var PeerClient = module.exports = function(url, server) {
};
util.inherits(PeerClient, EventEmitter);

PeerClient.calculatePeerUrl = calculatePeerUrl;

PeerClient.prototype.start = function() {
this._createSocket();
Expand All @@ -68,7 +67,7 @@ PeerClient.prototype._createSocket = function() {
var backoff = this.generateBackoff(this.retryCount);
this._backoffTimer = setTimeout(function(){
// create a new connection id
this.connectionId = uuid.v4();
self.connectionId = uuid.v4();
self._ws = new WebSocket(self.url + '?connectionId=' + self.connectionId, {});
self._ws.on('open', function(socket) {
self.checkServerReq();
Expand Down Expand Up @@ -133,7 +132,6 @@ PeerClient.prototype.generateBackoff = function(attempt) {

var random = parseInt(Math.random() * this.reconnect.maxRandomOffset);
var backoff = (Math.pow(2, attempt) * this.reconnect.min);

if (backoff > this.reconnect.max) {
return this.reconnect.max + random;
} else {
Expand Down
1 change: 1 addition & 0 deletions lib/peer_socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ PeerSocket.prototype._setPeerStatus = function(status, err, cb) {
}

peer.status = status;
peer.connectionId = self.connectionId;
if (err) {
peer.error = err;
}
Expand Down
12 changes: 11 additions & 1 deletion lib/registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,17 @@ Registry.prototype._find = function(q, cb) {


Registry.prototype.get = function(id, cb) {
this.db.get(id, { valueEncoding: 'json' }, cb);
this.db.get(id, { valueEncoding: 'json' }, function(err, result){
if(err) {
cb(err);
} else {
if(typeof result === 'object') {
cb(null, result);
} else {
cb(null, JSON.parse(result));
}
}
});
};

Registry.prototype.close = function() {
Expand Down
16 changes: 13 additions & 3 deletions lib/web_socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ var WebSocket = module.exports = function(address, httpOptions) {
return;
}

req.connection.on('close', function() {
self.emit('close');
});
self.onClose = function() {
self.emit('close');
};
req.connection.on('close', self.onClose);

self.emit('open', req.connection);
self.socket = req.connection;
});

req.on('error', function(e) { self.emit('error', e); });
Expand All @@ -68,3 +70,11 @@ var WebSocket = module.exports = function(address, httpOptions) {
};

util.inherits(WebSocket, EventEmitter);

WebSocket.prototype.close = function() {
this.socket.removeListener('close', this.onClose);
if(this.socket) {
this.socket.end();
this.emit('close', null, null, true);
}
};
Loading