diff --git a/lib/agent.js b/lib/agent.js index cd011e2ca..466d2516e 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -20,6 +20,12 @@ function Agent(backend, stream) { this.stream = stream; this.clientId = hat(); + // src is a client-configurable "id" which the client will set in its handshake, + // and attach to its ops. This should take precedence over clientId if set. + // Only legacy clients, or new clients connecting for the first time will use the + // Agent-provided clientId. Ideally we'll deprecate clientId in favour of src + // in the next breaking change. + this.src = null; this.connectTime = Date.now(); // We need to track which documents are subscribed by the client. This is a @@ -38,20 +44,15 @@ function Agent(backend, stream) { // active, and it is passed to each middleware call this.custom = {}; - // Initialize the remote client by sending it its agent Id. - this.send({ - a: 'init', - protocol: 1, - id: this.clientId, - type: types.defaultType.uri - }); + // Send the legacy message to initialize old clients with the random agent Id + this.send(this._initMessage('init')); } module.exports = Agent; // Close the agent with the client. Agent.prototype.close = function(err) { if (err) { - logger.warn('Agent closed due to error', this.clientId, err.stack || err); + logger.warn('Agent closed due to error', this._src(), err.stack || err); } if (this.closed) return; // This will end the writable stream and emit 'finish' @@ -190,7 +191,7 @@ Agent.prototype._isOwnOp = function(collection, op) { // Detect ops from this client on the same projection. Since the client sent // these in, the submit reply will be sufficient and we can silently ignore // them in the streams for subscribed documents or queries - return (this.clientId === op.src) && (collection === (op.i || op.c)); + return (this._src() === op.src) && (collection === (op.i || op.c)); }; Agent.prototype.send = function(message) { @@ -332,6 +333,9 @@ Agent.prototype._handleMessage = function(request, callback) { if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage)); switch (request.a) { + case 'hs': + if (request.id) this.src = request.id; + return callback(null, this._initMessage('hs')); case 'qf': return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback); case 'qs': @@ -352,7 +356,13 @@ Agent.prototype._handleMessage = function(request, callback) { return this._unsubscribe(request.c, request.d, callback); case 'op': // Normalize the properties submitted - var op = createClientOp(request, this.clientId); + var op = createClientOp(request, this._src()); + if (op.seq >= util.MAX_SAFE_INTEGER) { + return callback(new ShareDBError( + ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW, + 'Connection seq has exceeded the max safe integer, maybe from being open for too long' + )); + } if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); return this._submit(request.c, request.d, op, callback); case 'nf': @@ -645,6 +655,20 @@ Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp, this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback); }; +Agent.prototype._initMessage = function(action) { + return { + a: action, + protocol: 1, + protocolMinor: 1, + id: this._src(), + type: types.defaultType.uri + }; +}; + +Agent.prototype._src = function() { + return this.src || this.clientId; +}; + function createClientOp(request, clientId) { // src can be provided if it is not the same as the current agent, diff --git a/lib/backend.js b/lib/backend.js index 7768f7d38..f1178d730 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -100,7 +100,7 @@ Backend.prototype.close = function(callback) { finish(); }; -Backend.prototype.connect = function(connection, req) { +Backend.prototype.connect = function(connection, req, callback) { var socket = new StreamSocket(); if (connection) { connection.bindToSocket(socket); @@ -113,6 +113,13 @@ Backend.prototype.connect = function(connection, req) { // not used internal to ShareDB, but it is handy for server-side only user // code that may cache state on the agent and read it in middleware connection.agent = agent; + + if (typeof callback === 'function') { + connection.once('connected', function() { + callback(connection); + }); + } + return connection; }; diff --git a/lib/client/connection.js b/lib/client/connection.js index ba5a8dc9b..6fe67b7c9 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -52,7 +52,7 @@ function Connection(socket) { // A unique message number for the given id this.seq = 1; - // Equals agent.clientId on the server + // Equals agent.src on the server this.id = null; // This direct reference from connection to agent is not used internal to @@ -141,6 +141,7 @@ Connection.prototype.bindToSocket = function(socket) { socket.onopen = function() { connection._setState('connecting'); + connection._initializeHandshake(); }; socket.onerror = function(err) { @@ -183,29 +184,9 @@ Connection.prototype.handleMessage = function(message) { switch (message.a) { case 'init': // Client initialization packet - if (message.protocol !== 1) { - err = new ShareDBError( - ERROR_CODE.ERR_PROTOCOL_VERSION_NOT_SUPPORTED, - 'Unsupported protocol version: ' + message.protocol - ); - return this.emit('error', err); - } - if (types.map[message.type] !== types.defaultType) { - err = new ShareDBError( - ERROR_CODE.ERR_DEFAULT_TYPE_MISMATCH, - message.type + ' does not match the server default type' - ); - return this.emit('error', err); - } - if (typeof message.id !== 'string') { - err = new ShareDBError(ERROR_CODE.ERR_CLIENT_ID_BADLY_FORMED, 'Client id must be a string'); - return this.emit('error', err); - } - this.id = message.id; - - this._setState('connected'); - return; - + return this._handleLegacyInit(message); + case 'hs': + return this._handleHandshake(err, message); case 'qf': var query = this.queries[message.id]; if (query) query._handleFetch(err, message.data, message.extra); @@ -305,8 +286,6 @@ Connection.prototype._handleBulkMessage = function(err, message, method) { }; Connection.prototype._reset = function() { - this.seq = 1; - this.id = null; this.agent = null; }; @@ -714,3 +693,43 @@ Connection.prototype._handleSnapshotFetch = function(error, message) { delete this._snapshotRequests[message.id]; snapshotRequest._handleResponse(error, message); }; + +Connection.prototype._handleLegacyInit = function(message) { + // If the minor protocol version has been set, we can ignore this legacy + // init message, and wait for a response to our handshake message. + if (message.protocolMinor) return; + this._initialize(message); +}; + +Connection.prototype._initializeHandshake = function() { + this.send({a: 'hs', id: this.id}); +}; + +Connection.prototype._handleHandshake = function(error, message) { + if (error) return this.emit('error', error); + this._initialize(message); +}; + +Connection.prototype._initialize = function(message) { + if (message.protocol !== 1) { + return this.emit('error', new ShareDBError( + ERROR_CODE.ERR_PROTOCOL_VERSION_NOT_SUPPORTED, + 'Unsupported protocol version: ' + message.protocol + )); + } + if (types.map[message.type] !== types.defaultType) { + return this.emit('error', new ShareDBError( + ERROR_CODE.ERR_DEFAULT_TYPE_MISMATCH, + message.type + ' does not match the server default type' + )); + } + if (typeof message.id !== 'string') { + return this.emit('error', new ShareDBError( + ERROR_CODE.ERR_CLIENT_ID_BADLY_FORMED, + 'Client id must be a string' + )); + } + this.id = message.id; + + this._setState('connected'); +}; diff --git a/lib/client/doc.js b/lib/client/doc.js index 3a4b36602..da7a7a050 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -2,6 +2,7 @@ var emitter = require('../emitter'); var logger = require('../logger'); var ShareDBError = require('../error'); var types = require('../types'); +var util = require('../util'); var ERROR_CODE = ShareDBError.CODES; @@ -615,9 +616,8 @@ Doc.prototype._otApply = function(op, source) { // Actually send op to the server. Doc.prototype._sendOp = function() { - // Wait until we have a src id from the server + if (!this.connection.canSend) return; var src = this.connection.id; - if (!src) return; // When there is no inflightOp, send the first item in pendingOps. If // there is inflightOp, try sending it again @@ -642,7 +642,16 @@ Doc.prototype._sendOp = function() { // reconnect, since an op may still be pending after the reconnection and // this.connection.id will change. In case an op is sent multiple times, we // also need to be careful not to override the original seq value. - if (op.seq == null) op.seq = this.connection.seq++; + if (op.seq == null) { + if (this.connection.seq >= util.MAX_SAFE_INTEGER) { + return this.emit('error', new ShareDBError( + ERROR_CODE.ERR_CONNECTION_SEQ_INTEGER_OVERFLOW, + 'Connection seq has exceeded the max safe integer, maybe from being open for too long' + )); + } + + op.seq = this.connection.seq++; + } this.connection.sendOp(this, op); diff --git a/lib/error.js b/lib/error.js index b7072bc9a..4cd4f770f 100644 --- a/lib/error.js +++ b/lib/error.js @@ -16,6 +16,7 @@ ShareDBError.CODES = { ERR_APPLY_OP_VERSION_DOES_NOT_MATCH_SNAPSHOT: 'ERR_APPLY_OP_VERSION_DOES_NOT_MATCH_SNAPSHOT', ERR_APPLY_SNAPSHOT_NOT_PROVIDED: 'ERR_APPLY_SNAPSHOT_NOT_PROVIDED', ERR_CLIENT_ID_BADLY_FORMED: 'ERR_CLIENT_ID_BADLY_FORMED', + ERR_CONNECTION_SEQ_INTEGER_OVERFLOW: 'ERR_CONNECTION_SEQ_INTEGER_OVERFLOW', ERR_CONNECTION_STATE_TRANSITION_INVALID: 'ERR_CONNECTION_STATE_TRANSITION_INVALID', ERR_DATABASE_ADAPTER_NOT_FOUND: 'ERR_DATABASE_ADAPTER_NOT_FOUND', ERR_DATABASE_DOES_NOT_SUPPORT_SUBSCRIBE: 'ERR_DATABASE_DOES_NOT_SUPPORT_SUBSCRIBE', diff --git a/lib/util.js b/lib/util.js index 7ad8a23fb..601e98cb8 100644 --- a/lib/util.js +++ b/lib/util.js @@ -22,3 +22,5 @@ exports.isValidVersion = function(version) { exports.isValidTimestamp = function(timestamp) { return exports.isValidVersion(timestamp); }; + +exports.MAX_SAFE_INTEGER = 9007199254740991; diff --git a/package.json b/package.json index d4b1b3e0b..2c7216a18 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "lolex": "^5.1.1", "mocha": "^6.2.2", "nyc": "^14.1.1", + "sharedb-legacy": "npm:sharedb@=1.1.0", "sinon": "^7.5.0" }, "scripts": { diff --git a/test/client/connection.js b/test/client/connection.js index a7a6cd82c..df3de633f 100644 --- a/test/client/connection.js +++ b/test/client/connection.js @@ -1,6 +1,8 @@ var expect = require('chai').expect; var Backend = require('../../lib/backend'); var Connection = require('../../lib/client/connection'); +var LegacyConnection = require('sharedb-legacy/lib/client').Connection; +var StreamSocket = require('../../lib/stream-socket'); describe('client connection', function() { beforeEach(function() { @@ -197,4 +199,66 @@ describe('client connection', function() { }); }); }); + + it('persists its id and seq when reconnecting', function(done) { + var backend = this.backend; + backend.connect(null, null, function(connection) { + var id = connection.id; + expect(id).to.be.ok; + var doc = connection.get('test', '123'); + doc.create({foo: 'bar'}, function(error) { + if (error) return done(error); + expect(connection.seq).to.equal(2); + connection.close(); + backend.connect(connection, null, function() { + expect(connection.id).to.equal(id); + expect(connection.seq).to.equal(2); + done(); + }); + }); + }); + }); + + it('still connects to legacy clients, whose ID changes on reconnection', function(done) { + var currentBackend = this.backend; + var socket = new StreamSocket(); + var legacyClient = new LegacyConnection(socket); + currentBackend.connect(legacyClient); + + var doc = legacyClient.get('test', '123'); + doc.create({foo: 'bar'}, function(error) { + if (error) return done(error); + var initialId = legacyClient.id; + expect(initialId).to.equal(legacyClient.agent.clientId); + expect(legacyClient.agent.src).to.be.null; + legacyClient.close(); + currentBackend.connect(legacyClient); + doc.submitOp({p: ['baz'], oi: 'qux'}, function(error) { + if (error) return done(error); + var newId = legacyClient.id; + expect(newId).not.to.equal(initialId); + expect(newId).to.equal(legacyClient.agent.clientId); + expect(legacyClient.agent.src).to.be.null; + done(); + }); + }); + }); + + it('errors when submitting an op with a very large seq', function(done) { + this.backend.connect(null, null, function(connection) { + var doc = connection.get('test', '123'); + doc.create({foo: 'bar'}, function(error) { + if (error) return done(error); + connection.sendOp(doc, { + op: {p: ['foo'], od: 'bar'}, + src: connection.id, + seq: Number.MAX_SAFE_INTEGER + }); + doc.once('error', function(error) { + expect(error.code).to.equal('ERR_CONNECTION_SEQ_INTEGER_OVERFLOW'); + done(); + }); + }); + }); + }); }); diff --git a/test/client/doc.js b/test/client/doc.js index f3f2b38dc..0c3dd875c 100644 --- a/test/client/doc.js +++ b/test/client/doc.js @@ -49,6 +49,17 @@ describe('Doc', function() { doc.destroy(); }); + it('errors when trying to set a very large seq', function(done) { + var connection = this.connection; + connection.seq = Number.MAX_SAFE_INTEGER; + var doc = connection.get('dogs', 'fido'); + doc.create({name: 'fido'}); + doc.once('error', function(error) { + expect(error.code).to.equal('ERR_CONNECTION_SEQ_INTEGER_OVERFLOW'); + done(); + }); + }); + describe('when connection closed', function() { beforeEach(function(done) { this.op1 = [{p: ['snacks'], oi: true}]; diff --git a/test/client/submit.js b/test/client/submit.js index e3d2fc33a..fa48d33d9 100644 --- a/test/client/submit.js +++ b/test/client/submit.js @@ -355,15 +355,17 @@ module.exports = function() { }); it('op submitted during inflight create does not compose and gets flushed', function(done) { - var doc = this.backend.connect().get('dogs', 'fido'); - doc.create({age: 3}); - // Submit an op after message is sent but before server has a chance to reply - process.nextTick(function() { - doc.submitOp({p: ['age'], na: 2}, function(err) { - if (err) return done(err); - expect(doc.version).equal(2); - expect(doc.data).eql({age: 5}); - done(); + this.backend.connect(null, null, function(connection) { + var doc = connection.get('dogs', 'fido'); + doc.create({age: 3}); + // Submit an op after message is sent but before server has a chance to reply + process.nextTick(function() { + doc.submitOp({p: ['age'], na: 2}, function(err) { + if (err) return done(err); + expect(doc.version).equal(2); + expect(doc.data).eql({age: 5}); + done(); + }); }); }); }); @@ -799,35 +801,40 @@ module.exports = function() { }); it('snapshot fetch from query does not advance version of doc with pending ops', function(done) { - var doc = this.backend.connect().get('dogs', 'fido'); - var doc2 = this.backend.connect().get('dogs', 'fido'); - doc.create({name: 'kido'}, function(err) { - if (err) return done(err); - doc2.fetch(function(err) { - if (err) return done(err); - doc2.submitOp({p: ['name', 0], si: 'f'}, function(err) { + var backend = this.backend; + backend.connect(null, null, function(connection1) { + backend.connect(null, null, function(connection2) { + var doc = connection1.get('dogs', 'fido'); + var doc2 = connection2.get('dogs', 'fido'); + doc.create({name: 'kido'}, function(err) { if (err) return done(err); - expect(doc2.data).eql({name: 'fkido'}); - doc.connection.createFetchQuery('dogs', {}, null, function(err) { + doc2.fetch(function(err) { if (err) return done(err); - doc.resume(); + doc2.submitOp({p: ['name', 0], si: 'f'}, function(err) { + if (err) return done(err); + expect(doc2.data).eql({name: 'fkido'}); + doc.connection.createFetchQuery('dogs', {}, null, function(err) { + if (err) return done(err); + doc.resume(); + }); + }); }); }); - }); - }); - process.nextTick(function() { - doc.pause(); - doc.submitOp({p: ['name', 0], sd: 'k'}, function(err) { - if (err) return done(err); - doc.pause(); - doc2.fetch(function(err) { - if (err) return done(err); - expect(doc2.version).equal(3); - expect(doc2.data).eql({name: 'fido'}); - done(); + process.nextTick(function() { + doc.pause(); + doc.submitOp({p: ['name', 0], sd: 'k'}, function(err) { + if (err) return done(err); + doc.pause(); + doc2.fetch(function(err) { + if (err) return done(err); + expect(doc2.version).equal(3); + expect(doc2.data).eql({name: 'fido'}); + done(); + }); + }); + doc.del(); }); }); - doc.del(); }); }); diff --git a/test/middleware.js b/test/middleware.js index 9f1ef9f40..ecc174e58 100644 --- a/test/middleware.js +++ b/test/middleware.js @@ -153,6 +153,7 @@ describe('middleware', function() { it('context has request and reply objects', function(done) { var snapshot = this.snapshot; this.backend.use('reply', function(replyContext, next) { + if (replyContext.request.a !== 'qf') return next(); expect(replyContext).to.have.property('action', 'reply'); expect(replyContext.request).to.eql({a: 'qf', id: 1, c: 'dogs', q: {age: 3}}); expect(replyContext.reply).to.eql({ @@ -179,7 +180,8 @@ describe('middleware', function() { it('can produce errors that get sent back to client', function(done) { var errorMessage = 'This is an error from reply middleware'; - this.backend.use('reply', function(_replyContext, next) { + this.backend.use('reply', function(replyContext, next) { + if (replyContext.request.a !== 'f') return next(); next(errorMessage); }); var connection = this.backend.connect();