Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for fetching a particular version of a snapshot #220

Merged
merged 28 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f691f48
Add support for fetching a particular version of a snapshot
Jun 29, 2018
d29b26d
Only call `done` once
Jul 3, 2018
b7b3abe
Correctly parse epoch timestamp when getting a snapshot
Jul 4, 2018
0f22355
Replace Snapshot `deleted` flag with `type`
Jul 10, 2018
cf9bd25
Change snapshot version (sv) action to snapshot fetch (sf)
Jul 10, 2018
7cfc134
Create snapshots with `type.create`
Jul 10, 2018
c94ae65
Keep snapshot fetching op local to loop
Jul 10, 2018
c9141b0
Make `getSnapshot` version an optional parameter
Jul 10, 2018
5edd003
Update `getSnapshot` falsiness check to avoid empty strings, etc.
Jul 10, 2018
1c934a1
Store `getSnapshot` `typeof` check in a local variable
Jul 10, 2018
d25ee6e
Set SnapshotRequest `sent` flag to false if connection cannot send
Jul 10, 2018
dc2d022
Check if snapshot version is finite
Jul 10, 2018
4a90170
Error on getSnapshot with version -1
Jul 10, 2018
9640d73
Review markups
Jul 11, 2018
fff922c
Split `getSnapshot` into two methods
Jul 11, 2018
b030e27
Initialise empty snapshots with version 0, timestamp 0
Jul 12, 2018
ce64e8a
Remove pending and ready from Snapshot Request
Jul 12, 2018
dd83d07
Update getSnapshot JSDocs
Jul 12, 2018
c855933
Increment fetched snapshot version
Jul 12, 2018
afb7784
Update SnapshotRequest version variable names to match version
Jul 12, 2018
16d6eaa
Ensure snapshot fetch timestamp is finite
Jul 12, 2018
12e1bcc
Add Snapshot class
Jul 19, 2018
b57273d
Remove snapshot fetch at time
Jul 26, 2018
1ae5680
Review markups
Aug 2, 2018
0f3d526
Remove VS Code config
Aug 2, 2018
a39c141
Simplify reconnection test case
Aug 2, 2018
b4764ac
Update fetchSnapshot documentation
Aug 8, 2018
e80fe46
Review markups
Aug 9, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
# Emacs
\#*\#

# VS Code
.vscode/

