Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic handshake negotiation #341

Merged
merged 8 commits into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ function Agent(backend, stream) {
this.stream = stream;

this.clientId = hat();
// src is a client-configurable "id" which the client will set in its handshake,
// and attach to its ops. This should take precedence over clientId if set.
// Only legacy clients, or new clients connecting for the first time will use the
// Agent-provided clientId. Ideally we'll deprecate clientId in favour of src
// in the next breaking change.
this.src = null;
this.connectTime = Date.now();

// We need to track which documents are subscribed by the client. This is a
Expand All @@ -38,20 +44,15 @@ function Agent(backend, stream) {
// active, and it is passed to each middleware call
this.custom = {};

// Initialize the remote client by sending it its agent Id.
this.send({
a: 'init',
protocol: 1,
id: this.clientId,
type: types.defaultType.uri
});
// Send the legacy message to initialize old clients with the random agent Id
this.send(this._initMessage('init'));
}
module.exports = Agent;

// Close the agent with the client.
Agent.prototype.close = function(err) {
if (err) {
logger.warn('Agent closed due to error', this.clientId, err.stack || err);
logger.warn('Agent closed due to error', this._src(), err.stack || err);
}
if (this.closed) return;
// This will end the writable stream and emit 'finish'
Expand Down Expand Up @@ -190,7 +191,7 @@ Agent.prototype._isOwnOp = function(collection, op) {
// Detect ops from this client on the same projection. Since the client sent
// these in, the submit reply will be sufficient and we can silently ignore
// them in the streams for subscribed documents or queries
return (this.clientId === op.src) && (collection === (op.i || op.c));
return (this._src() === op.src) && (collection === (op.i || op.c));
};

Agent.prototype.send = function(message) {
Expand Down Expand Up @@ -332,6 +333,9 @@ Agent.prototype._handleMessage = function(request, callback) {
if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage));

switch (request.a) {
case 'hs':
if (request.id) this.src = request.id;
return callback(null, this._initMessage('hs'));
case 'qf':
return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback);
case 'qs':
Expand All @@ -352,7 +356,13 @@ Agent.prototype._handleMessage = function(request, callback) {
return this._unsubscribe(request.c, request.d, callback);
case 'op':
// Normalize the properties submitted
var op = createClientOp(request, this.clientId);
var op = createClientOp(request, this._src());
if (op.seq >= util.MAX_SAFE_INTEGER) {
return callback(new ShareDBError(
ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW,
'Connection seq has exceeded the max safe integer, maybe from being open for too long'
));
}
if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message'));
return this._submit(request.c, request.d, op, callback);
case 'nf':
Expand Down Expand Up @@ -645,6 +655,20 @@ Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp,
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
};

Agent.prototype._initMessage = function(action) {
return {
a: action,
protocol: 1,
protocolMinor: 1,
id: this._src(),
type: types.defaultType.uri
};
};

Agent.prototype._src = function() {
return this.src || this.clientId;
};


function createClientOp(request, clientId) {
// src can be provided if it is not the same as the current agent,
Expand Down
9 changes: 8 additions & 1 deletion lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Backend.prototype.close = function(callback) {
finish();
};

Backend.prototype.connect = function(connection, req) {
Backend.prototype.connect = function(connection, req, callback) {
var socket = new StreamSocket();
if (connection) {
connection.bindToSocket(socket);
Expand All @@ -113,6 +113,13 @@ Backend.prototype.connect = function(connection, req) {
// not used internal to ShareDB, but it is handy for server-side only user
// code that may cache state on the agent and read it in middleware
connection.agent = agent;

if (typeof callback === 'function') {
connection.once('connected', function() {
callback(connection);
});
}

return connection;
};

Expand Down
71 changes: 45 additions & 26 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ function Connection(socket) {
// A unique message number for the given id
this.seq = 1;

// Equals agent.clientId on the server
// Equals agent.src on the server
this.id = null;

// This direct reference from connection to agent is not used internal to
Expand Down Expand Up @@ -141,6 +141,7 @@ Connection.prototype.bindToSocket = function(socket) {

socket.onopen = function() {
connection._setState('connecting');
connection._initializeHandshake();
};

socket.onerror = function(err) {
Expand Down Expand Up @@ -183,29 +184,9 @@ Connection.prototype.handleMessage = function(message) {
switch (message.a) {
case 'init':
// Client initialization packet
if (message.protocol !== 1) {
err = new ShareDBError(
ERROR_CODE.ERR_PROTOCOL_VERSION_NOT_SUPPORTED,
'Unsupported protocol version: ' + message.protocol
);
return this.emit('error', err);
}
if (types.map[message.type] !== types.defaultType) {
err = new ShareDBError(
ERROR_CODE.ERR_DEFAULT_TYPE_MISMATCH,
message.type + ' does not match the server default type'
);
return this.emit('error', err);
}
if (typeof message.id !== 'string') {
err = new ShareDBError(ERROR_CODE.ERR_CLIENT_ID_BADLY_FORMED, 'Client id must be a string');
return this.emit('error', err);
}
this.id = message.id;

this._setState('connected');
return;

return this._handleLegacyInit(message);
case 'hs':
return this._handleHandshake(err, message);
case 'qf':
var query = this.queries[message.id];
if (query) query._handleFetch(err, message.data, message.extra);
Expand Down Expand Up @@ -305,8 +286,6 @@ Connection.prototype._handleBulkMessage = function(err, message, method) {
};

Connection.prototype._reset = function() {
this.seq = 1;
this.id = null;
alecgibson marked this conversation as resolved.
Show resolved Hide resolved
this.agent = null;
};

Expand Down Expand Up @@ -714,3 +693,43 @@ Connection.prototype._handleSnapshotFetch = function(error, message) {
delete this._snapshotRequests[message.id];
snapshotRequest._handleResponse(error, message);
};

Connection.prototype._handleLegacyInit = function(message) {
alecgibson marked this conversation as resolved.
Show resolved Hide resolved
// If the minor protocol version has been set, we can ignore this legacy
// init message, and wait for a response to our handshake message.
if (message.protocolMinor) return;
this._initialize(message);
};

Connection.prototype._initializeHandshake = function() {
this.send({a: 'hs', id: this.id});
};

Connection.prototype._handleHandshake = function(error, message) {
if (error) return this.emit('error', error);
this._initialize(message);
};

Connection.prototype._initialize = function(message) {
if (message.protocol !== 1) {
return this.emit('error', new ShareDBError(
ERROR_CODE.ERR_PROTOCOL_VERSION_NOT_SUPPORTED,
'Unsupported protocol version: ' + message.protocol
));
}
if (types.map[message.type] !== types.defaultType) {
return this.emit('error', new ShareDBError(
ERROR_CODE.ERR_DEFAULT_TYPE_MISMATCH,
message.type + ' does not match the server default type'
));
}
if (typeof message.id !== 'string') {
return this.emit('error', new ShareDBError(
ERROR_CODE.ERR_CLIENT_ID_BADLY_FORMED,
'Client id must be a string'
));
}
this.id = message.id;

this._setState('connected');
};
15 changes: 12 additions & 3 deletions lib/client/doc.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var emitter = require('../emitter');
var logger = require('../logger');
var ShareDBError = require('../error');
var types = require('../types');
var util = require('../util');

var ERROR_CODE = ShareDBError.CODES;

Expand Down Expand Up @@ -615,9 +616,8 @@ Doc.prototype._otApply = function(op, source) {

// Actually send op to the server.
Doc.prototype._sendOp = function() {
// Wait until we have a src id from the server
if (!this.connection.canSend) return;
var src = this.connection.id;
if (!src) return;

// When there is no inflightOp, send the first item in pendingOps. If
// there is inflightOp, try sending it again
Expand All @@ -642,7 +642,16 @@ Doc.prototype._sendOp = function() {
// reconnect, since an op may still be pending after the reconnection and
// this.connection.id will change. In case an op is sent multiple times, we
// also need to be careful not to override the original seq value.
if (op.seq == null) op.seq = this.connection.seq++;
if (op.seq == null) {
if (this.connection.seq >= util.MAX_SAFE_INTEGER) {
return this.emit('error', new ShareDBError(
ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW,
'Connection seq has exceeded the max safe integer, maybe from being open for too long'
));
}

op.seq = this.connection.seq++;
}

this.connection.sendOp(this, op);

Expand Down
1 change: 1 addition & 0 deletions lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ShareDBError.CODES = {
ERR_APPLY_OP_VERSION_DOES_NOT_MATCH_SNAPSHOT: 'ERR_APPLY_OP_VERSION_DOES_NOT_MATCH_SNAPSHOT',
ERR_APPLY_SNAPSHOT_NOT_PROVIDED: 'ERR_APPLY_SNAPSHOT_NOT_PROVIDED',
ERR_CLIENT_ID_BADLY_FORMED: 'ERR_CLIENT_ID_BADLY_FORMED',
ERR_CONNECTION_SEQ_INTEGER_OVERFLOW: 'ERR_CONNECTION_SEQ_INTEGER_OVERFLOW',
ERR_CONNECTION_STATE_TRANSITION_INVALID: 'ERR_CONNECTION_STATE_TRANSITION_INVALID',
ERR_DATABASE_ADAPTER_NOT_FOUND: 'ERR_DATABASE_ADAPTER_NOT_FOUND',
ERR_DATABASE_DOES_NOT_SUPPORT_SUBSCRIBE: 'ERR_DATABASE_DOES_NOT_SUPPORT_SUBSCRIBE',
Expand Down
2 changes: 2 additions & 0 deletions lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ exports.isValidVersion = function(version) {
exports.isValidTimestamp = function(timestamp) {
return exports.isValidVersion(timestamp);
};

exports.MAX_SAFE_INTEGER = 9007199254740991;
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"lolex": "^5.1.1",
"mocha": "^6.2.2",
"nyc": "^14.1.1",
"sharedb-legacy": "npm:sharedb@=1.1.0",
"sinon": "^7.5.0"
},
"scripts": {
Expand Down
64 changes: 64 additions & 0 deletions test/client/connection.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
var expect = require('chai').expect;
var Backend = require('../../lib/backend');
var Connection = require('../../lib/client/connection');
var LegacyConnection = require('sharedb-legacy/lib/client').Connection;
var StreamSocket = require('../../lib/stream-socket');

describe('client connection', function() {
beforeEach(function() {
Expand Down Expand Up @@ -197,4 +199,66 @@ describe('client connection', function() {
});
});
});

it('persists its id and seq when reconnecting', function(done) {
var backend = this.backend;
backend.connect(null, null, function(connection) {
var id = connection.id;
expect(id).to.be.ok;
var doc = connection.get('test', '123');
doc.create({foo: 'bar'}, function(error) {
if (error) return done(error);
expect(connection.seq).to.equal(2);
connection.close();
backend.connect(connection, null, function() {
expect(connection.id).to.equal(id);
expect(connection.seq).to.equal(2);
done();
});
});
});
});

it('still connects to legacy clients, whose ID changes on reconnection', function(done) {
var currentBackend = this.backend;
var socket = new StreamSocket();
var legacyClient = new LegacyConnection(socket);
currentBackend.connect(legacyClient);

var doc = legacyClient.get('test', '123');
doc.create({foo: 'bar'}, function(error) {
if (error) return done(error);
var initialId = legacyClient.id;
expect(initialId).to.equal(legacyClient.agent.clientId);
expect(legacyClient.agent.src).to.be.null;
legacyClient.close();
currentBackend.connect(legacyClient);
doc.submitOp({p: ['baz'], oi: 'qux'}, function(error) {
if (error) return done(error);
var newId = legacyClient.id;
expect(newId).not.to.equal(initialId);
expect(newId).to.equal(legacyClient.agent.clientId);
expect(legacyClient.agent.src).to.be.null;
done();
});
});
});

it('errors when submitting an op with a very large seq', function(done) {
this.backend.connect(null, null, function(connection) {
var doc = connection.get('test', '123');
doc.create({foo: 'bar'}, function(error) {
if (error) return done(error);
connection.sendOp(doc, {
op: {p: ['foo'], od: 'bar'},
src: connection.id,
seq: Number.MAX_SAFE_INTEGER
});
doc.once('error', function(error) {
expect(error.code).to.equal('ERR_CONNECTION_SEQ_INTEGER_OVERFLOW');
done();
});
});
});
});
});
11 changes: 11 additions & 0 deletions test/client/doc.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ describe('Doc', function() {
doc.destroy();
});

it('errors when trying to set a very large seq', function(done) {
var connection = this.connection;
connection.seq = Number.MAX_SAFE_INTEGER;
var doc = connection.get('dogs', 'fido');
doc.create({name: 'fido'});
doc.once('error', function(error) {
expect(error.code).to.equal('ERR_CONNECTION_SEQ_INTEGER_OVERFLOW');
done();
});
});

describe('when connection closed', function() {
beforeEach(function(done) {
this.op1 = [{p: ['snacks'], oi: true}];
Expand Down
Loading