diff --git a/README.md b/README.md index 3935fe9..490c4f0 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -#The Zetta Runtime +#Zetta [![Build Status](https://travis-ci.org/zettajs/zetta.svg?branch=master)](https://travis-ci.org/zettajs/zetta) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index fe4b0aa..234d9f4 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -3,6 +3,7 @@ var PeerClient = require('../peer_client'); var rels = require('zetta-rels'); var MediaType = require('api-media-type'); var Query = require('calypso').Query; +var http = require('http'); var PeerManagementResource = module.exports = function(server) { this.server = server; @@ -16,11 +17,17 @@ PeerManagementResource.prototype.init = function(config) { .consumes(MediaType.FORM_URLENCODED) .get('/', this.list) .post('/', this.link) - .get('/{id}', this.show); + .get('/{id}', this.show) + .del('/{id}', this.deletePeer) + .put('/{id}', this.updatePeer); }; PeerManagementResource.prototype.list = function(env, next) { + var params = env.route.query; var allQuery = Query.of('peers'); + if(params) { + allQuery.ql(params.ql); + } this.registry.find(allQuery, function(err, results) { var builder = new PeerManagementBuilder(results, env.helpers.url); @@ -55,7 +62,7 @@ PeerManagementResource.prototype.link = function(env, next) { return next(env); } - self._connect(peer); + self.server._runPeer(peer); env.response.statusCode = 202; env.response.setHeader('Location', env.helpers.url.join(peer.id)); @@ -65,47 +72,133 @@ PeerManagementResource.prototype.link = function(env, next) { }); }; -PeerManagementResource.prototype._connect = function(peer) { +//This is conditonal based on where the request is coming from. +//From acceptor we'll add additonal info to the request, and proxy to initiator +//From intiator we'll perform the disconnection +PeerManagementResource.prototype.deletePeer = function(env, next) { + var id = env.route.params.id; var self = this; - var client = new PeerClient(peer.url, this.server); + var query = Query.of('peers').where({connectionId: id}); + this.registry.find(query, function(err, results) { + if(results.length) { + var peer = results[0]; + if(peer.direction === 'initiator') { + self._disconnect(peer); + env.response.statusCode = 200; + next(env); + } else if(peer.direction === 'acceptor') { + self._proxyDisconnect(env, next, id); + } else { + env.response.statusCode = 500; + next(env); + } + } else { + env.response.statusCode = 404; + next(env); + } + }); +}; - client.on('connected', function() { - self.registry.get(peer.id, function(err, result) { - result = JSON.parse(result); - result.status = 'connected'; - self.registry.save(result); +//Updating a peer is a two part process. +//First we'll determine whether or not the API call should be proxied +//Then we'll connect to the new peer. +//Then we'll disconnect from the old peer. +PeerManagementResource.prototype.updatePeer = function(env, next) { + var self = this; + env.request.getBody(function(err, body) { + var params = querystring.parse(body.toString()); + var id = env.route.params.id; + var url = params.url; + var query = Query.of('peers').where({connectionId: id}); + self.registry.find(query, function(err, results) { + if(results.length) { + var peer = results[0]; + if(peer.direction === 'initiator') { + + self.server._peers.push(url); + + var newPeer = { + url: url, + direction: 'initiator', + status: 'connecting' + }; + + + self.registry.add(newPeer, function(err, newPeer) { + if (err) { + env.response.statusCode = 500; + return next(env); + } + + self.server._runPeer(newPeer); + self._disconnect(peer); + env.response.statusCode = 200; + next(env); + }); + + } else if(peer.direction === 'acceptor') { + self._proxyDisconnect(env, next, id); + } else { + env.response.statusCode = 500; + next(env); + } + } else { + env.response.statusCode = 404; + next(env); + } }); }); +}; - client.on('error', function(error) { - self.registry.get(peer.id, function(err, result) { - result = JSON.parse(result); - result.status = 'failed'; - result.error = error; - self.registry.save(result); - }); +PeerManagementResource.prototype._disconnect = function(peer) { + var wsUrl = PeerClient.calculatePeerUrl(peer.url, this.server._name); + var peers = this.server._peerClients.filter(function(peer) { + return peer.url === wsUrl; }); + var client = peers[0]; + client.close(); +}; - client.on('closed', function() { - self.registry.get(peer.id, function(err, result) { - result = JSON.parse(result); - result.status = 'connecting'; - self.registry.save(result, function() { - client.start(); - }); - }); +PeerManagementResource.prototype._proxyDisconnect = function(env, next, id) { + var self = this; + var peerSocket; + var sockets = Object.keys(this.server.httpServer.peers).forEach(function(socketId){ + var peers = self.server.httpServer.peers; + var socket = peers[socketId]; + if(socket.connectionId === id) { + peerSocket = socket; + } }); - peer.status = 'connecting'; - self.registry.save(peer, function() { - client.start(); + if (!peerSocket) { + env.response.statusCode = 404; + return next(env); + } + + peerSocket.on('end', function() { + env.response.statusCode = 200; + next(env); }); - // setTimeout to change status to failed if connection not made. + + this._proxyToPeer(peerSocket, env); + +}; + +PeerManagementResource.prototype._proxyToPeer = function(peer, env) { + var req = env.request; + var res = env.response; + var agent = env.zettaAgent || peer.agent; + var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent }; + var request = http.request(opts); + if(req.body) { + request.end(req.body); + } else { + req.pipe(request); + } }; PeerManagementResource.prototype.show = function(env, next) { var id = env.route.params.id; - this.registry.get(id, function(err, result) { if (err) { env.response.statusCode = 404; @@ -149,18 +242,33 @@ PeerManagementBuilder.prototype.entities = function() { direction: peer.direction, status: peer.status, error: peer.error, - updated: peer.updated + updated: peer.updated, + connectionId: peer.connectionId } }; + entity.actions = []; + entity.actions.push({ + name: 'disconnect', + method: 'DELETE', + href: self.urlHelper.path('/peer-management/'+peer.connectionId) + },{ + name: 'update', + method: 'PUT', + href: self.urlHelper.path('/peer-management/'+peer.connectionId), + fields: [{name: 'url', type: 'url'}] + }); + + /* API action does not work wait till we fix it if (peer.direction === 'initiator') { - entity.actions = [{ + entity.actions.push({ name: 'reconnect', method: 'POST', href: self.urlHelper.current(), fields: [{ name: 'url', type: 'url', value: peerUrl }] - }]; + }); } + */ var peerUrlRel = peer.direction === 'initiator' ? rels.root : rels.server; entity.links = [ @@ -215,20 +323,34 @@ PeerItemBuilder.prototype.properties = function() { direction: this.data.direction, status: this.data.status, error: this.data.error, - updated: this.data.updated + updated: this.data.updated, + connectionId: this.data.connectionId }; return this; }; PeerItemBuilder.prototype.actions = function() { + this.base.actions = []; + + this.base.actions.push({ + name: 'disconnect', + method: 'DELETE', + href: this.urlHelper.path('/peer-management/'+this.data.connectionId), + },{ + name: 'update', + method: 'PUT', + href: this.urlHelper.path('/peer-management/'+this.data.connectionId), + fields: [{name: 'url', type: 'url'}] + }); + if (this.data.direction === 'initiator') { - this.base.actions = [{ + this.base.actions.push({ name: 'reconnect', method: 'POST', href: this.urlHelper.path('/peer-management'), fields: [{ name: 'url', type: 'url', value: this.data.url }] - }]; + }); } return this; diff --git a/lib/peer_client.js b/lib/peer_client.js index cefb3ac..570d22f 100644 --- a/lib/peer_client.js +++ b/lib/peer_client.js @@ -5,23 +5,20 @@ var uuid = require('node-uuid'); var Logger = require('./logger'); var WebSocket = require('./web_socket'); -var RETRY_INTERVAL = 3000; -var RETRY_MAX = 9; -var CONNECTION_BACKOFF_MAX = 50; - -function backoffTime() { - return Math.floor(0 + CONNECTION_BACKOFF_MAX * Math.random()); -} - -var PeerClient = module.exports = function(url, server) { - var wsUrl = url.replace(/^http/, 'ws'); - var peerPath = '/peers/' + server._name; +function calculatePeerUrl(url, name){ + var wsUrl = url.replace(/^http/, 'ws'); + var peerPath = '/peers/' + name; if(wsUrl.indexOf('/', wsUrl.length - 1) === -1) { wsUrl = wsUrl + peerPath; } else { wsUrl = wsUrl.slice(0, wsUrl.length - 1) + peerPath; - } + } + return wsUrl; +} + +var PeerClient = module.exports = function(url, server) { + var wsUrl = calculatePeerUrl(url, server._name); this.reconnect = { min: 100, max: 30000, // max amount of time allowed to backoff @@ -29,6 +26,7 @@ var PeerClient = module.exports = function(url, server) { }; this.url = wsUrl; + this.server = server.httpServer.spdyServer; this.connected = false; this.retryCount = 0; @@ -44,6 +42,7 @@ var PeerClient = module.exports = function(url, server) { }; util.inherits(PeerClient, EventEmitter); +PeerClient.calculatePeerUrl = calculatePeerUrl; PeerClient.prototype.start = function() { this._createSocket(); @@ -68,7 +67,7 @@ PeerClient.prototype._createSocket = function() { var backoff = this.generateBackoff(this.retryCount); this._backoffTimer = setTimeout(function(){ // create a new connection id - this.connectionId = uuid.v4(); + self.connectionId = uuid.v4(); self._ws = new WebSocket(self.url + '?connectionId=' + self.connectionId, {}); self._ws.on('open', function(socket) { self.checkServerReq(); @@ -133,7 +132,6 @@ PeerClient.prototype.generateBackoff = function(attempt) { var random = parseInt(Math.random() * this.reconnect.maxRandomOffset); var backoff = (Math.pow(2, attempt) * this.reconnect.min); - if (backoff > this.reconnect.max) { return this.reconnect.max + random; } else { diff --git a/lib/peer_socket.js b/lib/peer_socket.js index ca2bab5..9ce57cd 100644 --- a/lib/peer_socket.js +++ b/lib/peer_socket.js @@ -167,6 +167,7 @@ PeerSocket.prototype._setPeerStatus = function(status, err, cb) { } peer.status = status; + peer.connectionId = self.connectionId; if (err) { peer.error = err; } diff --git a/lib/registry.js b/lib/registry.js index 073a062..4f3bb8b 100644 --- a/lib/registry.js +++ b/lib/registry.js @@ -138,7 +138,17 @@ Registry.prototype._find = function(q, cb) { Registry.prototype.get = function(id, cb) { - this.db.get(id, { valueEncoding: 'json' }, cb); + this.db.get(id, { valueEncoding: 'json' }, function(err, result){ + if(err) { + cb(err); + } else { + if(typeof result === 'object') { + cb(null, result); + } else { + cb(null, JSON.parse(result)); + } + } + }); }; Registry.prototype.close = function() { diff --git a/lib/web_socket.js b/lib/web_socket.js index 7645317..1148f48 100644 --- a/lib/web_socket.js +++ b/lib/web_socket.js @@ -51,11 +51,13 @@ var WebSocket = module.exports = function(address, httpOptions) { return; } - req.connection.on('close', function() { - self.emit('close'); - }); + self.onClose = function() { + self.emit('close'); + }; + req.connection.on('close', self.onClose); self.emit('open', req.connection); + self.socket = req.connection; }); req.on('error', function(e) { self.emit('error', e); }); @@ -68,3 +70,11 @@ var WebSocket = module.exports = function(address, httpOptions) { }; util.inherits(WebSocket, EventEmitter); + +WebSocket.prototype.close = function() { + this.socket.removeListener('close', this.onClose); + if(this.socket) { + this.socket.end(); + this.emit('close', null, null, true); + } +}; diff --git a/sample/PeeringTest/index.js b/sample/PeeringTest/index.js index 305db68..cb0daf4 100644 --- a/sample/PeeringTest/index.js +++ b/sample/PeeringTest/index.js @@ -1,5 +1,6 @@ var zetta = require('../../zetta.js'); zetta() + .link('http://127.0.0.1:3030/') .link('http://hello-zetta.herokuapp.com/') .listen(1337); diff --git a/test/test_api.js b/test/test_api.js index b9d40ec..d2328c1 100644 --- a/test/test_api.js +++ b/test/test_api.js @@ -334,6 +334,19 @@ describe('Zetta Api', function() { }); }); + it('should allow the querying of peers with the ql parameter', function(done) { + peerRegistry.save({ id: '1', type: 'initiator'}, function() { + request(getHttpServer(app)) + .get('/peer-management?ql=where%20type%3D%22initiator%22') + .expect(getBody(function(err, body) { + assert.equal(body.entities.length, 1); + var entity = body.entities[0]; + assert.equal(entity.properties.id, '1'); + })) + .end(done); + }); + }); + describe('#link', function() { it('should return status code 202', function(done) { request(getHttpServer(app)) diff --git a/test/test_peer_connection_api.js b/test/test_peer_connection_api.js new file mode 100644 index 0000000..5e703eb --- /dev/null +++ b/test/test_peer_connection_api.js @@ -0,0 +1,324 @@ +var assert = require('assert'); +var http = require('http'); +var zetta = require('../zetta'); +var MemRegistry = require('./fixture/mem_registry'); +var MemPeerRegistry = require('./fixture/mem_peer_registry'); +var request = require('supertest'); +var PeerRegistry = require('../lib/peer_registry'); +var Query = require('calypso').Query; +var querystring = require('querystring'); + +function deleteRequest(port, connectionId) { + var opts = { + host: '0.0.0.0', + port: port, + method: 'DELETE', + path: '/peer-management/' + connectionId + } + + var req = http.request(opts); + req.end(); +} + +function putRequest(port, connectionId, url) { + var qs = { + url: url + }; + var string = querystring.stringify(qs); + var opts = { + host: '0.0.0.0', + port: port, + method: 'PUT', + path: '/peer-management/' + connectionId, + headers: { + 'Content-Length': string.length + } + }; + + var req = http.request(opts); + req.write(string); + req.end(); +} + + +function getHttpServer(app) { + return app.httpServer.server; +} + +function getBody(fn) { + return function(res) { + try { + if(res.text) { + var body = JSON.parse(res.text); + } else { + var body = ''; + } + } catch(err) { + throw new Error('Failed to parse json body'); + } + + fn(res, body); + } +} + +describe('Peer Connection API', function() { + describe('/peer-management embedded entities', function() { + var peerRegistry = null; + var app = null; + + beforeEach(function(done) { + peerRegistry = new MemPeerRegistry(); + app = zetta({ registry: new MemRegistry(), peerRegistry: peerRegistry }) + .silent() + .name('local') + ._run(done); + }); + + it('exposes actions on the embedded entity', function(done) { + peerRegistry.save({id:'foo', connectionId:'12345'}, function() { + var url = '/peer-management'; + request(getHttpServer(app)) + .get(url) + .expect(getBody(function(res, body) { + assert.equal(body.entities.length, 1); + assert.equal(body.entities[0].actions.length, 2); + body.entities[0].actions.forEach(function(action) { + assert.ok(action.href.indexOf('/peer-management/12345') !== -1); + }) + })) + .end(done); + }); + }); + + it('exposes actions on the full entity', function(done) { + peerRegistry.save({id:'foo', connectionId:'12345'}, function() { + var url = '/peer-management/foo'; + request(getHttpServer(app)) + .get(url) + .expect(getBody(function(res, body) { + assert.equal(body.actions.length, 2); + body.actions.forEach(function(action) { + assert.ok(action.href.indexOf('/peer-management/12345') !== -1); + }); + })) + .end(done); + }); + }); + }); + + describe('/peer-management disconnection API', function() { + var cloud = null; + var cloudUrl = null; + var cloudPort = null; + var db1 = null; + var db2 = null; + + beforeEach(function(done) { + + cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + cloud.name('cloud'); + cloud.silent(); + + cloud.listen(0, function(err) { + if(err) { + return done(err); + } + + cloudPort = cloud.httpServer.server.address().port; + cloudUrl = 'http://0.0.0.0:' + cloudPort; + done(); + }); + }); + + afterEach(function(done) { + cloud.httpServer.server.close(); + done(); + }); + + it('will return 404 if connection does not exist', function(done) { + var url = '/peer-management/1234'; + request(getHttpServer(cloud)) + .del(url) + .expect(404, done); + }); + + it('will proxy a disconnection between two peers', function(done) { + //Had to increase the timeout. The standard two seconds may not be long enough for a connection to be established. + this.timeout(10000); + var connected = false; + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.name('local'); + var connectionId = null; + + z.pubsub.subscribe('_peer/disconnect', function(topic, data) { + assert.equal(connectionId, data.peer.connectionId); + done(); + }); + + cloud.pubsub.subscribe('_peer/connect', function(topic, data) { + assert.equal(connected, true); + connectionId = data.peer.connectionId; + deleteRequest(cloudPort, connectionId); + }); + z.pubsub.subscribe('_peer/connect', function(topic, data) { + connected = true; + }); + + z.silent(); + z.link(cloudUrl); + z.listen(0); + + + }); + + it('will disconnect two peers', function(done) { + this.timeout(10000); + var connected = false; + var localPort = null; + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.name('local'); + + var connectionId = null; + + z.pubsub.subscribe('_peer/disconnect', function(topic, data) { + assert.equal(connectionId, data.peer.connectionId); + done(); + }); + + cloud.pubsub.subscribe('_peer/connect', function(topic, data) { + assert.equal(connected, true); + connectionId = data.peer.connectionId; + deleteRequest(localPort, connectionId); + }); + + z.pubsub.subscribe('_peer/connect', function(topic, data) { + connected = true; + }); + + z.silent(); + z.link(cloudUrl); + z.listen(0, function(err) { + if(err) { + done(err); + } + + localPort = z.httpServer.server.address().port; + }); + + }); + }); + + describe('/peer-management update API', function() { + var cloud = null; + var localOne = null; + var cloudPort = null; + var localOnePort = null; + var connectionId = null; + + + beforeEach(function(done) { + this.timeout(10000); + cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + cloud.name('cloud'); + cloud.silent(); + + localOne = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + localOne.name('localOne'); + localOne.silent(); + + cloud.pubsub.subscribe('_peer/connect', function(topic, data) { + connectionId = data.peer.connectionId; + done(); + }); + + cloud.listen(0, function(err) { + if(err) { + return done(err); + } + + cloudPort = cloud.httpServer.server.address().port; + var cloudUrl = 'http://0.0.0.0:' + cloudPort; + + localOne.link(cloudUrl); + localOne.listen(0, function(err) { + if(err) { + done(err); + } + + localPort = localOne.httpServer.server.address().port; + }); + }); + }); + + afterEach(function(done) { + cloud.httpServer.server.close(); + localOne.httpServer.server.close(); + done(); + }); + + it('will return 404 if connection does not exist', function(done) { + var url = '/peer-management/1234'; + request(getHttpServer(cloud)) + .put(url) + .set('Content-Type', 'application/x-www-form-urlencoded') + .send({ url: 'http://0.0.0.0:1234' }) + .expect(404, done); + }); + + it('will proxy a connection update between two peers', function(done) { + this.timeout(10000); + var localTwoPort = null; + var localTwo = zetta({ registry: new MemRegistry(), peerRegsitry: new MemPeerRegistry() }); + localTwo.name('localTwo'); + localTwo.silent(); + + var url = 'http://0.0.0.0:'; + + cloud.pubsub.subscribe('_peer/disconnect', function(topic, data) { + assert.equal(connectionId, data.peer.connectionId); + }); + + localTwo.pubsub.subscribe('_peer/connect', function(topic, data) { + done(); + }); + + localTwo.listen(0, function(err) { + if(err) { + done(err); + } + + localTwoPort = localTwo.httpServer.server.address().port; + var serverUrl = url + localTwoPort; + putRequest(cloudPort, connectionId, serverUrl); + }); + }); + + it('will update a connection between two peers', function(done) { + this.timeout(10000); + var localTwoPort = null; + var localTwo = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + localTwo.name('localTwo'); + localTwo.silent(); + + var url = 'http://0.0.0.0:'; + + cloud.pubsub.subscribe('_peer/disconnect', function(topic, data) { + assert.equal(connectionId, data.peer.connectionId); + }); + + localTwo.pubsub.subscribe('_peer/connect', function(topic, data) { + done(); + }); + + localTwo.listen(0, function(err) { + if(err) { + done(err); + } + + localTwoPort = localTwo.httpServer.server.address().port; + var serverUrl = url + localTwoPort; + putRequest(localPort, connectionId, serverUrl); + }); + }); + }); +}); diff --git a/zetta.js b/zetta.js index d3d0d0b..a1edcc7 100644 --- a/zetta.js +++ b/zetta.js @@ -330,7 +330,7 @@ Zetta.prototype._initPeers = function(peers, callback) { if (existing) { if(!obj.fromLink || peers.indexOf(obj.url) > -1) { self.peerRegistry.save(obj, function() { - runPeer(obj); + self._runPeer(obj); }); } else { //Delete @@ -347,57 +347,62 @@ Zetta.prototype._initPeers = function(peers, callback) { fromLink:true }; self.peerRegistry.add(peerData, function(err, newPeer) { - runPeer(newPeer); + self._runPeer(newPeer); }); } - function runPeer(peer) { - var peerClient = new PeerClient(peer.url, self); - self._peerClients.push(peerClient); - - // when websocket is established - peerClient.on('connecting', function() { - peer.status = 'connecting'; - self.peerRegistry.save(peer); - }); - - // when peer handshake is made - peerClient.on('connected', function() { - peer.status = 'connected'; - self.peerRegistry.save(peer); - - // peer-event - self.pubsub.publish('_peer/connect', { peer: peerClient}); - }); - - peerClient.on('error', function(error) { - self.peerRegistry.get(peer.id, function(err, result) { - result.status = 'failed'; - result.error = error; - self.peerRegistry.save(result); - - // peer-event - self.pubsub.publish('_peer/disconnect', { peer: peerClient }); - }); - }); - - peerClient.on('closed', function(reconnect) { - self.peerRegistry.get(peer.id, function(err, result) { - result.status = 'disconnected'; - - // peer-event - self.pubsub.publish('_peer/disconnect', { peer: peerClient }); - self.peerRegistry.save(result, function() { }); - }); - }); - - peerClient.start(); - } + }); - + // end after db read callback(); }); return this; }; + +Zetta.prototype._runPeer = function(peer) { + var self = this; + var peerClient = new PeerClient(peer.url, self); + self._peerClients.push(peerClient); + + // when websocket is established + peerClient.on('connecting', function() { + peer.status = 'connecting'; + self.peerRegistry.save(peer); + }); + + // when peer handshake is made + peerClient.on('connected', function() { + peer.status = 'connected'; + peer.connectionId = peerClient.connectionId; + self.peerRegistry.save(peer); + + // peer-event + self.pubsub.publish('_peer/connect', { peer: peerClient}); + }); + + peerClient.on('error', function(error) { + self.peerRegistry.get(peer.id, function(err, result) { + result.status = 'failed'; + result.error = error; + self.peerRegistry.save(result); + + // peer-event + self.pubsub.publish('_peer/disconnect', { peer: peerClient }); + }); + }); + + peerClient.on('closed', function() { + self.peerRegistry.get(peer.id, function(err, result) { + result.status = 'disconnected'; + + // peer-event + self.pubsub.publish('_peer/disconnect', { peer: peerClient }); + self.peerRegistry.save(result, function() { }); + }); + }); + + peerClient.start(); +} +