Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for fetching a particular version of a snapshot #220

Merged
merged 28 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f691f48
Add support for fetching a particular version of a snapshot
Jun 29, 2018
d29b26d
Only call `done` once
Jul 3, 2018
b7b3abe
Correctly parse epoch timestamp when getting a snapshot
Jul 4, 2018
0f22355
Replace Snapshot `deleted` flag with `type`
Jul 10, 2018
cf9bd25
Change snapshot version (sv) action to snapshot fetch (sf)
Jul 10, 2018
7cfc134
Create snapshots with `type.create`
Jul 10, 2018
c94ae65
Keep snapshot fetching op local to loop
Jul 10, 2018
c9141b0
Make `getSnapshot` version an optional parameter
Jul 10, 2018
5edd003
Update `getSnapshot` falsiness check to avoid empty strings, etc.
Jul 10, 2018
1c934a1
Store `getSnapshot` `typeof` check in a local variable
Jul 10, 2018
d25ee6e
Set SnapshotRequest `sent` flag to false if connection cannot send
Jul 10, 2018
dc2d022
Check if snapshot version is finite
Jul 10, 2018
4a90170
Error on getSnapshot with version -1
Jul 10, 2018
9640d73
Review markups
Jul 11, 2018
fff922c
Split `getSnapshot` into two methods
Jul 11, 2018
b030e27
Initialise empty snapshots with version 0, timestamp 0
Jul 12, 2018
ce64e8a
Remove pending and ready from Snapshot Request
Jul 12, 2018
dd83d07
Update getSnapshot JSDocs
Jul 12, 2018
c855933
Increment fetched snapshot version
Jul 12, 2018
afb7784
Update SnapshotRequest version variable names to match version
Jul 12, 2018
16d6eaa
Ensure snapshot fetch timestamp is finite
Jul 12, 2018
12e1bcc
Add Snapshot class
Jul 19, 2018
b57273d
Remove snapshot fetch at time
Jul 26, 2018
1ae5680
Review markups
Aug 2, 2018
0f3d526
Remove VS Code config
Aug 2, 2018
a39c141
Simplify reconnection test case
Aug 2, 2018
b4764ac
Update fetchSnapshot documentation
Aug 8, 2018
e80fe46
Review markups
Aug 9, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
# Emacs
\#*\#

# VS Code
.vscode/

