From 1a13a8d95e114cea036a58b1700c94d73a517366 Mon Sep 17 00:00:00 2001 From: Ritchie Martori Date: Sun, 26 Jan 2014 14:02:56 -0800 Subject: [PATCH] Add Checkpoint model and Model replication methods --- lib/models/checkpoint.js | 56 +++++++++++ lib/models/model.js | 203 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 259 insertions(+) create mode 100644 lib/models/checkpoint.js diff --git a/lib/models/checkpoint.js b/lib/models/checkpoint.js new file mode 100644 index 000000000..f5ff74236 --- /dev/null +++ b/lib/models/checkpoint.js @@ -0,0 +1,56 @@ +/** + * Module Dependencies. + */ + +var Model = require('../loopback').Model + , loopback = require('../loopback') + , assert = require('assert'); + +/** + * Properties + */ + +var properties = { + id: {type: Number, generated: true, id: true}, + time: {type: Number, generated: true, default: Date.now}, + sourceId: {type: String} +}; + +/** + * Options + */ + +var options = { + +}; + +/** + * Checkpoint list entry. + * + * @property id {Number} the sequencial identifier of a checkpoint + * @property time {Number} the time when the checkpoint was created + * @property sourceId {String} the source identifier + * + * @class + * @inherits {Model} + */ + +var Checkpoint = module.exports = Model.extend('Checkpoint', properties, options); + +/** + * Get the current checkpoint id + * @callback {Function} callback + * @param {Error} err + * @param {Number} checkpointId The current checkpoint id + */ + +Checkpoint.current = function(cb) { + this.find({ + limit: 1, + sort: 'id DESC' + }, function(err, checkpoint) { + if(err) return cb(err); + cb(null, checkpoint.id); + }); +} + diff --git a/lib/models/model.js b/lib/models/model.js index ce2bb5f90..6e024a7d4 100644 --- a/lib/models/model.js +++ b/lib/models/model.js @@ -131,6 +131,7 @@ function getACL() { * @param {String|Error} err The error object * @param {Boolean} allowed is the request allowed */ + Model.checkAccess = function(token, modelId, method, callback) { var ANONYMOUS = require('./access-token').ANONYMOUS; token = token || ANONYMOUS; @@ -190,3 +191,205 @@ Model._getAccessTypeForMethod = function(method) { // setup the initial model Model.setup(); +/** + * Get a set of deltas and conflicts since the given checkpoint. + * + * See `Change.diff()` for details. + * + * @param {Number} since Find changes since this checkpoint + * @param {Array} remoteChanges An array of change objects + * @param {Function} callback + */ + +Model.diff = function(since, remoteChanges, callback) { + var Change = this.getChangeModel(); + Change.diff(this.modelName, since, remoteChanges, callback); +} + +/** + * Get the changes to a model since a given checkpoing. Provide a filter object + * to reduce the number of results returned. + * @param {Number} since Only return changes since this checkpoint + * @param {Object} filter Only include changes that match this filter + * (same as `Model.find(filter, ...)`) + * @callback {Function} callback + * @param {Error} err + * @param {Array} changes An array of `Change` objects + * @end + */ + +Model.changes = function(since, filter, callback) { + var idName = this.idName(); + var Change = this.getChangeModel(); + var model = this; + + filter = filter || {}; + filter.fields = {}; + filter.where = filter.where || {}; + filter.fields[idName] = true; + + // this whole thing could be optimized a bit more + Change.find({ + checkpoint: {gt: since}, + modelName: this.modelName + }, function(err, changes) { + if(err) return cb(err); + var ids = changes.map(function(change) { + return change.modelId; + }); + filter.where[idName] = {inq: ids}; + model.find(filter, function(err, models) { + if(err) return cb(err); + var modelIds = models.map(function(m) { + return m[idName]; + }); + callback(null, changes.filter(function(ch) { + return modelIds.indexOf(ch.modelId) > -1; + })); + }); + }); +} + +/** + * Create a checkpoint. + * + * @param {Function} callback + */ + +Model.checkpoint = function(cb) { + var Checkpoint = this.getChangeModel().Checkpoint; + this.getSourceId(function(err, sourceId) { + if(err) return cb(err); + Checkpoint.create({ + sourceId: sourceId + }, cb); + }); +} + +/** + * Replicate changes since the given checkpoint to the given target model. + * + * @param {Number} since Since this checkpoint + * @param {Model} targetModel Target this model class + * @options {Object} options + * @property {Object} filter Replicate models that match this filter + * @callback {Function} callback + * @param {Error} err + * @param {Array} conflicts A list of changes that could not be replicated + * due to conflicts. + */ + +Model.replicate = function(since, targetModel, options, callback) { + var sourceModel = this; + var diff; + var updates; + + var tasks = [ + getLocalChanges, + getDiffFromTarget, + createSourceUpdates, + bulkUpdate, + sourceModel.checkpoint.bind(sourceModel) + ]; + + async.waterfall(tasks, function(err) { + if(err) return callback(err); + callback(null, diff.conflicts); + }); + + function getLocalChanges(cb) { + sourceModel.changes(since, options.filter, cb); + } + + function getDiffFromTarget(sourceChanges, cb) { + targetModel.diff(since, sourceChanges, cb); + } + + function createSourceUpdates(_diff, cb) { + diff = _diff; + sourceModel.createUpdates(diff.deltas, cb); + } + + function bulkUpdate(updates, cb) { + targetModel.bulkUpdate(updates, cb); + } +} + +/** + * Create an update list (for `Model.bulkUpdate()`) from a delta list + * (result of `Change.diff()`). + * + * @param {Array} deltas + * @param {Function} callback + */ + +Model.createUpdates = function(deltas, cb) { + var Change = this.getChangeModel(); + var updates = []; + var Model = this; + var tasks = []; + var type = change.type(); + + deltas.forEach(function(change) { + change = new Change(change); + var update = {type: type, change: change}; + switch(type) { + case Change.CREATE: + case Change.UPDATE: + tasks.push(function(cb) { + Model.findById(change.modelId, function(err, inst) { + if(err) return cb(err); + update.data = inst; + updates.push(update); + cb(); + }); + }); + break; + case Change.DELETE: + updates.push(update); + break; + } + }); + + async.parallel(tasks, function(err) { + if(err) return cb(err); + cb(null, updates); + }); +} + +/** + * Apply an update list. + * + * **Note: this is not atomic** + * + * @param {Array} updates An updates list (usually from Model.createUpdates()) + * @param {Function} callback + */ + +Model.bulkUpdate = function(updates, callback) { + var tasks = []; + var Model = this; + var idName = Model.idName(); + var Change = this.getChangeModel(); + + updates.forEach(function(update) { + switch(update.type) { + case Change.UPDATE: + case Change.CREATE: + tasks.push(Model.upsert.bind(Model, update.data)); + break; + case: Change.DELETE: + var data = {}; + data[idName] = update.change.modelId; + var model = new Model(data); + tasks.push(model.destroy.bind(model)); + break; + } + }); + + async.parallel(tasks, callback); +} + +Model.getChangeModel = function() { + +}