From 9dc4c0e7547eaf064917872c365a80b925029b5f Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Thu, 22 Jan 2015 14:30:54 -0500 Subject: [PATCH 01/30] Removed mention of runtime from the README.md it was buggin' me --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3935fe9..490c4f0 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -#The Zetta Runtime +#Zetta [![Build Status](https://travis-ci.org/zettajs/zetta.svg?branch=master)](https://travis-ci.org/zettajs/zetta) From f9c93334a915dba765e150523e784dbe10d9a09a Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 11:46:15 -0500 Subject: [PATCH 02/30] Making progress on adding close logic. --- lib/api_resources/peer_management.js | 69 +++++++++++++++++++++++++--- lib/peer_client.js | 28 ++++++++--- lib/registry.js | 12 ++++- sample/PeeringTest/index.js | 2 +- zetta.js | 5 +- 5 files changed, 99 insertions(+), 17 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index fe4b0aa..179c464 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -16,7 +16,9 @@ PeerManagementResource.prototype.init = function(config) { .consumes(MediaType.FORM_URLENCODED) .get('/', this.list) .post('/', this.link) - .get('/{id}', this.show); + .get('/{id}', this.show) + .del('/{id}', this.deletePeer) + .put('/{id}', this.updatePeer); }; PeerManagementResource.prototype.list = function(env, next) { @@ -65,13 +67,40 @@ PeerManagementResource.prototype.link = function(env, next) { }); }; +//This is conditonal based on where the request is coming from. +//From acceptor we'll add additonal info to the request, and proxy to initiator +//From intiator we'll perform the disconnection +PeerManagementResource.prototype.deletePeer = function(env, next) { + var id = env.route.params.id; + var self = this; + this.registry.get(id, function(err, result) { + if(result.direction === 'initiator') { + var wsUrl = PeerClient.calculatePeerUrl(result.url, id); + var peers = self.server._peerClients.filter(function(peer) { + return peer.url === wsUrl; + }); + var client = peers[0]; + client.close(); + env.response.statusCode = 200; + next(env); + } else if(result.direction === 'acceptor') { + console.log(result); + } else { + env.response.statusCode = 500; + next(env); + } + }); +}; + + +PeerManagementResource.prototype.updatePeer = function(env, next) {}; + PeerManagementResource.prototype._connect = function(peer) { var self = this; var client = new PeerClient(peer.url, this.server); client.on('connected', function() { self.registry.get(peer.id, function(err, result) { - result = JSON.parse(result); result.status = 'connected'; self.registry.save(result); }); @@ -79,7 +108,6 @@ PeerManagementResource.prototype._connect = function(peer) { client.on('error', function(error) { self.registry.get(peer.id, function(err, result) { - result = JSON.parse(result); result.status = 'failed'; result.error = error; self.registry.save(result); @@ -88,7 +116,6 @@ PeerManagementResource.prototype._connect = function(peer) { client.on('closed', function() { self.registry.get(peer.id, function(err, result) { - result = JSON.parse(result); result.status = 'connecting'; self.registry.save(result, function() { client.start(); @@ -105,7 +132,6 @@ PeerManagementResource.prototype._connect = function(peer) { PeerManagementResource.prototype.show = function(env, next) { var id = env.route.params.id; - this.registry.get(id, function(err, result) { if (err) { env.response.statusCode = 404; @@ -222,13 +248,42 @@ PeerItemBuilder.prototype.properties = function() { }; PeerItemBuilder.prototype.actions = function() { + this.base.actions = []; + + if(this.data.direction === 'acceptor') { + this.base.actions.push({ + name: 'disconnect', + method: 'DELETE', + href: this.urlHelper.current(), + fields:[] + },{ + name: 'update', + method: 'PUT', + href: this.urlHelper.current(), + fields: [{name: 'url', type: 'url'}] + }); + } + + if (this.data.direction === 'initiator') { - this.base.actions = [{ + this.base.actions.push({ name: 'reconnect', method: 'POST', href: this.urlHelper.path('/peer-management'), fields: [{ name: 'url', type: 'url', value: this.data.url }] - }]; + }, + { + name: 'disconnect', + method: 'DELETE', + href: this.urlHelper.current(), + fields: [{name:'peer', type: 'text'}] + }, + { + name: 'update', + method: 'PUT', + href: this.urlHelper.current(), + fields: [{name:'from', type:'url'}, {name: 'to', type:'url'}] + }); } return this; diff --git a/lib/peer_client.js b/lib/peer_client.js index 1263ac3..d150245 100644 --- a/lib/peer_client.js +++ b/lib/peer_client.js @@ -13,14 +13,20 @@ function backoffTime() { return Math.floor(0 + CONNECTION_BACKOFF_MAX * Math.random()); } -var PeerClient = module.exports = function(url, server) { - var wsUrl = url.replace(/^http/, 'ws'); - var peerPath = '/peers/' + server._name; +function calculatePeerUrl(url, name){ + var wsUrl = url.replace(/^http/, 'ws'); + var peerPath = '/peers/' + name; if(wsUrl.indexOf('/', wsUrl.length - 1) === -1) { wsUrl = wsUrl + peerPath; } else { wsUrl = wsUrl.slice(0, wsUrl.length - 1) + peerPath; - } + } + return wsUrl; +} + + +var PeerClient = module.exports = function(url, server) { + var wsUrl = calculatePeerUrl(url, server._name); this.url = wsUrl; this.server = server.httpServer.spdyServer; this.connected = false; @@ -36,10 +42,17 @@ var PeerClient = module.exports = function(url, server) { }; util.inherits(PeerClient, EventEmitter); +PeerClient.calculatePeerUrl = calculatePeerUrl; + PeerClient.prototype.start = function() { setTimeout(this._createSocket.bind(this), (this._firstConnect) ? 0 : backoffTime()); }; +PeerClient.prototype.close = function() { + this.ws.removeListener('close', this.onClose); + this.socket.emit('close'); +}; + PeerClient.prototype.checkServerReq = function() { var self = this; @@ -75,7 +88,9 @@ PeerClient.prototype._createSocket = function() { // create a new connection id this.connectionId = uuid.v4(); var ws = new WebSocket(this.url + "?connectionId=" + this.connectionId, {}); + this.ws = ws; ws.on('open', function(socket) { + self.socket = socket; self.checkServerReq(); self.emit('connecting'); self.server.emit('connection', socket); @@ -88,11 +103,12 @@ PeerClient.prototype._createSocket = function() { reconnect(err); }); - ws.on('close', function(code, message) { + this.onClose = function(code, message) { self.connected = false; self.log.emit('log', 'peer-client', 'Peer connection closed (' + self.url + '): ' + code + ' - ' + message); self.emit('closed', reconnect); - }); + }; + ws.on('close', this.onClose); function reconnect(err) { if (self.interval) { diff --git a/lib/registry.js b/lib/registry.js index 073a062..4f3bb8b 100644 --- a/lib/registry.js +++ b/lib/registry.js @@ -138,7 +138,17 @@ Registry.prototype._find = function(q, cb) { Registry.prototype.get = function(id, cb) { - this.db.get(id, { valueEncoding: 'json' }, cb); + this.db.get(id, { valueEncoding: 'json' }, function(err, result){ + if(err) { + cb(err); + } else { + if(typeof result === 'object') { + cb(null, result); + } else { + cb(null, JSON.parse(result)); + } + } + }); }; Registry.prototype.close = function() { diff --git a/sample/PeeringTest/index.js b/sample/PeeringTest/index.js index 305db68..e237f5b 100644 --- a/sample/PeeringTest/index.js +++ b/sample/PeeringTest/index.js @@ -1,5 +1,5 @@ var zetta = require('../../zetta.js'); zetta() - .link('http://hello-zetta.herokuapp.com/') + .link('http://127.0.0.1:3030/') .listen(1337); diff --git a/zetta.js b/zetta.js index 8f30497..c611631 100644 --- a/zetta.js +++ b/zetta.js @@ -332,8 +332,10 @@ Zetta.prototype._initPeers = function(callback) { var peerData = { url: obj, direction: 'initiator', - fromLink:true + fromLink:true, + id: self.id }; + console.log(peerData); self.peerRegistry.add(peerData, function(err, newPeer) { runPeer(newPeer); }); @@ -360,7 +362,6 @@ Zetta.prototype._initPeers = function(callback) { peerClient.on('error', function(error) { self.peerRegistry.get(peer.id, function(err, result) { - result = JSON.parse(result); result.status = 'failed'; result.error = error; self.peerRegistry.save(result); From 17b77b30bc8a0c6954f0f1f2abdbe81c68a41e7f Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 12:28:32 -0500 Subject: [PATCH 03/30] Got disconnect logic working. --- lib/peer_client.js | 16 +++++++++------- lib/web_socket.js | 16 +++++++++++++--- zetta.js | 4 +++- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/lib/peer_client.js b/lib/peer_client.js index d150245..497541f 100644 --- a/lib/peer_client.js +++ b/lib/peer_client.js @@ -49,8 +49,7 @@ PeerClient.prototype.start = function() { }; PeerClient.prototype.close = function() { - this.ws.removeListener('close', this.onClose); - this.socket.emit('close'); + this.ws.close(); }; PeerClient.prototype.checkServerReq = function() { @@ -90,7 +89,6 @@ PeerClient.prototype._createSocket = function() { var ws = new WebSocket(this.url + "?connectionId=" + this.connectionId, {}); this.ws = ws; ws.on('open', function(socket) { - self.socket = socket; self.checkServerReq(); self.emit('connecting'); self.server.emit('connection', socket); @@ -103,12 +101,16 @@ PeerClient.prototype._createSocket = function() { reconnect(err); }); - this.onClose = function(code, message) { + + ws.on('close', function(code, message, intentional) { self.connected = false; self.log.emit('log', 'peer-client', 'Peer connection closed (' + self.url + '): ' + code + ' - ' + message); - self.emit('closed', reconnect); - }; - ws.on('close', this.onClose); + if(!intentional) { + self.emit('closed', reconnect); + } else { + self.emit('closed'); + } + }); function reconnect(err) { if (self.interval) { diff --git a/lib/web_socket.js b/lib/web_socket.js index aa094e9..6d72ce0 100644 --- a/lib/web_socket.js +++ b/lib/web_socket.js @@ -51,11 +51,13 @@ var WebSocket = module.exports = function(address, httpOptions) { return; } - req.connection.on('close', function() { - self.emit('close'); - }); + self.onClose = function() { + self.emit('close'); + }; + req.connection.on('close', self.onClose); self.emit('open', req.connection); + self.socket = req.connection; }); req.on('error', function(e) { self.emit('error', e); }); @@ -63,3 +65,11 @@ var WebSocket = module.exports = function(address, httpOptions) { }; util.inherits(WebSocket, EventEmitter); + +WebSocket.prototype.close = function() { + this.socket.removeListener('close', this.onClose); + if(this.socket) { + this.socket.end(); + this.emit('close', null, null, true); + } +}; diff --git a/zetta.js b/zetta.js index c611631..2d7293d 100644 --- a/zetta.js +++ b/zetta.js @@ -380,7 +380,9 @@ Zetta.prototype._initPeers = function(callback) { self.peerRegistry.save(result, function() { // re-connect - peerClient.start(); + if(reconnect) { + peerClient.start(); + } }); }); }); From 3a614a4586f1b18429e97d08f01699ddb6fe6e81 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 13:39:06 -0500 Subject: [PATCH 04/30] Got API disconnects working. --- lib/api_resources/peer_management.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 179c464..4cf2765 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -83,8 +83,10 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { client.close(); env.response.statusCode = 200; next(env); + } else if(result.direction === 'acceptor') { - console.log(result); + var socket = self.server.httpServer.peers[id]; + self.server.httpServer.proxyToPeer(env, next); } else { env.response.statusCode = 500; next(env); @@ -92,7 +94,6 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { }); }; - PeerManagementResource.prototype.updatePeer = function(env, next) {}; PeerManagementResource.prototype._connect = function(peer) { From 547780e6e5544abc4642025d077ad2f343a22b86 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 14:31:57 -0500 Subject: [PATCH 05/30] Proxied disconnects working now. --- lib/api_resources/peer_management.js | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 4cf2765..75fa5d0 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -3,6 +3,7 @@ var PeerClient = require('../peer_client'); var rels = require('zetta-rels'); var MediaType = require('api-media-type'); var Query = require('calypso').Query; +var http = require('http'); var PeerManagementResource = module.exports = function(server) { this.server = server; @@ -18,7 +19,6 @@ PeerManagementResource.prototype.init = function(config) { .post('/', this.link) .get('/{id}', this.show) .del('/{id}', this.deletePeer) - .put('/{id}', this.updatePeer); }; PeerManagementResource.prototype.list = function(env, next) { @@ -80,13 +80,25 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { return peer.url === wsUrl; }); var client = peers[0]; - client.close(); env.response.statusCode = 200; next(env); + client.close(); } else if(result.direction === 'acceptor') { var socket = self.server.httpServer.peers[id]; - self.server.httpServer.proxyToPeer(env, next); + socket.on('end', function() { + env.response.statusCode = 200; + next(env); + }); + //self.server.httpServer.proxyToPeer(env, next); + var peer = self.server.httpServer.peers[id]; + var req = env.request; + var res = env.response; + var agent = env.zettaAgent || peer.agent; + + var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent }; + var request = http.request(opts); + request.end(); } else { env.response.statusCode = 500; next(env); From 04840c938f233c2b71e421a8ccfe97eaa1d85afa Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 15:50:47 -0500 Subject: [PATCH 06/30] Updates to peering to use the old id style. --- lib/api_resources/peer_management.js | 2 +- sample/PeeringTest/index.js | 1 + zetta.js | 4 +--- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 75fa5d0..30a1570 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -75,7 +75,7 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { var self = this; this.registry.get(id, function(err, result) { if(result.direction === 'initiator') { - var wsUrl = PeerClient.calculatePeerUrl(result.url, id); + var wsUrl = PeerClient.calculatePeerUrl(result.url, self.server._name); var peers = self.server._peerClients.filter(function(peer) { return peer.url === wsUrl; }); diff --git a/sample/PeeringTest/index.js b/sample/PeeringTest/index.js index e237f5b..cb0daf4 100644 --- a/sample/PeeringTest/index.js +++ b/sample/PeeringTest/index.js @@ -2,4 +2,5 @@ var zetta = require('../../zetta.js'); zetta() .link('http://127.0.0.1:3030/') + .link('http://hello-zetta.herokuapp.com/') .listen(1337); diff --git a/zetta.js b/zetta.js index 2d7293d..fc7aa7e 100644 --- a/zetta.js +++ b/zetta.js @@ -332,10 +332,8 @@ Zetta.prototype._initPeers = function(callback) { var peerData = { url: obj, direction: 'initiator', - fromLink:true, - id: self.id + fromLink:true }; - console.log(peerData); self.peerRegistry.add(peerData, function(err, newPeer) { runPeer(newPeer); }); From dd6db55b6abd98bd17aab6e97f32bc22a5dbbc31 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 16:38:34 -0500 Subject: [PATCH 07/30] Refactoring for the update API action. --- lib/api_resources/peer_management.js | 50 ++++++++++++++++------------ 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 30a1570..608a328 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -75,30 +75,11 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { var self = this; this.registry.get(id, function(err, result) { if(result.direction === 'initiator') { - var wsUrl = PeerClient.calculatePeerUrl(result.url, self.server._name); - var peers = self.server._peerClients.filter(function(peer) { - return peer.url === wsUrl; - }); - var client = peers[0]; + self._disconnect(result); env.response.statusCode = 200; next(env); - client.close(); - } else if(result.direction === 'acceptor') { - var socket = self.server.httpServer.peers[id]; - socket.on('end', function() { - env.response.statusCode = 200; - next(env); - }); - //self.server.httpServer.proxyToPeer(env, next); - var peer = self.server.httpServer.peers[id]; - var req = env.request; - var res = env.response; - var agent = env.zettaAgent || peer.agent; - - var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent }; - var request = http.request(opts); - request.end(); + self._proxyDisconnect(env, next); } else { env.response.statusCode = 500; next(env); @@ -106,6 +87,33 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { }); }; +PeerManagmentResource.prototype._disconnect = function(peer) { + var wsUrl = PeerClient.calculatePeerUrl(result.url, self.server._name); + var peers = self.server._peerClients.filter(function(peer) { + return peer.url === wsUrl; + }); + var client = peers[0]; + client.close(); + }; + +PeerManagementResource.prototype._proxyDisconnect = function(env, next) { + var socket = self.server.httpServer.peers[id]; + socket.on('end', function() { + env.response.statusCode = 200; + next(env); + }); + //self.server.httpServer.proxyToPeer(env, next); + var peer = self.server.httpServer.peers[id]; + var req = env.request; + var res = env.response; + var agent = env.zettaAgent || peer.agent; + + var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent }; + var request = http.request(opts); + request.end(); + +}; + PeerManagementResource.prototype.updatePeer = function(env, next) {}; PeerManagementResource.prototype._connect = function(peer) { From 3c33c9e5619b22441c3265adb5a6a75e9efe6984 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Sun, 25 Jan 2015 22:10:02 -0500 Subject: [PATCH 08/30] Semi-significant update. You can now proxy changes to the Z2Z protocol. --- lib/api_resources/peer_management.js | 156 ++++++++++++++++++--------- lib/peer_registry.js | 2 +- lib/peer_socket.js | 1 + zetta.js | 1 + 4 files changed, 111 insertions(+), 49 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 608a328..bbf30e6 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -18,7 +18,8 @@ PeerManagementResource.prototype.init = function(config) { .get('/', this.list) .post('/', this.link) .get('/{id}', this.show) - .del('/{id}', this.deletePeer) + .del('/', this.deletePeer) + .put('/', this.updatePeer); }; PeerManagementResource.prototype.list = function(env, next) { @@ -71,51 +72,122 @@ PeerManagementResource.prototype.link = function(env, next) { //From acceptor we'll add additonal info to the request, and proxy to initiator //From intiator we'll perform the disconnection PeerManagementResource.prototype.deletePeer = function(env, next) { - var id = env.route.params.id; + var id = env.route.query.connectionId; var self = this; - this.registry.get(id, function(err, result) { - if(result.direction === 'initiator') { - self._disconnect(result); - env.response.statusCode = 200; - next(env); - } else if(result.direction === 'acceptor') { - self._proxyDisconnect(env, next); + var query = Query.of('peers').ql('where connectionId=@id').params({id: id}); + this.registry.find(query, function(err, results) { + if(results.length) { + var peer = results[0]; + if(peer.direction === 'initiator') { + self._disconnect(peer); + env.response.statusCode = 200; + next(env); + } else if(peer.direction === 'acceptor') { + self._proxyDisconnect(env, next, id); + } else { + env.response.statusCode = 500; + next(env); + } } else { - env.response.statusCode = 500; - next(env); - } + env.response.statusCode = 404; + next(env); + } + }); +}; + +//Updating a peer is a two part process. +//First we'll determine whether or not the API call should be proxied +//Then we'll connect to the new peer. +//Then we'll disconnect from the old peer. +PeerManagementResource.prototype.updatePeer = function(env, next) { + var self = this; + env.request.getBody(function(err, body) { + var params = querystring.parse(body.toString()); + var id = params.connectionId; + var url = params.url; + var query = Query.of('peers').ql('where connectionId=@id').params({id: id}); + self.registry.find(query, function(err, results) { + if(results.length) { + var peer = results[0]; + if(peer.direction === 'initiator') { + + self.server._peers.push(url); + + var newPeer = { + url: url, + direction: 'initiator', + status: 'connecting' + }; + + + self.registry.add(newPeer, function(err, newPeer) { + if (err) { + env.response.statusCode = 500; + return next(env); + } + + self._connect(newPeer); + self._disconnect(peer); + env.response.statusCode = 200; + next(env); + }); + + } else if(peer.direction === 'acceptor') { + self._proxyDisconnect(env, next, id); + } else { + env.response.statusCode = 500; + next(env); + } + } else { + env.response.statusCode = 404; + next(env); + } + }); }); }; -PeerManagmentResource.prototype._disconnect = function(peer) { - var wsUrl = PeerClient.calculatePeerUrl(result.url, self.server._name); - var peers = self.server._peerClients.filter(function(peer) { +PeerManagementResource.prototype._disconnect = function(peer) { + var wsUrl = PeerClient.calculatePeerUrl(peer.url, this.server._name); + var peers = this.server._peerClients.filter(function(peer) { return peer.url === wsUrl; }); var client = peers[0]; client.close(); - }; +}; + +PeerManagementResource.prototype._proxyDisconnect = function(env, next, id) { + var self = this; + var peerSocket; + var sockets = Object.keys(this.server.httpServer.peers).forEach(function(socketId){ + var peers = self.server.httpServer.peers; + var socket = peers[socketId]; + if(socket.connectionId === id) { + peerSocket = socket; + } + }); -PeerManagementResource.prototype._proxyDisconnect = function(env, next) { - var socket = self.server.httpServer.peers[id]; - socket.on('end', function() { + peerSocket.on('end', function() { env.response.statusCode = 200; next(env); }); - //self.server.httpServer.proxyToPeer(env, next); - var peer = self.server.httpServer.peers[id]; + + this._proxyToPeer(peerSocket, env); + +}; + +PeerManagementResource.prototype._proxyToPeer = function(peer, env) { var req = env.request; var res = env.response; var agent = env.zettaAgent || peer.agent; - var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent }; var request = http.request(opts); - request.end(); - + if(req.body) { + request.end(req.body); + } else { + req.pipe(request); + } }; -PeerManagementResource.prototype.updatePeer = function(env, next) {}; - PeerManagementResource.prototype._connect = function(peer) { var self = this; var client = new PeerClient(peer.url, this.server); @@ -123,6 +195,7 @@ PeerManagementResource.prototype._connect = function(peer) { client.on('connected', function() { self.registry.get(peer.id, function(err, result) { result.status = 'connected'; + result.connectionId = client.connectionId; self.registry.save(result); }); }); @@ -145,7 +218,7 @@ PeerManagementResource.prototype._connect = function(peer) { }); peer.status = 'connecting'; - self.registry.save(peer, function() { + self.registry.add(peer, function() { client.start(); }); // setTimeout to change status to failed if connection not made. @@ -196,7 +269,8 @@ PeerManagementBuilder.prototype.entities = function() { direction: peer.direction, status: peer.status, error: peer.error, - updated: peer.updated + updated: peer.updated, + connectionId: peer.connectionId } }; @@ -262,7 +336,8 @@ PeerItemBuilder.prototype.properties = function() { direction: this.data.direction, status: this.data.status, error: this.data.error, - updated: this.data.updated + updated: this.data.updated, + connectionId: this.data.connectionId }; return this; @@ -271,20 +346,17 @@ PeerItemBuilder.prototype.properties = function() { PeerItemBuilder.prototype.actions = function() { this.base.actions = []; - if(this.data.direction === 'acceptor') { this.base.actions.push({ name: 'disconnect', method: 'DELETE', - href: this.urlHelper.current(), - fields:[] + href: this.urlHelper.path('/peer-management'), + fields:[{name:'connectionId', type:'text', value: this.data.connectionId}] },{ name: 'update', method: 'PUT', - href: this.urlHelper.current(), - fields: [{name: 'url', type: 'url'}] + href: this.urlHelper.path('/peer-management'), + fields: [{name: 'url', type: 'url'}, {name:'connectionId', type:'text', value: this.data.connectionId}] }); - } - if (this.data.direction === 'initiator') { this.base.actions.push({ @@ -292,18 +364,6 @@ PeerItemBuilder.prototype.actions = function() { method: 'POST', href: this.urlHelper.path('/peer-management'), fields: [{ name: 'url', type: 'url', value: this.data.url }] - }, - { - name: 'disconnect', - method: 'DELETE', - href: this.urlHelper.current(), - fields: [{name:'peer', type: 'text'}] - }, - { - name: 'update', - method: 'PUT', - href: this.urlHelper.current(), - fields: [{name:'from', type:'url'}, {name: 'to', type:'url'}] }); } diff --git a/lib/peer_registry.js b/lib/peer_registry.js index 01a0cf5..a417858 100644 --- a/lib/peer_registry.js +++ b/lib/peer_registry.js @@ -22,7 +22,7 @@ PeerRegistry.prototype.save = function(peer, cb) { peer.updated = Date.now(); - this.db.put(peer.id, JSON.stringify(peer), cb); + this.db.put(peer.id, peer, {valueEncoding: 'json'}, cb); } PeerRegistry.prototype.add = function(peer, cb) { diff --git a/lib/peer_socket.js b/lib/peer_socket.js index ca2bab5..9ce57cd 100644 --- a/lib/peer_socket.js +++ b/lib/peer_socket.js @@ -167,6 +167,7 @@ PeerSocket.prototype._setPeerStatus = function(status, err, cb) { } peer.status = status; + peer.connectionId = self.connectionId; if (err) { peer.error = err; } diff --git a/zetta.js b/zetta.js index fc7aa7e..c16c893 100644 --- a/zetta.js +++ b/zetta.js @@ -352,6 +352,7 @@ Zetta.prototype._initPeers = function(callback) { // when peer handshake is made peerClient.on('connected', function() { peer.status = 'connected'; + peer.connectionId = peerClient.connectionId; self.peerRegistry.save(peer); // peer-event From 7f22f735a83f1a53fef99f479fc12f264e897907 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Sun, 25 Jan 2015 22:22:10 -0500 Subject: [PATCH 09/30] Allow CAQL queries on peers. --- lib/api_resources/peer_management.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index bbf30e6..c821063 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -23,7 +23,11 @@ PeerManagementResource.prototype.init = function(config) { }; PeerManagementResource.prototype.list = function(env, next) { + var params = env.route.query; var allQuery = Query.of('peers'); + if(params) { + allQuery.ql(params.ql); + } this.registry.find(allQuery, function(err, results) { var builder = new PeerManagementBuilder(results, env.helpers.url); From 36ad6a229f177ab99f4f600d7f90dfc8ef427001 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Thu, 22 Jan 2015 14:30:54 -0500 Subject: [PATCH 10/30] Removed mention of runtime from the README.md it was buggin' me --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3935fe9..490c4f0 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -#The Zetta Runtime +#Zetta [![Build Status](https://travis-ci.org/zettajs/zetta.svg?branch=master)](https://travis-ci.org/zettajs/zetta) From 8b770065a0719ae615aab692504573a68d285732 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 11:46:15 -0500 Subject: [PATCH 11/30] Making progress on adding close logic. --- lib/api_resources/peer_management.js | 69 +++++++++++++++++++++++++--- lib/peer_client.js | 21 +++++++-- lib/peer_socket.js | 1 + lib/registry.js | 12 ++++- sample/PeeringTest/index.js | 2 +- zetta.js | 5 +- 6 files changed, 96 insertions(+), 14 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index fe4b0aa..179c464 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -16,7 +16,9 @@ PeerManagementResource.prototype.init = function(config) { .consumes(MediaType.FORM_URLENCODED) .get('/', this.list) .post('/', this.link) - .get('/{id}', this.show); + .get('/{id}', this.show) + .del('/{id}', this.deletePeer) + .put('/{id}', this.updatePeer); }; PeerManagementResource.prototype.list = function(env, next) { @@ -65,13 +67,40 @@ PeerManagementResource.prototype.link = function(env, next) { }); }; +//This is conditonal based on where the request is coming from. +//From acceptor we'll add additonal info to the request, and proxy to initiator +//From intiator we'll perform the disconnection +PeerManagementResource.prototype.deletePeer = function(env, next) { + var id = env.route.params.id; + var self = this; + this.registry.get(id, function(err, result) { + if(result.direction === 'initiator') { + var wsUrl = PeerClient.calculatePeerUrl(result.url, id); + var peers = self.server._peerClients.filter(function(peer) { + return peer.url === wsUrl; + }); + var client = peers[0]; + client.close(); + env.response.statusCode = 200; + next(env); + } else if(result.direction === 'acceptor') { + console.log(result); + } else { + env.response.statusCode = 500; + next(env); + } + }); +}; + + +PeerManagementResource.prototype.updatePeer = function(env, next) {}; + PeerManagementResource.prototype._connect = function(peer) { var self = this; var client = new PeerClient(peer.url, this.server); client.on('connected', function() { self.registry.get(peer.id, function(err, result) { - result = JSON.parse(result); result.status = 'connected'; self.registry.save(result); }); @@ -79,7 +108,6 @@ PeerManagementResource.prototype._connect = function(peer) { client.on('error', function(error) { self.registry.get(peer.id, function(err, result) { - result = JSON.parse(result); result.status = 'failed'; result.error = error; self.registry.save(result); @@ -88,7 +116,6 @@ PeerManagementResource.prototype._connect = function(peer) { client.on('closed', function() { self.registry.get(peer.id, function(err, result) { - result = JSON.parse(result); result.status = 'connecting'; self.registry.save(result, function() { client.start(); @@ -105,7 +132,6 @@ PeerManagementResource.prototype._connect = function(peer) { PeerManagementResource.prototype.show = function(env, next) { var id = env.route.params.id; - this.registry.get(id, function(err, result) { if (err) { env.response.statusCode = 404; @@ -222,13 +248,42 @@ PeerItemBuilder.prototype.properties = function() { }; PeerItemBuilder.prototype.actions = function() { + this.base.actions = []; + + if(this.data.direction === 'acceptor') { + this.base.actions.push({ + name: 'disconnect', + method: 'DELETE', + href: this.urlHelper.current(), + fields:[] + },{ + name: 'update', + method: 'PUT', + href: this.urlHelper.current(), + fields: [{name: 'url', type: 'url'}] + }); + } + + if (this.data.direction === 'initiator') { - this.base.actions = [{ + this.base.actions.push({ name: 'reconnect', method: 'POST', href: this.urlHelper.path('/peer-management'), fields: [{ name: 'url', type: 'url', value: this.data.url }] - }]; + }, + { + name: 'disconnect', + method: 'DELETE', + href: this.urlHelper.current(), + fields: [{name:'peer', type: 'text'}] + }, + { + name: 'update', + method: 'PUT', + href: this.urlHelper.current(), + fields: [{name:'from', type:'url'}, {name: 'to', type:'url'}] + }); } return this; diff --git a/lib/peer_client.js b/lib/peer_client.js index cefb3ac..9ba88a9 100644 --- a/lib/peer_client.js +++ b/lib/peer_client.js @@ -13,15 +13,22 @@ function backoffTime() { return Math.floor(0 + CONNECTION_BACKOFF_MAX * Math.random()); } -var PeerClient = module.exports = function(url, server) { - var wsUrl = url.replace(/^http/, 'ws'); - var peerPath = '/peers/' + server._name; +function calculatePeerUrl(url, name){ + var wsUrl = url.replace(/^http/, 'ws'); + var peerPath = '/peers/' + name; if(wsUrl.indexOf('/', wsUrl.length - 1) === -1) { wsUrl = wsUrl + peerPath; } else { wsUrl = wsUrl.slice(0, wsUrl.length - 1) + peerPath; } + wsUrl = wsUrl.slice(0, wsUrl.length - 1) + peerPath; + } + return wsUrl; +} + +var PeerClient = module.exports = function(url, server) { + var wsUrl = calculatePeerUrl(url, server._name); this.reconnect = { min: 100, max: 30000, // max amount of time allowed to backoff @@ -29,6 +36,7 @@ var PeerClient = module.exports = function(url, server) { }; this.url = wsUrl; + this.server = server.httpServer.spdyServer; this.connected = false; this.retryCount = 0; @@ -44,6 +52,7 @@ var PeerClient = module.exports = function(url, server) { }; util.inherits(PeerClient, EventEmitter); +PeerClient.calculatePeerUrl = calculatePeerUrl; PeerClient.prototype.start = function() { this._createSocket(); @@ -97,6 +106,11 @@ PeerClient.prototype._createSocket = function() { } }; +PeerClient.prototype.close = function() { + this.ws.removeListener('close', this.onClose); + this.socket.emit('close'); +}; + PeerClient.prototype.checkServerReq = function() { var self = this; @@ -133,7 +147,6 @@ PeerClient.prototype.generateBackoff = function(attempt) { var random = parseInt(Math.random() * this.reconnect.maxRandomOffset); var backoff = (Math.pow(2, attempt) * this.reconnect.min); - if (backoff > this.reconnect.max) { return this.reconnect.max + random; } else { diff --git a/lib/peer_socket.js b/lib/peer_socket.js index ca2bab5..9ce57cd 100644 --- a/lib/peer_socket.js +++ b/lib/peer_socket.js @@ -167,6 +167,7 @@ PeerSocket.prototype._setPeerStatus = function(status, err, cb) { } peer.status = status; + peer.connectionId = self.connectionId; if (err) { peer.error = err; } diff --git a/lib/registry.js b/lib/registry.js index 073a062..4f3bb8b 100644 --- a/lib/registry.js +++ b/lib/registry.js @@ -138,7 +138,17 @@ Registry.prototype._find = function(q, cb) { Registry.prototype.get = function(id, cb) { - this.db.get(id, { valueEncoding: 'json' }, cb); + this.db.get(id, { valueEncoding: 'json' }, function(err, result){ + if(err) { + cb(err); + } else { + if(typeof result === 'object') { + cb(null, result); + } else { + cb(null, JSON.parse(result)); + } + } + }); }; Registry.prototype.close = function() { diff --git a/sample/PeeringTest/index.js b/sample/PeeringTest/index.js index 305db68..e237f5b 100644 --- a/sample/PeeringTest/index.js +++ b/sample/PeeringTest/index.js @@ -1,5 +1,5 @@ var zetta = require('../../zetta.js'); zetta() - .link('http://hello-zetta.herokuapp.com/') + .link('http://127.0.0.1:3030/') .listen(1337); diff --git a/zetta.js b/zetta.js index 3b45cd6..ea9006a 100644 --- a/zetta.js +++ b/zetta.js @@ -338,8 +338,10 @@ Zetta.prototype._initPeers = function(callback) { var peerData = { url: obj, direction: 'initiator', - fromLink:true + fromLink:true, + id: self.id }; + console.log(peerData); self.peerRegistry.add(peerData, function(err, newPeer) { runPeer(newPeer); }); @@ -352,6 +354,7 @@ Zetta.prototype._initPeers = function(callback) { // when websocket is established peerClient.on('connecting', function() { peer.status = 'connecting'; + peer.connectionId = peerClient.connectionId; self.peerRegistry.save(peer); }); From d331aa1bc8aaeef87a3a08c59e11ff117a10791b Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 12:28:32 -0500 Subject: [PATCH 12/30] Got disconnect logic working. --- lib/peer_client.js | 3 +-- lib/web_socket.js | 16 +++++++++++++--- zetta.js | 2 +- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/lib/peer_client.js b/lib/peer_client.js index 9ba88a9..cba8ef3 100644 --- a/lib/peer_client.js +++ b/lib/peer_client.js @@ -107,8 +107,7 @@ PeerClient.prototype._createSocket = function() { }; PeerClient.prototype.close = function() { - this.ws.removeListener('close', this.onClose); - this.socket.emit('close'); + this.ws.close(); }; PeerClient.prototype.checkServerReq = function() { diff --git a/lib/web_socket.js b/lib/web_socket.js index 7645317..1148f48 100644 --- a/lib/web_socket.js +++ b/lib/web_socket.js @@ -51,11 +51,13 @@ var WebSocket = module.exports = function(address, httpOptions) { return; } - req.connection.on('close', function() { - self.emit('close'); - }); + self.onClose = function() { + self.emit('close'); + }; + req.connection.on('close', self.onClose); self.emit('open', req.connection); + self.socket = req.connection; }); req.on('error', function(e) { self.emit('error', e); }); @@ -68,3 +70,11 @@ var WebSocket = module.exports = function(address, httpOptions) { }; util.inherits(WebSocket, EventEmitter); + +WebSocket.prototype.close = function() { + this.socket.removeListener('close', this.onClose); + if(this.socket) { + this.socket.end(); + this.emit('close', null, null, true); + } +}; diff --git a/zetta.js b/zetta.js index ea9006a..03d71bb 100644 --- a/zetta.js +++ b/zetta.js @@ -385,7 +385,7 @@ Zetta.prototype._initPeers = function(callback) { // peer-event self.pubsub.publish('_peer/disconnect', { peer: peerClient }); self.peerRegistry.save(result, function() { }); - }); + }); }); peerClient.start(); From 8b782422561a83b30e5d647080f2446d408a40a4 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 13:39:06 -0500 Subject: [PATCH 13/30] Got API disconnects working. --- lib/api_resources/peer_management.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 179c464..4cf2765 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -83,8 +83,10 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { client.close(); env.response.statusCode = 200; next(env); + } else if(result.direction === 'acceptor') { - console.log(result); + var socket = self.server.httpServer.peers[id]; + self.server.httpServer.proxyToPeer(env, next); } else { env.response.statusCode = 500; next(env); @@ -92,7 +94,6 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { }); }; - PeerManagementResource.prototype.updatePeer = function(env, next) {}; PeerManagementResource.prototype._connect = function(peer) { From 62fb32b4cf571ced2dc2d4c23692a201052f8a7b Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 14:31:57 -0500 Subject: [PATCH 14/30] Proxied disconnects working now. --- lib/api_resources/peer_management.js | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 4cf2765..75fa5d0 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -3,6 +3,7 @@ var PeerClient = require('../peer_client'); var rels = require('zetta-rels'); var MediaType = require('api-media-type'); var Query = require('calypso').Query; +var http = require('http'); var PeerManagementResource = module.exports = function(server) { this.server = server; @@ -18,7 +19,6 @@ PeerManagementResource.prototype.init = function(config) { .post('/', this.link) .get('/{id}', this.show) .del('/{id}', this.deletePeer) - .put('/{id}', this.updatePeer); }; PeerManagementResource.prototype.list = function(env, next) { @@ -80,13 +80,25 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { return peer.url === wsUrl; }); var client = peers[0]; - client.close(); env.response.statusCode = 200; next(env); + client.close(); } else if(result.direction === 'acceptor') { var socket = self.server.httpServer.peers[id]; - self.server.httpServer.proxyToPeer(env, next); + socket.on('end', function() { + env.response.statusCode = 200; + next(env); + }); + //self.server.httpServer.proxyToPeer(env, next); + var peer = self.server.httpServer.peers[id]; + var req = env.request; + var res = env.response; + var agent = env.zettaAgent || peer.agent; + + var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent }; + var request = http.request(opts); + request.end(); } else { env.response.statusCode = 500; next(env); From 75574e722fd8427a96da03421938efc48315e812 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 15:50:47 -0500 Subject: [PATCH 15/30] Updates to peering to use the old id style. --- lib/api_resources/peer_management.js | 2 +- sample/PeeringTest/index.js | 1 + zetta.js | 4 +--- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 75fa5d0..30a1570 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -75,7 +75,7 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { var self = this; this.registry.get(id, function(err, result) { if(result.direction === 'initiator') { - var wsUrl = PeerClient.calculatePeerUrl(result.url, id); + var wsUrl = PeerClient.calculatePeerUrl(result.url, self.server._name); var peers = self.server._peerClients.filter(function(peer) { return peer.url === wsUrl; }); diff --git a/sample/PeeringTest/index.js b/sample/PeeringTest/index.js index e237f5b..cb0daf4 100644 --- a/sample/PeeringTest/index.js +++ b/sample/PeeringTest/index.js @@ -2,4 +2,5 @@ var zetta = require('../../zetta.js'); zetta() .link('http://127.0.0.1:3030/') + .link('http://hello-zetta.herokuapp.com/') .listen(1337); diff --git a/zetta.js b/zetta.js index 03d71bb..b803d02 100644 --- a/zetta.js +++ b/zetta.js @@ -338,10 +338,8 @@ Zetta.prototype._initPeers = function(callback) { var peerData = { url: obj, direction: 'initiator', - fromLink:true, - id: self.id + fromLink:true }; - console.log(peerData); self.peerRegistry.add(peerData, function(err, newPeer) { runPeer(newPeer); }); From 5282e9bf23a91bc2edd5af7ec8cb75b9e51cdae7 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Fri, 23 Jan 2015 16:38:34 -0500 Subject: [PATCH 16/30] Refactoring for the update API action. --- lib/api_resources/peer_management.js | 50 ++++++++++++++++------------ 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 30a1570..608a328 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -75,30 +75,11 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { var self = this; this.registry.get(id, function(err, result) { if(result.direction === 'initiator') { - var wsUrl = PeerClient.calculatePeerUrl(result.url, self.server._name); - var peers = self.server._peerClients.filter(function(peer) { - return peer.url === wsUrl; - }); - var client = peers[0]; + self._disconnect(result); env.response.statusCode = 200; next(env); - client.close(); - } else if(result.direction === 'acceptor') { - var socket = self.server.httpServer.peers[id]; - socket.on('end', function() { - env.response.statusCode = 200; - next(env); - }); - //self.server.httpServer.proxyToPeer(env, next); - var peer = self.server.httpServer.peers[id]; - var req = env.request; - var res = env.response; - var agent = env.zettaAgent || peer.agent; - - var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent }; - var request = http.request(opts); - request.end(); + self._proxyDisconnect(env, next); } else { env.response.statusCode = 500; next(env); @@ -106,6 +87,33 @@ PeerManagementResource.prototype.deletePeer = function(env, next) { }); }; +PeerManagmentResource.prototype._disconnect = function(peer) { + var wsUrl = PeerClient.calculatePeerUrl(result.url, self.server._name); + var peers = self.server._peerClients.filter(function(peer) { + return peer.url === wsUrl; + }); + var client = peers[0]; + client.close(); + }; + +PeerManagementResource.prototype._proxyDisconnect = function(env, next) { + var socket = self.server.httpServer.peers[id]; + socket.on('end', function() { + env.response.statusCode = 200; + next(env); + }); + //self.server.httpServer.proxyToPeer(env, next); + var peer = self.server.httpServer.peers[id]; + var req = env.request; + var res = env.response; + var agent = env.zettaAgent || peer.agent; + + var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent }; + var request = http.request(opts); + request.end(); + +}; + PeerManagementResource.prototype.updatePeer = function(env, next) {}; PeerManagementResource.prototype._connect = function(peer) { From edd3ee1eceb3014ac4030f2c337d5dfc4211e767 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Sun, 25 Jan 2015 22:10:02 -0500 Subject: [PATCH 17/30] Semi-significant update. You can now proxy changes to the Z2Z protocol. --- lib/api_resources/peer_management.js | 156 ++++++++++++++++++--------- zetta.js | 1 + 2 files changed, 109 insertions(+), 48 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 608a328..bbf30e6 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -18,7 +18,8 @@ PeerManagementResource.prototype.init = function(config) { .get('/', this.list) .post('/', this.link) .get('/{id}', this.show) - .del('/{id}', this.deletePeer) + .del('/', this.deletePeer) + .put('/', this.updatePeer); }; PeerManagementResource.prototype.list = function(env, next) { @@ -71,51 +72,122 @@ PeerManagementResource.prototype.link = function(env, next) { //From acceptor we'll add additonal info to the request, and proxy to initiator //From intiator we'll perform the disconnection PeerManagementResource.prototype.deletePeer = function(env, next) { - var id = env.route.params.id; + var id = env.route.query.connectionId; var self = this; - this.registry.get(id, function(err, result) { - if(result.direction === 'initiator') { - self._disconnect(result); - env.response.statusCode = 200; - next(env); - } else if(result.direction === 'acceptor') { - self._proxyDisconnect(env, next); + var query = Query.of('peers').ql('where connectionId=@id').params({id: id}); + this.registry.find(query, function(err, results) { + if(results.length) { + var peer = results[0]; + if(peer.direction === 'initiator') { + self._disconnect(peer); + env.response.statusCode = 200; + next(env); + } else if(peer.direction === 'acceptor') { + self._proxyDisconnect(env, next, id); + } else { + env.response.statusCode = 500; + next(env); + } } else { - env.response.statusCode = 500; - next(env); - } + env.response.statusCode = 404; + next(env); + } + }); +}; + +//Updating a peer is a two part process. +//First we'll determine whether or not the API call should be proxied +//Then we'll connect to the new peer. +//Then we'll disconnect from the old peer. +PeerManagementResource.prototype.updatePeer = function(env, next) { + var self = this; + env.request.getBody(function(err, body) { + var params = querystring.parse(body.toString()); + var id = params.connectionId; + var url = params.url; + var query = Query.of('peers').ql('where connectionId=@id').params({id: id}); + self.registry.find(query, function(err, results) { + if(results.length) { + var peer = results[0]; + if(peer.direction === 'initiator') { + + self.server._peers.push(url); + + var newPeer = { + url: url, + direction: 'initiator', + status: 'connecting' + }; + + + self.registry.add(newPeer, function(err, newPeer) { + if (err) { + env.response.statusCode = 500; + return next(env); + } + + self._connect(newPeer); + self._disconnect(peer); + env.response.statusCode = 200; + next(env); + }); + + } else if(peer.direction === 'acceptor') { + self._proxyDisconnect(env, next, id); + } else { + env.response.statusCode = 500; + next(env); + } + } else { + env.response.statusCode = 404; + next(env); + } + }); }); }; -PeerManagmentResource.prototype._disconnect = function(peer) { - var wsUrl = PeerClient.calculatePeerUrl(result.url, self.server._name); - var peers = self.server._peerClients.filter(function(peer) { +PeerManagementResource.prototype._disconnect = function(peer) { + var wsUrl = PeerClient.calculatePeerUrl(peer.url, this.server._name); + var peers = this.server._peerClients.filter(function(peer) { return peer.url === wsUrl; }); var client = peers[0]; client.close(); - }; +}; + +PeerManagementResource.prototype._proxyDisconnect = function(env, next, id) { + var self = this; + var peerSocket; + var sockets = Object.keys(this.server.httpServer.peers).forEach(function(socketId){ + var peers = self.server.httpServer.peers; + var socket = peers[socketId]; + if(socket.connectionId === id) { + peerSocket = socket; + } + }); -PeerManagementResource.prototype._proxyDisconnect = function(env, next) { - var socket = self.server.httpServer.peers[id]; - socket.on('end', function() { + peerSocket.on('end', function() { env.response.statusCode = 200; next(env); }); - //self.server.httpServer.proxyToPeer(env, next); - var peer = self.server.httpServer.peers[id]; + + this._proxyToPeer(peerSocket, env); + +}; + +PeerManagementResource.prototype._proxyToPeer = function(peer, env) { var req = env.request; var res = env.response; var agent = env.zettaAgent || peer.agent; - var opts = { method: req.method, headers: req.headers, path: req.url, agent: agent }; var request = http.request(opts); - request.end(); - + if(req.body) { + request.end(req.body); + } else { + req.pipe(request); + } }; -PeerManagementResource.prototype.updatePeer = function(env, next) {}; - PeerManagementResource.prototype._connect = function(peer) { var self = this; var client = new PeerClient(peer.url, this.server); @@ -123,6 +195,7 @@ PeerManagementResource.prototype._connect = function(peer) { client.on('connected', function() { self.registry.get(peer.id, function(err, result) { result.status = 'connected'; + result.connectionId = client.connectionId; self.registry.save(result); }); }); @@ -145,7 +218,7 @@ PeerManagementResource.prototype._connect = function(peer) { }); peer.status = 'connecting'; - self.registry.save(peer, function() { + self.registry.add(peer, function() { client.start(); }); // setTimeout to change status to failed if connection not made. @@ -196,7 +269,8 @@ PeerManagementBuilder.prototype.entities = function() { direction: peer.direction, status: peer.status, error: peer.error, - updated: peer.updated + updated: peer.updated, + connectionId: peer.connectionId } }; @@ -262,7 +336,8 @@ PeerItemBuilder.prototype.properties = function() { direction: this.data.direction, status: this.data.status, error: this.data.error, - updated: this.data.updated + updated: this.data.updated, + connectionId: this.data.connectionId }; return this; @@ -271,20 +346,17 @@ PeerItemBuilder.prototype.properties = function() { PeerItemBuilder.prototype.actions = function() { this.base.actions = []; - if(this.data.direction === 'acceptor') { this.base.actions.push({ name: 'disconnect', method: 'DELETE', - href: this.urlHelper.current(), - fields:[] + href: this.urlHelper.path('/peer-management'), + fields:[{name:'connectionId', type:'text', value: this.data.connectionId}] },{ name: 'update', method: 'PUT', - href: this.urlHelper.current(), - fields: [{name: 'url', type: 'url'}] + href: this.urlHelper.path('/peer-management'), + fields: [{name: 'url', type: 'url'}, {name:'connectionId', type:'text', value: this.data.connectionId}] }); - } - if (this.data.direction === 'initiator') { this.base.actions.push({ @@ -292,18 +364,6 @@ PeerItemBuilder.prototype.actions = function() { method: 'POST', href: this.urlHelper.path('/peer-management'), fields: [{ name: 'url', type: 'url', value: this.data.url }] - }, - { - name: 'disconnect', - method: 'DELETE', - href: this.urlHelper.current(), - fields: [{name:'peer', type: 'text'}] - }, - { - name: 'update', - method: 'PUT', - href: this.urlHelper.current(), - fields: [{name:'from', type:'url'}, {name: 'to', type:'url'}] }); } diff --git a/zetta.js b/zetta.js index b803d02..7d8479c 100644 --- a/zetta.js +++ b/zetta.js @@ -359,6 +359,7 @@ Zetta.prototype._initPeers = function(callback) { // when peer handshake is made peerClient.on('connected', function() { peer.status = 'connected'; + peer.connectionId = peerClient.connectionId; self.peerRegistry.save(peer); // peer-event From e7e38125ecaa3fd8e78fd3fee45325b0f42d8e48 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Sun, 25 Jan 2015 22:22:10 -0500 Subject: [PATCH 18/30] Allow CAQL queries on peers. --- lib/api_resources/peer_management.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index bbf30e6..c821063 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -23,7 +23,11 @@ PeerManagementResource.prototype.init = function(config) { }; PeerManagementResource.prototype.list = function(env, next) { + var params = env.route.query; var allQuery = Query.of('peers'); + if(params) { + allQuery.ql(params.ql); + } this.registry.find(allQuery, function(err, results) { var builder = new PeerManagementBuilder(results, env.helpers.url); From 713c0493626cfea0f63b75adbc6cf09b949f4914 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Tue, 27 Jan 2015 13:40:27 -0500 Subject: [PATCH 19/30] Added peer management tests and actions to the embedded peer entities. --- lib/api_resources/peer_management.js | 39 ++++++++++++++++++---------- test/test_api.js | 13 ++++++++++ 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index c821063..7e62f72 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -278,13 +278,26 @@ PeerManagementBuilder.prototype.entities = function() { } }; + entity.actions = []; + entity.actions.push({ + name: 'disconnect', + method: 'DELETE', + href: self.urlHelper.path('/peer-management'), + fields:[{name:'connectionId', type:'text', value: peer.connectionId}] + },{ + name: 'update', + method: 'PUT', + href: self.urlHelper.path('/peer-management'), + fields: [{name: 'url', type: 'url'}, {name:'connectionId', type:'text', value: peer.connectionId}] + }); + if (peer.direction === 'initiator') { - entity.actions = [{ + entity.actions.push({ name: 'reconnect', method: 'POST', href: self.urlHelper.current(), fields: [{ name: 'url', type: 'url', value: peerUrl }] - }]; + }); } var peerUrlRel = peer.direction === 'initiator' ? rels.root : rels.server; @@ -350,17 +363,17 @@ PeerItemBuilder.prototype.properties = function() { PeerItemBuilder.prototype.actions = function() { this.base.actions = []; - this.base.actions.push({ - name: 'disconnect', - method: 'DELETE', - href: this.urlHelper.path('/peer-management'), - fields:[{name:'connectionId', type:'text', value: this.data.connectionId}] - },{ - name: 'update', - method: 'PUT', - href: this.urlHelper.path('/peer-management'), - fields: [{name: 'url', type: 'url'}, {name:'connectionId', type:'text', value: this.data.connectionId}] - }); + this.base.actions.push({ + name: 'disconnect', + method: 'DELETE', + href: this.urlHelper.path('/peer-management'), + fields:[{name:'connectionId', type:'text', value: this.data.connectionId}] + },{ + name: 'update', + method: 'PUT', + href: this.urlHelper.path('/peer-management'), + fields: [{name: 'url', type: 'url'}, {name:'connectionId', type:'text', value: this.data.connectionId}] + }); if (this.data.direction === 'initiator') { this.base.actions.push({ diff --git a/test/test_api.js b/test/test_api.js index b9d40ec..d2328c1 100644 --- a/test/test_api.js +++ b/test/test_api.js @@ -334,6 +334,19 @@ describe('Zetta Api', function() { }); }); + it('should allow the querying of peers with the ql parameter', function(done) { + peerRegistry.save({ id: '1', type: 'initiator'}, function() { + request(getHttpServer(app)) + .get('/peer-management?ql=where%20type%3D%22initiator%22') + .expect(getBody(function(err, body) { + assert.equal(body.entities.length, 1); + var entity = body.entities[0]; + assert.equal(entity.properties.id, '1'); + })) + .end(done); + }); + }); + describe('#link', function() { it('should return status code 202', function(done) { request(getHttpServer(app)) From 220e7b8fb69466cdcd0481d2b600be1c87998851 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Tue, 27 Jan 2015 13:58:46 -0500 Subject: [PATCH 20/30] Merge issue from the rebase of master. --- lib/peer_client.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/peer_client.js b/lib/peer_client.js index cba8ef3..00ebb74 100644 --- a/lib/peer_client.js +++ b/lib/peer_client.js @@ -20,8 +20,6 @@ function calculatePeerUrl(url, name){ wsUrl = wsUrl + peerPath; } else { wsUrl = wsUrl.slice(0, wsUrl.length - 1) + peerPath; - } - wsUrl = wsUrl.slice(0, wsUrl.length - 1) + peerPath; } return wsUrl; } From 97de06a99cc23cc49e14754419cd44df7d19d0e7 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Tue, 27 Jan 2015 15:32:30 -0500 Subject: [PATCH 21/30] More bug fixes from the rebase merge process. --- lib/peer_client.js | 10 +--------- zetta.js | 1 - 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/lib/peer_client.js b/lib/peer_client.js index 7d2c1b2..3c7e0a8 100644 --- a/lib/peer_client.js +++ b/lib/peer_client.js @@ -75,7 +75,7 @@ PeerClient.prototype._createSocket = function() { var backoff = this.generateBackoff(this.retryCount); this._backoffTimer = setTimeout(function(){ // create a new connection id - this.connectionId = uuid.v4(); + self.connectionId = uuid.v4(); self._ws = new WebSocket(self.url + '?connectionId=' + self.connectionId, {}); self._ws.on('open', function(socket) { self.checkServerReq(); @@ -104,14 +104,6 @@ PeerClient.prototype._createSocket = function() { } }; -PeerClient.prototype.close = function() { - this.ws.close(); -}; - -PeerClient.prototype.close = function() { - this.ws.close(); -}; - PeerClient.prototype.checkServerReq = function() { var self = this; diff --git a/zetta.js b/zetta.js index 7d8479c..92e7bed 100644 --- a/zetta.js +++ b/zetta.js @@ -352,7 +352,6 @@ Zetta.prototype._initPeers = function(callback) { // when websocket is established peerClient.on('connecting', function() { peer.status = 'connecting'; - peer.connectionId = peerClient.connectionId; self.peerRegistry.save(peer); }); From c5341231e2f1a7afade111416d4ce122c876acc0 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Tue, 27 Jan 2015 15:35:43 -0500 Subject: [PATCH 22/30] Remnants of the rebase. --- lib/peer_client.js | 8 -------- 1 file changed, 8 deletions(-) diff --git a/lib/peer_client.js b/lib/peer_client.js index 3c7e0a8..570d22f 100644 --- a/lib/peer_client.js +++ b/lib/peer_client.js @@ -5,14 +5,6 @@ var uuid = require('node-uuid'); var Logger = require('./logger'); var WebSocket = require('./web_socket'); -var RETRY_INTERVAL = 3000; -var RETRY_MAX = 9; -var CONNECTION_BACKOFF_MAX = 50; - -function backoffTime() { - return Math.floor(0 + CONNECTION_BACKOFF_MAX * Math.random()); -} - function calculatePeerUrl(url, name){ var wsUrl = url.replace(/^http/, 'ws'); var peerPath = '/peers/' + name; From 052306bbda32652ed25048fb7e1a8413a91595b3 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Wed, 28 Jan 2015 08:55:31 -0500 Subject: [PATCH 23/30] Porxied deletion of connection tests completed. --- lib/api_resources/peer_management.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 7e62f72..83a4e1c 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -201,6 +201,8 @@ PeerManagementResource.prototype._connect = function(peer) { result.status = 'connected'; result.connectionId = client.connectionId; self.registry.save(result); + + self.server.pubsub.publish('_peer/connect', { peer: client}); }); }); @@ -209,12 +211,15 @@ PeerManagementResource.prototype._connect = function(peer) { result.status = 'failed'; result.error = error; self.registry.save(result); + self.pubsub.publish('_peer/disconnect', { peer: client }); + }); }); client.on('closed', function() { self.registry.get(peer.id, function(err, result) { result.status = 'connecting'; + self.server.pubsub.publish('_peer/connect', { peer: client}); self.registry.save(result, function() { client.start(); }); From 90087a04fc4018958edd03b23fbb019370575068 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Wed, 28 Jan 2015 11:43:51 -0500 Subject: [PATCH 24/30] Connection update tests in place. --- test/test_peer_connection_api.js | 318 +++++++++++++++++++++++++++++++ 1 file changed, 318 insertions(+) create mode 100644 test/test_peer_connection_api.js diff --git a/test/test_peer_connection_api.js b/test/test_peer_connection_api.js new file mode 100644 index 0000000..44c823e --- /dev/null +++ b/test/test_peer_connection_api.js @@ -0,0 +1,318 @@ +var assert = require('assert'); +var http = require('http'); +var zetta = require('../zetta'); +var MemRegistry = require('./fixture/mem_registry'); +var MemPeerRegistry = require('./fixture/mem_peer_registry'); +var request = require('supertest'); +var PeerRegistry = require('../lib/peer_registry'); +var Query = require('calypso').Query; +var querystring = require('querystring'); + +function deleteRequest(port, connectionId) { + var string = '?connectionId='+connectionId; + var opts = { + host: '0.0.0.0', + port: port, + method: 'DELETE', + path: '/peer-management' + string + } + + var req = http.request(opts); + req.end(); +} + +function putRequest(port, connectionId, url) { + var qs = { + connectionId: connectionId, + url: url + }; + var string = querystring.stringify(qs); + var opts = { + host: '0.0.0.0', + port: port, + method: 'PUT', + path: '/peer-management', + headers: { + 'Content-Length': string.length + } + }; + + var req = http.request(opts); + req.write(string); + req.end(); +} + + +function getHttpServer(app) { + return app.httpServer.server; +} + +function getBody(fn) { + return function(res) { + try { + if(res.text) { + var body = JSON.parse(res.text); + } else { + var body = ''; + } + } catch(err) { + throw new Error('Failed to parse json body'); + } + + fn(res, body); + } +} + +describe('Peer Connection API', function() { + describe('/peer-management embedded entities', function() { + var peerRegistry = null; + var app = null; + + beforeEach(function(done) { + peerRegistry = new MemPeerRegistry(); + app = zetta({ registry: new MemRegistry(), peerRegistry: peerRegistry }) + .silent() + .name('local') + ._run(done); + }); + + it('exposes actions on the embedded entity', function(done) { + peerRegistry.save({id:'foo', connectionId:'12345'}, function() { + var url = '/peer-management'; + request(getHttpServer(app)) + .get(url) + .expect(getBody(function(res, body) { + assert.equal(body.entities.length, 1); + assert.equal(body.entities[0].actions.length, 2); + body.entities[0].actions.forEach(function(action) { + action.fields.forEach(function(field) { + if(field.name === 'connectionId') { + assert.equal(field.value, '12345'); + } + }); + }) + })) + .end(done); + }); + }); + + it('exposes actions on the full entity', function(done) { + peerRegistry.save({id:'foo', connectionId:'12345'}, function() { + var url = '/peer-management/foo'; + request(getHttpServer(app)) + .get(url) + .expect(getBody(function(res, body) { + assert.equal(body.actions.length, 2); + body.actions.forEach(function(action) { + action.fields.forEach(function(field) { + if(field.name === 'connectionId') { + assert.equal(field.value, '12345'); + } + }); + }) + })) + .end(done); + }); + }); + }); + + describe('/peer-management disconnection API', function() { + var cloud = null; + var cloudUrl = null; + var cloudPort = null; + var db1 = null; + var db2 = null; + + beforeEach(function(done) { + + cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + cloud.name('cloud'); + cloud.silent(); + + cloud.listen(0, function(err) { + if(err) { + return done(err); + } + + cloudPort = cloud.httpServer.server.address().port; + cloudUrl = 'http://0.0.0.0:' + cloudPort; + done(); + }); + }); + + afterEach(function(done) { + cloud.httpServer.server.close(); + done(); + }); + + it('will proxy a disconnection between two peers', function(done) { + //Had to increase the timeout. The standard two seconds may not be long enough for a connection to be established. + this.timeout(10000); + var connected = false; + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.name('local'); + var connectionId = null; + + z.pubsub.subscribe('_peer/disconnect', function(topic, data) { + assert.equal(connectionId, data.peer.connectionId); + done(); + }); + + cloud.pubsub.subscribe('_peer/connect', function(topic, data) { + assert.equal(connected, true); + connectionId = data.peer.connectionId; + deleteRequest(cloudPort, connectionId); + }); + z.pubsub.subscribe('_peer/connect', function(topic, data) { + connected = true; + }); + + z.silent(); + z.link(cloudUrl); + z.listen(0); + + + }); + + it('will disconnect two peers', function(done) { + this.timeout(10000); + var connected = false; + var localPort = null; + var z = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + z.name('local'); + + var connectionId = null; + + z.pubsub.subscribe('_peer/disconnect', function(topic, data) { + assert.equal(connectionId, data.peer.connectionId); + done(); + }); + + cloud.pubsub.subscribe('_peer/connect', function(topic, data) { + assert.equal(connected, true); + connectionId = data.peer.connectionId; + deleteRequest(localPort, connectionId); + }); + + z.pubsub.subscribe('_peer/connect', function(topic, data) { + connected = true; + }); + + z.silent(); + z.link(cloudUrl); + z.listen(0, function(err) { + if(err) { + done(err); + } + + localPort = z.httpServer.server.address().port; + }); + + }); + }); + + describe('/peer-management update API', function() { + var cloud = null; + var localOne = null; + var cloudPort = null; + var localOnePort = null; + var connectionId = null; + + + beforeEach(function(done) { + this.timeout(10000); + cloud = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + cloud.name('cloud'); + cloud.silent(); + + localOne = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); + localOne.name('localOne'); + localOne.silent(); + + cloud.pubsub.subscribe('_peer/connect', function(topic, data) { + connectionId = data.peer.connectionId; + done(); + }); + + cloud.listen(0, function(err) { + if(err) { + return done(err); + } + + cloudPort = cloud.httpServer.server.address().port; + var cloudUrl = 'http://0.0.0.0:' + cloudPort; + + localOne.link(cloudUrl); + localOne.listen(0, function(err) { + if(err) { + done(err); + } + + localPort = localOne.httpServer.server.address().port; + }); + }); + }); + + afterEach(function(done) { + cloud.httpServer.server.close(); + localOne.httpServer.server.close(); + done(); + }); + + it('will proxy a connection update between two peers', function(done) { + this.timeout(10000); + var localTwoPort = null; + var localTwo = zetta({ registry: new MemRegistry(), peerRegsitry: new MemPeerRegistry() }); + localTwo.name('localTwo'); + localTwo.silent(); + + var url = 'http://0.0.0.0:'; + + cloud.pubsub.subscribe('_peer/disconnect', function(topic, data) { + assert.equal(connectionId, data.peer.connectionId); + }); + + localTwo.pubsub.subscribe('_peer/connect', function(topic, data) { + done(); + }); + + localTwo.listen(0, function(err) { + if(err) { + done(err); + } + + localTwoPort = localTwo.httpServer.server.address().port; + var serverUrl = url + localTwoPort; + putRequest(cloudPort, connectionId, serverUrl); + }); + }); + + it('will update a connection between two peers', function(done) { + this.timeout(10000); + var localTwoPort = null; + var localTwo = zetta({ registry: new MemRegistry(), peerRegsitry: new MemPeerRegistry() }); + localTwo.name('localTwo'); + localTwo.silent(); + + var url = 'http://0.0.0.0:'; + + cloud.pubsub.subscribe('_peer/disconnect', function(topic, data) { + assert.equal(connectionId, data.peer.connectionId); + }); + + localTwo.pubsub.subscribe('_peer/connect', function(topic, data) { + done(); + }); + + localTwo.listen(0, function(err) { + if(err) { + done(err); + } + + localTwoPort = localTwo.httpServer.server.address().port; + var serverUrl = url + localTwoPort; + putRequest(localPort, connectionId, serverUrl); + }); + }); + }); +}); From 61c542251c7fc57d82e4ad8a33a51918195095b0 Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Wed, 28 Jan 2015 14:06:50 -0500 Subject: [PATCH 25/30] Fixing inconsistencies between peer establishment using javascript and API. --- lib/api_resources/peer_management.js | 35 +++++++++++++--------------- zetta.js | 2 +- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 83a4e1c..2ecc85e 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -78,7 +78,7 @@ PeerManagementResource.prototype.link = function(env, next) { PeerManagementResource.prototype.deletePeer = function(env, next) { var id = env.route.query.connectionId; var self = this; - var query = Query.of('peers').ql('where connectionId=@id').params({id: id}); + var query = Query.of('peers').where({connectionId: id}); this.registry.find(query, function(err, results) { if(results.length) { var peer = results[0]; @@ -109,7 +109,7 @@ PeerManagementResource.prototype.updatePeer = function(env, next) { var params = querystring.parse(body.toString()); var id = params.connectionId; var url = params.url; - var query = Query.of('peers').ql('where connectionId=@id').params({id: id}); + var query = Query.of('peers').where({connectionId: id}); self.registry.find(query, function(err, results) { if(results.length) { var peer = results[0]; @@ -195,15 +195,18 @@ PeerManagementResource.prototype._proxyToPeer = function(peer, env) { PeerManagementResource.prototype._connect = function(peer) { var self = this; var client = new PeerClient(peer.url, this.server); + + self.server._peerClients.push(client); + client.on('connecting', function() { + peer.status = 'connecting'; + self.registry.save(peer); + }); client.on('connected', function() { - self.registry.get(peer.id, function(err, result) { - result.status = 'connected'; - result.connectionId = client.connectionId; - self.registry.save(result); - - self.server.pubsub.publish('_peer/connect', { peer: client}); - }); + peer.status = 'connected'; + peer.connectionId = client.connectionId; + self.registry.save(peer); + self.server.pubsub.publish('_peer/connect', { peer: client}); }); client.on('error', function(error) { @@ -218,19 +221,13 @@ PeerManagementResource.prototype._connect = function(peer) { client.on('closed', function() { self.registry.get(peer.id, function(err, result) { - result.status = 'connecting'; - self.server.pubsub.publish('_peer/connect', { peer: client}); - self.registry.save(result, function() { - client.start(); - }); + result.status = 'disconnected'; + self.server.pubsub.publish('_peer/disconnect', { peer: client}); + self.registry.save(result, function() {}); }); }); - peer.status = 'connecting'; - self.registry.add(peer, function() { - client.start(); - }); - // setTimeout to change status to failed if connection not made. + client.start(); }; PeerManagementResource.prototype.show = function(env, next) { diff --git a/zetta.js b/zetta.js index 7e64ef9..344861f 100644 --- a/zetta.js +++ b/zetta.js @@ -382,7 +382,7 @@ Zetta.prototype._initPeers = function(peers, callback) { }); }); - peerClient.on('closed', function(reconnect) { + peerClient.on('closed', function() { self.peerRegistry.get(peer.id, function(err, result) { result.status = 'disconnected'; From 8bcd16b27495ee9adf25276998ff4197fec0d9cd Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Wed, 28 Jan 2015 14:18:14 -0500 Subject: [PATCH 26/30] Fixing typos within tests causing them to fail. --- test/test_peer_connection_api.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_peer_connection_api.js b/test/test_peer_connection_api.js index 44c823e..5832235 100644 --- a/test/test_peer_connection_api.js +++ b/test/test_peer_connection_api.js @@ -290,7 +290,7 @@ describe('Peer Connection API', function() { it('will update a connection between two peers', function(done) { this.timeout(10000); var localTwoPort = null; - var localTwo = zetta({ registry: new MemRegistry(), peerRegsitry: new MemPeerRegistry() }); + var localTwo = zetta({ registry: new MemRegistry(), peerRegistry: new MemPeerRegistry() }); localTwo.name('localTwo'); localTwo.silent(); From 40782a89a18e9acbd45bc90d811b1078168d739d Mon Sep 17 00:00:00 2001 From: Matthew Dobson Date: Thu, 29 Jan 2015 10:02:51 -0500 Subject: [PATCH 27/30] Updates to tests for path change, changing resource path to include connectionId, use ._runPeer on zetta to start a PeerClient. --- lib/api_resources/peer_management.js | 64 ++++--------------- test/test_peer_connection_api.js | 20 ++---- zetta.js | 96 +++++++++++++++------------- 3 files changed, 67 insertions(+), 113 deletions(-) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 2ecc85e..2769a8f 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -18,8 +18,8 @@ PeerManagementResource.prototype.init = function(config) { .get('/', this.list) .post('/', this.link) .get('/{id}', this.show) - .del('/', this.deletePeer) - .put('/', this.updatePeer); + .del('/{id}', this.deletePeer) + .put('/{id}', this.updatePeer); }; PeerManagementResource.prototype.list = function(env, next) { @@ -62,7 +62,7 @@ PeerManagementResource.prototype.link = function(env, next) { return next(env); } - self._connect(peer); + self.server._runPeer(peer); env.response.statusCode = 202; env.response.setHeader('Location', env.helpers.url.join(peer.id)); @@ -76,7 +76,7 @@ PeerManagementResource.prototype.link = function(env, next) { //From acceptor we'll add additonal info to the request, and proxy to initiator //From intiator we'll perform the disconnection PeerManagementResource.prototype.deletePeer = function(env, next) { - var id = env.route.query.connectionId; + var id = env.route.params.id; var self = this; var query = Query.of('peers').where({connectionId: id}); this.registry.find(query, function(err, results) { @@ -107,7 +107,7 @@ PeerManagementResource.prototype.updatePeer = function(env, next) { var self = this; env.request.getBody(function(err, body) { var params = querystring.parse(body.toString()); - var id = params.connectionId; + var id = env.route.params.id; var url = params.url; var query = Query.of('peers').where({connectionId: id}); self.registry.find(query, function(err, results) { @@ -130,7 +130,7 @@ PeerManagementResource.prototype.updatePeer = function(env, next) { return next(env); } - self._connect(newPeer); + self.server._runPeer(newPeer); self._disconnect(peer); env.response.statusCode = 200; next(env); @@ -192,44 +192,6 @@ PeerManagementResource.prototype._proxyToPeer = function(peer, env) { } }; -PeerManagementResource.prototype._connect = function(peer) { - var self = this; - var client = new PeerClient(peer.url, this.server); - - self.server._peerClients.push(client); - client.on('connecting', function() { - peer.status = 'connecting'; - self.registry.save(peer); - }); - - client.on('connected', function() { - peer.status = 'connected'; - peer.connectionId = client.connectionId; - self.registry.save(peer); - self.server.pubsub.publish('_peer/connect', { peer: client}); - }); - - client.on('error', function(error) { - self.registry.get(peer.id, function(err, result) { - result.status = 'failed'; - result.error = error; - self.registry.save(result); - self.pubsub.publish('_peer/disconnect', { peer: client }); - - }); - }); - - client.on('closed', function() { - self.registry.get(peer.id, function(err, result) { - result.status = 'disconnected'; - self.server.pubsub.publish('_peer/disconnect', { peer: client}); - self.registry.save(result, function() {}); - }); - }); - - client.start(); -}; - PeerManagementResource.prototype.show = function(env, next) { var id = env.route.params.id; this.registry.get(id, function(err, result) { @@ -284,13 +246,12 @@ PeerManagementBuilder.prototype.entities = function() { entity.actions.push({ name: 'disconnect', method: 'DELETE', - href: self.urlHelper.path('/peer-management'), - fields:[{name:'connectionId', type:'text', value: peer.connectionId}] + href: self.urlHelper.path('/peer-management/'+peer.connectionId) },{ name: 'update', method: 'PUT', - href: self.urlHelper.path('/peer-management'), - fields: [{name: 'url', type: 'url'}, {name:'connectionId', type:'text', value: peer.connectionId}] + href: self.urlHelper.path('/peer-management/'+peer.connectionId), + fields: [{name: 'url', type: 'url'}] }); if (peer.direction === 'initiator') { @@ -368,13 +329,12 @@ PeerItemBuilder.prototype.actions = function() { this.base.actions.push({ name: 'disconnect', method: 'DELETE', - href: this.urlHelper.path('/peer-management'), - fields:[{name:'connectionId', type:'text', value: this.data.connectionId}] + href: this.urlHelper.path('/peer-management/'+this.data.connectionId), },{ name: 'update', method: 'PUT', - href: this.urlHelper.path('/peer-management'), - fields: [{name: 'url', type: 'url'}, {name:'connectionId', type:'text', value: this.data.connectionId}] + href: this.urlHelper.path('/peer-management/'+this.data.connectionId), + fields: [{name: 'url', type: 'url'}] }); if (this.data.direction === 'initiator') { diff --git a/test/test_peer_connection_api.js b/test/test_peer_connection_api.js index 5832235..6ddc944 100644 --- a/test/test_peer_connection_api.js +++ b/test/test_peer_connection_api.js @@ -9,12 +9,11 @@ var Query = require('calypso').Query; var querystring = require('querystring'); function deleteRequest(port, connectionId) { - var string = '?connectionId='+connectionId; var opts = { host: '0.0.0.0', port: port, method: 'DELETE', - path: '/peer-management' + string + path: '/peer-management/' + connectionId } var req = http.request(opts); @@ -23,7 +22,6 @@ function deleteRequest(port, connectionId) { function putRequest(port, connectionId, url) { var qs = { - connectionId: connectionId, url: url }; var string = querystring.stringify(qs); @@ -31,7 +29,7 @@ function putRequest(port, connectionId, url) { host: '0.0.0.0', port: port, method: 'PUT', - path: '/peer-management', + path: '/peer-management/' + connectionId, headers: { 'Content-Length': string.length } @@ -85,11 +83,7 @@ describe('Peer Connection API', function() { assert.equal(body.entities.length, 1); assert.equal(body.entities[0].actions.length, 2); body.entities[0].actions.forEach(function(action) { - action.fields.forEach(function(field) { - if(field.name === 'connectionId') { - assert.equal(field.value, '12345'); - } - }); + assert.ok(action.href.indexOf('/peer-management/12345') !== -1); }) })) .end(done); @@ -104,12 +98,8 @@ describe('Peer Connection API', function() { .expect(getBody(function(res, body) { assert.equal(body.actions.length, 2); body.actions.forEach(function(action) { - action.fields.forEach(function(field) { - if(field.name === 'connectionId') { - assert.equal(field.value, '12345'); - } - }); - }) + assert.ok(action.href.indexOf('/peer-management/12345') !== -1); + }); })) .end(done); }); diff --git a/zetta.js b/zetta.js index 344861f..a1edcc7 100644 --- a/zetta.js +++ b/zetta.js @@ -330,7 +330,7 @@ Zetta.prototype._initPeers = function(peers, callback) { if (existing) { if(!obj.fromLink || peers.indexOf(obj.url) > -1) { self.peerRegistry.save(obj, function() { - runPeer(obj); + self._runPeer(obj); }); } else { //Delete @@ -347,58 +347,62 @@ Zetta.prototype._initPeers = function(peers, callback) { fromLink:true }; self.peerRegistry.add(peerData, function(err, newPeer) { - runPeer(newPeer); + self._runPeer(newPeer); }); } - function runPeer(peer) { - var peerClient = new PeerClient(peer.url, self); - self._peerClients.push(peerClient); - - // when websocket is established - peerClient.on('connecting', function() { - peer.status = 'connecting'; - self.peerRegistry.save(peer); - }); - - // when peer handshake is made - peerClient.on('connected', function() { - peer.status = 'connected'; - peer.connectionId = peerClient.connectionId; - self.peerRegistry.save(peer); - - // peer-event - self.pubsub.publish('_peer/connect', { peer: peerClient}); - }); - - peerClient.on('error', function(error) { - self.peerRegistry.get(peer.id, function(err, result) { - result.status = 'failed'; - result.error = error; - self.peerRegistry.save(result); - - // peer-event - self.pubsub.publish('_peer/disconnect', { peer: peerClient }); - }); - }); - - peerClient.on('closed', function() { - self.peerRegistry.get(peer.id, function(err, result) { - result.status = 'disconnected'; - - // peer-event - self.pubsub.publish('_peer/disconnect', { peer: peerClient }); - self.peerRegistry.save(result, function() { }); - }); - }); - - peerClient.start(); - } + }); - + // end after db read callback(); }); return this; }; + +Zetta.prototype._runPeer = function(peer) { + var self = this; + var peerClient = new PeerClient(peer.url, self); + self._peerClients.push(peerClient); + + // when websocket is established + peerClient.on('connecting', function() { + peer.status = 'connecting'; + self.peerRegistry.save(peer); + }); + + // when peer handshake is made + peerClient.on('connected', function() { + peer.status = 'connected'; + peer.connectionId = peerClient.connectionId; + self.peerRegistry.save(peer); + + // peer-event + self.pubsub.publish('_peer/connect', { peer: peerClient}); + }); + + peerClient.on('error', function(error) { + self.peerRegistry.get(peer.id, function(err, result) { + result.status = 'failed'; + result.error = error; + self.peerRegistry.save(result); + + // peer-event + self.pubsub.publish('_peer/disconnect', { peer: peerClient }); + }); + }); + + peerClient.on('closed', function() { + self.peerRegistry.get(peer.id, function(err, result) { + result.status = 'disconnected'; + + // peer-event + self.pubsub.publish('_peer/disconnect', { peer: peerClient }); + self.peerRegistry.save(result, function() { }); + }); + }); + + peerClient.start(); +} + From 148e12b51e36382208e9bceaa84ffd899bc447c8 Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Thu, 29 Jan 2015 10:20:50 -0500 Subject: [PATCH 28/30] Added failing tests for peer api when connection id does not exist --- test/test_peer_connection_api.js | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/test/test_peer_connection_api.js b/test/test_peer_connection_api.js index 6ddc944..33cf9cf 100644 --- a/test/test_peer_connection_api.js +++ b/test/test_peer_connection_api.js @@ -135,6 +135,13 @@ describe('Peer Connection API', function() { done(); }); + it('will return 404 if connection does not exist', function(done) { + var url = '/peer-management/1234'; + request(getHttpServer(cloud)) + .del(url) + .expect(404, done); + }); + it('will proxy a disconnection between two peers', function(done) { //Had to increase the timeout. The standard two seconds may not be long enough for a connection to be established. this.timeout(10000); @@ -249,6 +256,13 @@ describe('Peer Connection API', function() { done(); }); + it('will return 404 if connection does not exist', function(done) { + var url = '/peer-management/1234'; + request(getHttpServer(cloud)) + .put(url) + .expect(404, done); + }); + it('will proxy a connection update between two peers', function(done) { this.timeout(10000); var localTwoPort = null; From 5f608f0a8ea0037068c6843ca8ed9142a1ac366f Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Thu, 29 Jan 2015 11:04:17 -0500 Subject: [PATCH 29/30] Fixed test content type, peer_managment handles 404 --- lib/api_resources/peer_management.js | 5 +++++ test/test_peer_connection_api.js | 2 ++ 2 files changed, 7 insertions(+) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 2769a8f..7f5b8aa 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -170,6 +170,11 @@ PeerManagementResource.prototype._proxyDisconnect = function(env, next, id) { } }); + if (!peerSocket) { + env.response.statusCode = 404; + return next(env); + } + peerSocket.on('end', function() { env.response.statusCode = 200; next(env); diff --git a/test/test_peer_connection_api.js b/test/test_peer_connection_api.js index 33cf9cf..5e703eb 100644 --- a/test/test_peer_connection_api.js +++ b/test/test_peer_connection_api.js @@ -260,6 +260,8 @@ describe('Peer Connection API', function() { var url = '/peer-management/1234'; request(getHttpServer(cloud)) .put(url) + .set('Content-Type', 'application/x-www-form-urlencoded') + .send({ url: 'http://0.0.0.0:1234' }) .expect(404, done); }); From de46638a736426ba0c59359cde3f65600567972e Mon Sep 17 00:00:00 2001 From: Adam Magaluk Date: Thu, 29 Jan 2015 11:48:47 -0500 Subject: [PATCH 30/30] Commented out the reconnect action on the initiator side --- lib/api_resources/peer_management.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/api_resources/peer_management.js b/lib/api_resources/peer_management.js index 7f5b8aa..234d9f4 100644 --- a/lib/api_resources/peer_management.js +++ b/lib/api_resources/peer_management.js @@ -259,6 +259,7 @@ PeerManagementBuilder.prototype.entities = function() { fields: [{name: 'url', type: 'url'}] }); + /* API action does not work wait till we fix it if (peer.direction === 'initiator') { entity.actions.push({ name: 'reconnect', @@ -267,6 +268,7 @@ PeerManagementBuilder.prototype.entities = function() { fields: [{ name: 'url', type: 'url', value: peerUrl }] }); } + */ var peerUrlRel = peer.direction === 'initiator' ? rels.root : rels.server; entity.links = [