# Logs
logs
*.log
Expand Down
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var socket = new WebSocket('ws://' + window.location.host);
var connection = new sharedb.Connection(socket);
```

The native Websocket object that you feed to ShareDB's `Connection` constructor **does not** handle reconnections.
The native Websocket object that you feed to ShareDB's `Connection` constructor **does not** handle reconnections.

The easiest way is to give it a WebSocket object that does reconnect. There are plenty of example on the web. The most important thing is that the custom reconnecting websocket, must have the same API as the native rfc6455 version.

Expand Down Expand Up @@ -227,6 +227,27 @@ changes. Returns a [`ShareDB.Query`](#class-sharedbquery) instance.
* `options.*`
All other options are passed through to the database adapter.

`connection.fetchSnapshot(collection, id, version, callback): void;`
Get a read-only snapshot of a document at the requested version.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice documentation.


* `collection` _(String)_
Collection name of the snapshot
* `id` _(String)_
ID of the snapshot
* `version` _(number) [optional]_
The version number of the desired snapshot
* `callback` _(Function)_
Called with `(error, snapshot)`, where `snapshot` takes the following form:

```javascript
{
id: string; // ID of the snapshot
v: number; // version number of the snapshot
type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
data: any; // the snapshot
}
```

### Class: `ShareDB.Doc`

`doc.type` _(String_)
Expand Down Expand Up @@ -375,6 +396,7 @@ Additional fields may be added to the error object for debugging context dependi
* 4021 - Invalid client id
* 4022 - Database adapter does not support queries
* 4023 - Cannot project snapshots of this type
* 4024 - Invalid version

### 5000 - Internal error

Expand Down
6 changes: 6 additions & 0 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ Agent.prototype._handleMessage = function(request, callback) {
var op = this._createOp(request);
if (!op) return callback({code: 4000, message: 'Invalid op message'});
return this._submit(request.c, request.d, op, callback);
case 'nf':
return this._fetchSnapshot(request.c, request.d, request.v, callback);
default:
callback({code: 4000, message: 'Invalid or unknown message'});
}
Expand Down Expand Up @@ -582,3 +584,7 @@ Agent.prototype._createOp = function(request) {
return new DeleteOp(src, request.seq, request.v, request.del);
}
};

Agent.prototype._fetchSnapshot = function (collection, id, version, callback) {
this.backend.fetchSnapshot(this, collection, id, version, callback);
};
64 changes: 64 additions & 0 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ var projections = require('./projections');
var QueryEmitter = require('./query-emitter');
var StreamSocket = require('./stream-socket');
var SubmitRequest = require('./submit-request');
var types = require('./types');

var warnDeprecatedDoc = true;
var warnDeprecatedAfterSubmit = true;

Expand Down Expand Up @@ -580,6 +582,68 @@ Backend.prototype.getChannels = function(collection, id) {
];
};

Backend.prototype.fetchSnapshot = function(agent, index, id, version, callback) {
var backend = this;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add timing test around whole function. var start = Date.now() here and emit timing event in callback from sanitizeSnapshot() below, similar to backend.fetch()

this._fetchSnapshot(agent, index, id, version, function (error, snapshot) {
if (error) return callback(error);

var request = {
collection: index,
id: id,
v: snapshot.v,
snapshots: snapshot.data ? [snapshot.data] : [],
type: snapshot.type
};

backend.trigger(backend.MIDDLEWARE_ACTIONS.readSnapshots, agent, request, function (error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should call:

var projection = ....;
backend._sanitizeSnapshots(agent, projection, collection, snapshots, cb)

if (error) return callback(error);
callback(null, {
data: request.snapshots[0],
v: request.v,
type: request.type
});
});
});
};

Backend.prototype._fetchSnapshot = function (agent, index, id, version, callback) {
this.getOps(agent, index, id, 0, version, function (error, ops) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call a this._getOps() method that does not call _sanitizeOps internally, thus not projecting nor emitting an op middleware action

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB: I'm directly using backend.db.getOps, because backend._getOps would have basically had the same signature and just been a wrapper.

if (error) return callback(error);

var type = null;
var snapshot;
var fetchedVersion = 0;

for (var index = 0; index < ops.length; index++) {
var op = ops[index];
fetchedVersion = op.v + 1;

if (op.create) {
type = types.map[op.create.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
snapshot = type.create(op.create.data);
} else if (op.del) {
snapshot = undefined;
type = null;
} else {
snapshot = type.apply(snapshot, op.op);
}
}

type = type ? type.uri : null;

if (version > fetchedVersion) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}

callback(null, {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the Snapshot constructor here

data: snapshot,
v: fetchedVersion,
type: type
});
});
};

function pluckIds(snapshots) {
var ids = [];
for (var i = 0; i < snapshots.length; i++) {
Expand Down
72 changes: 69 additions & 3 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var Doc = require('./doc');
var Query = require('./query');
var SnapshotRequest = require('./snapshot-request');
var emitter = require('../emitter');
var ShareDBError = require('../error');
var types = require('../types');
Expand Down Expand Up @@ -33,13 +34,17 @@ function Connection(socket) {
// (created documents MUST BE UNIQUE)
this.collections = {};

// Each query is created with an id that the server uses when it sends us
// info about the query (updates, etc)
// Each query and snapshot request is created with an id that the server uses when it sends us
// info about the request (updates, etc)
this.nextQueryId = 1;
this.nextSnapshotRequestId = 1;

// Map from query ID -> query object.
this.queries = {};

// Map from snapshot request ID -> snapshot request
this._snapshotRequests = {};

// A unique message number for the given id
this.seq = 1;

Expand Down Expand Up @@ -226,6 +231,9 @@ Connection.prototype.handleMessage = function(message) {
case 'bu':
return this._handleBulkMessage(message, '_handleUnsubscribe');

case 'nf':
return this._handleSnapshot(err, message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's call this _handleSnapshotFetch consistent with the message


case 'f':
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handleFetch(err, message.data);
Expand Down Expand Up @@ -310,6 +318,11 @@ Connection.prototype._setState = function(newState, reason) {
docs[id]._onConnectionStateChanged();
}
}
// Emit the event to all snapshots
for (var id in this._snapshotRequests) {
var snapshotRequest = this._snapshotRequests[id];
snapshotRequest._onConnectionStateChanged();
}
this.endBulk();

this.emit(newState, reason);
Expand Down Expand Up @@ -523,12 +536,16 @@ Connection.prototype.createSubscribeQuery = function(collection, q, options, cal
Connection.prototype.hasPending = function() {
return !!(
this._firstDoc(hasPending) ||
this._firstQuery(hasPending)
this._firstQuery(hasPending) ||
this._firstSnapshotRequest(exists)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the exists method, since we always call with this and it can simplify the _fristSnapshotRequest method below, which is an internal method

);
};
function hasPending(object) {
return object.hasPending();
}
function exists(object) {
return !!object;
}

Connection.prototype.hasWritePending = function() {
return !!this._firstDoc(hasWritePending);
Expand All @@ -552,6 +569,11 @@ Connection.prototype.whenNothingPending = function(callback) {
query.once('ready', this._nothingPendingRetry(callback));
return;
}
var snapshotRequest = this._firstSnapshotRequest(exists);
if (snapshotRequest) {
snapshotRequest.once('ready', this._nothingPendingRetry(callback));
return;
}
// Call back when no pending operations
process.nextTick(callback);
};
Expand Down Expand Up @@ -584,3 +606,47 @@ Connection.prototype._firstQuery = function(fn) {
}
}
};

Connection.prototype._firstSnapshotRequest = function (fn) {
for (var id in this._snapshotRequests) {
var snapshotRequest = this._snapshotRequests[id];
if (fn(snapshotRequest)) {
return snapshotRequest;
}
}
};

/**
* Fetch a read-only snapshot at a given version
*
* @param collection - the collection name of the snapshot
* @param id - the ID of the snapshot
* @param version (optional) - the version number to fetch
* @param callback - (error, snapshot) => void, where snapshot takes the following schema:
*
* {
* id: string; // ID of the snapshot
* v: number; // version number of the snapshot
* type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
* data: any; // the snapshot
* }
*
*/
Connection.prototype.fetchSnapshot = function(collection, id, version, callback) {
if (typeof version === 'function') {
callback = version;
version = null;
}

var requestId = this.nextSnapshotRequestId++;
var snapshotRequest = new SnapshotRequest(this, requestId, collection, id, version, callback);
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
snapshotRequest.send();
};

Connection.prototype._handleSnapshot = function (error, message) {
var snapshotRequest = this._snapshotRequests[message.id];
if (!snapshotRequest) return;
delete this._snapshotRequests[message.id];
snapshotRequest._handleResponse(error, message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete this.snapshotRequests[message.id];

is missing here - it'll prevent a memory leak.

};
62 changes: 62 additions & 0 deletions lib/client/snapshot-request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
var Snapshot = require('../snapshot');
var util = require('../util');
var emitter = require('../emitter');

module.exports = SnapshotRequest;

function SnapshotRequest(connection, requestId, collection, id, version, callback) {
emitter.EventEmitter.call(this);

if (typeof callback !== 'function') {
throw new Error('Callback is required for SnapshotRequest');
}

if (version != null && !util.isInteger(version)) {
throw new Error('Snapshot version must be an integer');
}

this.requestId = requestId;
this.connection = connection;
this.id = id;
this.collection = collection;
this.version = util.isInteger(version) ? Math.max(0, version) : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's throw an error for negative versions rather than silently fixing it

this.callback = callback;

this.sent = false;
}
emitter.mixin(SnapshotRequest);

SnapshotRequest.prototype.send = function () {
if (!this.connection.canSend) {
return;
}

var message = {
a: 'nf',
id: this.requestId,
c: this.collection,
d: this.id,
v: this.version,
};

this.connection.send(message);
this.sent = true;
};

SnapshotRequest.prototype._onConnectionStateChanged = function () {
if (this.connection.canSend && !this.sent) {
this.send();
} else if (!this.connection.canSend) {
this.sent = false;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If !this.connection.canSend, then probably set this.sent to false, so that the request would be resent after re-connection.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It starts as false, and is only changed to true at the end of send().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the following situation:

  1. Snapshot request is sent
  2. Connection is dropped
  3. Connection is restored

_onConnectionStateChanged will not re-send the request because this.sent is true. The server will not send a response to the new WebSocket connection because the request was not sent on that new WebSocket connection and the old WebSocket connection has been closed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha cool. Still trying to piece together all the data flows around this library! Some of it's pretty confusing.

};

SnapshotRequest.prototype._handleResponse = function (error, message) {
if (error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's emit ready here first before error checking, same as query so that we make sure we eventually call back to whenNothingPending on the connection in all cases

return this.callback(error);
}

var snapshot = new Snapshot(this.id, message.v, message.type, message.data, null);
this.callback(null, snapshot);
this.emit('ready');
};
16 changes: 4 additions & 12 deletions lib/db/memory.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var DB = require('./index');
var Snapshot = require('../snapshot');

// In-memory ShareDB database
//
Expand Down Expand Up @@ -151,24 +152,15 @@ MemoryDB.prototype._getSnapshotSync = function(collection, id, includeMetadata)
var snapshot;
if (doc) {
var data = clone(doc.data);
var meta = (includeMetadata) ? clone(doc.m) : undefined;
snapshot = new MemorySnapshot(id, doc.v, doc.type, data, meta);
var meta = (includeMetadata) ? clone(doc.m) : null;
snapshot = new Snapshot(id, doc.v, doc.type, data, meta);
} else {
var version = this._getVersionSync(collection, id);
snapshot = new MemorySnapshot(id, version, null, undefined);
snapshot = new Snapshot(id, version, null, undefined, null);
}
return snapshot;
};

// `id`, and `v` should be on every returned snapshot
function MemorySnapshot(id, version, type, data, meta) {
this.id = id;
this.v = version;
this.type = type;
this.data = data;
if (meta) this.m = meta;
}

MemoryDB.prototype._getOpLogSync = function(collection, id) {
var collectionOps = this.ops[collection] || (this.ops[collection] = {});
return collectionOps[id] || (collectionOps[id] = []);
Expand Down
8 changes: 8 additions & 0 deletions lib/snapshot.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module.exports = Snapshot;
function Snapshot(id, version, type, data, meta) {
this.id = id;
this.v = version;
this.type = type;
this.data = data;
this.m = meta;
}
7 changes: 7 additions & 0 deletions lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@ exports.hasKeys = function(object) {
for (var key in object) return true;
return false;
};

// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/isInteger#Polyfill
exports.isInteger = Number.isInteger || function (value) {
return typeof value === 'number' &&
isFinite(value) &&
Math.floor(value) === value;
};
Loading