# Logs
logs
*.log
Expand Down
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var socket = new WebSocket('ws://' + window.location.host);
var connection = new sharedb.Connection(socket);
```

The native Websocket object that you feed to ShareDB's `Connection` constructor **does not** handle reconnections.
The native Websocket object that you feed to ShareDB's `Connection` constructor **does not** handle reconnections.

The easiest way is to give it a WebSocket object that does reconnect. There are plenty of example on the web. The most important thing is that the custom reconnecting websocket, must have the same API as the native rfc6455 version.

Expand Down Expand Up @@ -227,6 +227,27 @@ changes. Returns a [`ShareDB.Query`](#class-sharedbquery) instance.
* `options.*`
All other options are passed through to the database adapter.

`connection.fetchSnapshot(collection, id, version, callback): void;`
Get a read-only snapshot of a document at the requested version.

* `collection` _(String)_
Collection name of the snapshot
* `id` _(String)_
ID of the snapshot
* `version` _(number) [optional]_
The version number of the desired snapshot
* `callback` _(Function)_
Called with `(error, snapshot)`, where `snapshot` takes the following form:

```javascript
{
id: string; // ID of the snapshot
v: number; // version number of the snapshot
type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
data: any; // the snapshot
}
```

### Class: `ShareDB.Doc`

`doc.type` _(String_)
Expand Down Expand Up @@ -375,6 +396,7 @@ Additional fields may be added to the error object for debugging context dependi
* 4021 - Invalid client id
* 4022 - Database adapter does not support queries
* 4023 - Cannot project snapshots of this type
* 4024 - Invalid version

### 5000 - Internal error

Expand Down
6 changes: 6 additions & 0 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ Agent.prototype._handleMessage = function(request, callback) {
var op = this._createOp(request);
if (!op) return callback({code: 4000, message: 'Invalid op message'});
return this._submit(request.c, request.d, op, callback);
case 'nf':
return this._fetchSnapshot(request.c, request.d, request.v, callback);
default:
callback({code: 4000, message: 'Invalid or unknown message'});
}
Expand Down Expand Up @@ -582,3 +584,7 @@ Agent.prototype._createOp = function(request) {
return new DeleteOp(src, request.seq, request.v, request.del);
}
};

Agent.prototype._fetchSnapshot = function (collection, id, version, callback) {
this.backend.fetchSnapshot(this, collection, id, version, callback);
};
66 changes: 66 additions & 0 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ var MemoryPubSub = require('./pubsub/memory');
var ot = require('./ot');
var projections = require('./projections');
var QueryEmitter = require('./query-emitter');
var Snapshot = require('./snapshot');
var StreamSocket = require('./stream-socket');
var SubmitRequest = require('./submit-request');
var types = require('./types');

var warnDeprecatedDoc = true;
var warnDeprecatedAfterSubmit = true;

Expand Down Expand Up @@ -580,6 +583,69 @@ Backend.prototype.getChannels = function(collection, id) {
];
};

Backend.prototype.fetchSnapshot = function(agent, index, id, version, callback) {
var start = Date.now();
var backend = this;
var projection = this.projections[index];
var collection = projection ? projection.target : index;
var request = {
agent: agent,
index: index,
collection: collection,
id: id,
version: version
};

this._fetchSnapshot(collection, id, version, function (error, snapshot) {
if (error) return callback(error);
var snapshotProjection = backend._getSnapshotProjection(backend.db, projection);
var snapshots = [snapshot];
backend._sanitizeSnapshots(agent, snapshotProjection, collection, snapshots, function (error) {
if (error) return callback(error);
backend.emit('timing', 'fetchSnapshot', Date.now() - start, request);
callback(null, snapshot);
});
});
};

Backend.prototype._fetchSnapshot = function (collection, id, version, callback) {
// Bypass backend.getOps so that we don't call _sanitizeOps. We want to avoid this, because:
// - we want to avoid the 'op' middleware, because we later use the 'readSnapshots' middleware in _sanitizeSnapshots
// - we handle the projection in _sanitizeSnapshots
this.db.getOps(collection, id, 0, version, null, function (error, ops) {
if (error) return callback(error);

var type = null;
var data;
var fetchedVersion = 0;

for (var index = 0; index < ops.length; index++) {
var op = ops[index];
fetchedVersion = op.v + 1;

if (op.create) {
type = types.map[op.create.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = type.create(op.create.data);
} else if (op.del) {
data = undefined;
type = null;
} else {
data = type.apply(data, op.op);
}
}

type = type ? type.uri : null;

if (version > fetchedVersion) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}

var snapshot = new Snapshot(id, fetchedVersion, type, data, null);
callback(null, snapshot);
});
};

function pluckIds(snapshots) {
var ids = [];
for (var i = 0; i < snapshots.length; i++) {
Expand Down
66 changes: 63 additions & 3 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var Doc = require('./doc');
var Query = require('./query');
var SnapshotRequest = require('./snapshot-request');
var emitter = require('../emitter');
var ShareDBError = require('../error');
var types = require('../types');
Expand Down Expand Up @@ -33,13 +34,17 @@ function Connection(socket) {
// (created documents MUST BE UNIQUE)
this.collections = {};

// Each query is created with an id that the server uses when it sends us
// info about the query (updates, etc)
// Each query and snapshot request is created with an id that the server uses when it sends us
// info about the request (updates, etc)
this.nextQueryId = 1;
this.nextSnapshotRequestId = 1;

// Map from query ID -> query object.
this.queries = {};

// Map from snapshot request ID -> snapshot request
this._snapshotRequests = {};

// A unique message number for the given id
this.seq = 1;

Expand Down Expand Up @@ -226,6 +231,9 @@ Connection.prototype.handleMessage = function(message) {
case 'bu':
return this._handleBulkMessage(message, '_handleUnsubscribe');

case 'nf':
return this._handleSnapshotFetch(err, message);

case 'f':
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handleFetch(err, message.data);
Expand Down Expand Up @@ -310,6 +318,11 @@ Connection.prototype._setState = function(newState, reason) {
docs[id]._onConnectionStateChanged();
}
}
// Emit the event to all snapshots
for (var id in this._snapshotRequests) {
var snapshotRequest = this._snapshotRequests[id];
snapshotRequest._onConnectionStateChanged();
}
this.endBulk();

this.emit(newState, reason);
Expand Down Expand Up @@ -523,7 +536,8 @@ Connection.prototype.createSubscribeQuery = function(collection, q, options, cal
Connection.prototype.hasPending = function() {
return !!(
this._firstDoc(hasPending) ||
this._firstQuery(hasPending)
this._firstQuery(hasPending) ||
this._firstSnapshotRequest()
);
};
function hasPending(object) {
Expand Down Expand Up @@ -552,6 +566,11 @@ Connection.prototype.whenNothingPending = function(callback) {
query.once('ready', this._nothingPendingRetry(callback));
return;
}
var snapshotRequest = this._firstSnapshotRequest();
if (snapshotRequest) {
snapshotRequest.once('ready', this._nothingPendingRetry(callback));
return;
}
// Call back when no pending operations
process.nextTick(callback);
};
Expand Down Expand Up @@ -584,3 +603,44 @@ Connection.prototype._firstQuery = function(fn) {
}
}
};

Connection.prototype._firstSnapshotRequest = function () {
for (var id in this._snapshotRequests) {
return this._snapshotRequests[id];
}
};

/**
* Fetch a read-only snapshot at a given version
*
* @param collection - the collection name of the snapshot
* @param id - the ID of the snapshot
* @param version (optional) - the version number to fetch
* @param callback - (error, snapshot) => void, where snapshot takes the following schema:
*
* {
* id: string; // ID of the snapshot
* v: number; // version number of the snapshot
* type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
* data: any; // the snapshot
* }
*
*/
Connection.prototype.fetchSnapshot = function(collection, id, version, callback) {
if (typeof version === 'function') {
callback = version;
version = null;
}

var requestId = this.nextSnapshotRequestId++;
var snapshotRequest = new SnapshotRequest(this, requestId, collection, id, version, callback);
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
snapshotRequest.send();
};

Connection.prototype._handleSnapshotFetch = function (error, message) {
var snapshotRequest = this._snapshotRequests[message.id];
if (!snapshotRequest) return;
delete this._snapshotRequests[message.id];
snapshotRequest._handleResponse(error, message);
};
75 changes: 75 additions & 0 deletions lib/client/snapshot-request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
var Snapshot = require('../snapshot');
var util = require('../util');
var emitter = require('../emitter');

module.exports = SnapshotRequest;

function SnapshotRequest(connection, requestId, collection, id, version, callback) {
emitter.EventEmitter.call(this);

if (typeof callback !== 'function') {
throw new Error('Callback is required for SnapshotRequest');
}

if (!this.isValidVersion(version)) {
throw new Error('Snapshot version must be a positive integer or null');
}

this.requestId = requestId;
this.connection = connection;
this.id = id;
this.collection = collection;
this.version = version;
this.callback = callback;

this.sent = false;
}
emitter.mixin(SnapshotRequest);

SnapshotRequest.prototype.isValidVersion = function (version) {
if (version === null) {
return true;
}

if (!util.isInteger(version)) {
return false;
}

return version >= 0;
}

SnapshotRequest.prototype.send = function () {
if (!this.connection.canSend) {
return;
}

var message = {
a: 'nf',
id: this.requestId,
c: this.collection,
d: this.id,
v: this.version,
};

this.connection.send(message);
this.sent = true;
};

SnapshotRequest.prototype._onConnectionStateChanged = function () {
if (this.connection.canSend && !this.sent) {
this.send();
} else if (!this.connection.canSend) {
this.sent = false;
}
};

SnapshotRequest.prototype._handleResponse = function (error, message) {
this.emit('ready');

if (error) {
return this.callback(error);
}

var snapshot = new Snapshot(this.id, message.v, message.type, message.data, null);
this.callback(null, snapshot);
};
16 changes: 4 additions & 12 deletions lib/db/memory.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var DB = require('./index');
var Snapshot = require('../snapshot');

// In-memory ShareDB database
//
Expand Down Expand Up @@ -151,24 +152,15 @@ MemoryDB.prototype._getSnapshotSync = function(collection, id, includeMetadata)
var snapshot;
if (doc) {
var data = clone(doc.data);
var meta = (includeMetadata) ? clone(doc.m) : undefined;
snapshot = new MemorySnapshot(id, doc.v, doc.type, data, meta);
var meta = (includeMetadata) ? clone(doc.m) : null;
snapshot = new Snapshot(id, doc.v, doc.type, data, meta);
} else {
var version = this._getVersionSync(collection, id);
snapshot = new MemorySnapshot(id, version, null, undefined);
snapshot = new Snapshot(id, version, null, undefined, null);
}
return snapshot;
};

// `id`, and `v` should be on every returned snapshot
function MemorySnapshot(id, version, type, data, meta) {
this.id = id;
this.v = version;
this.type = type;
this.data = data;
if (meta) this.m = meta;
}

MemoryDB.prototype._getOpLogSync = function(collection, id) {
var collectionOps = this.ops[collection] || (this.ops[collection] = {});
return collectionOps[id] || (collectionOps[id] = []);
Expand Down
8 changes: 8 additions & 0 deletions lib/snapshot.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module.exports = Snapshot;
function Snapshot(id, version, type, data, meta) {
this.id = id;
this.v = version;
this.type = type;
this.data = data;
this.m = meta;
}
7 changes: 7 additions & 0 deletions lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@ exports.hasKeys = function(object) {
for (var key in object) return true;
return false;
};

// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/isInteger#Polyfill
exports.isInteger = Number.isInteger || function (value) {
return typeof value === 'number' &&
isFinite(value) &&
Math.floor(value) === value;
};
Loading