diff --git a/example/replication/app.js b/example/replication/app.js new file mode 100644 index 000000000..ab6e69870 --- /dev/null +++ b/example/replication/app.js @@ -0,0 +1,138 @@ +var loopback = require('../../'); +var app = loopback(); +var db = app.dataSource('db', {connector: loopback.Memory}); +var Color = app.model('color', {dataSource: 'db', options: {trackChanges: true}}); +var Color2 = app.model('color2', {dataSource: 'db', options: {trackChanges: true}}); +var target = Color2; +var source = Color; +var SPEED = process.env.SPEED || 100; +var conflicts; + +var steps = [ + + createSomeInitialSourceData, + + replicateSourceToTarget, + list.bind(this, source, 'current SOURCE data'), + list.bind(this, target, 'current TARGET data'), + + updateSomeTargetData, + + replicateSourceToTarget, + list.bind(this, source, 'current SOURCE data '), + list.bind(this, target, 'current TARGET data (includes conflicting update)'), + + updateSomeSourceDataCausingAConflict, + + replicateSourceToTarget, + list.bind(this, source, 'current SOURCE data (now has a conflict)'), + list.bind(this, target, 'current TARGET data (includes conflicting update)'), + + resolveAllConflicts, + + replicateSourceToTarget, + list.bind(this, source, 'current SOURCE data (conflict resolved)'), + list.bind(this, target, 'current TARGET data (conflict resolved)'), + + createMoreSourceData, + + replicateSourceToTarget, + list.bind(this, source, 'current SOURCE data'), + list.bind(this, target, 'current TARGET data'), + + createEvenMoreSourceData, + + replicateSourceToTarget, + list.bind(this, source, 'current SOURCE data'), + list.bind(this, target, 'current TARGET data'), + + deleteAllSourceData, + + replicateSourceToTarget, + list.bind(this, source, 'current SOURCE data (empty)'), + list.bind(this, target, 'current TARGET data (empty)'), + + createSomeNewSourceData, + + replicateSourceToTarget, + list.bind(this, source, 'current SOURCE data'), + list.bind(this, target, 'current TARGET data') +]; + +run(steps); + +function createSomeInitialSourceData() { + Color.create([ + {name: 'red'}, + {name: 'blue'}, + {name: 'green'} + ]); +} + +function replicateSourceToTarget() { + Color.replicate(0, Color2, {}, function(err, replicationConflicts) { + conflicts = replicationConflicts; + }); +} + +function resolveAllConflicts() { + if(conflicts.length) { + conflicts.forEach(function(conflict) { + conflict.resolve(); + }); + } +} + +function updateSomeTargetData() { + Color2.findById(1, function(err, color) { + color.name = 'conflict'; + color.save(); + }); +} + +function createMoreSourceData() { + Color.create({name: 'orange'}); +} + +function createEvenMoreSourceData() { + Color.create({name: 'black'}); +} + +function updateSomeSourceDataCausingAConflict() { + Color.findById(1, function(err, color) { + color.name = 'red!!!!'; + color.save(); + }); +} + +function deleteAllSourceData() { + Color.destroyAll(); +} + +function createSomeNewSourceData() { + Color.create([ + {name: 'violet'}, + {name: 'amber'}, + {name: 'olive'} + ]); +} + +function list(model, msg) { + console.log(msg); + model.find(function(err, items) { + items.forEach(function(item) { + console.log(' -', item.name); + }); + console.log(); + }); +} + +function run(steps) { + setInterval(function() { + var step = steps.shift(); + if(step) { + console.log(step.name); + step(); + } + }, SPEED); +} diff --git a/lib/models/change.js b/lib/models/change.js index 199438cc1..41e9b8760 100644 --- a/lib/models/change.js +++ b/lib/models/change.js @@ -27,7 +27,7 @@ var properties = { */ var options = { - + trackChanges: false }; /** @@ -55,6 +55,12 @@ Change.CREATE = 'create'; Change.DELETE = 'delete'; Change.UNKNOWN = 'unknown'; +/*! + * Conflict Class + */ + +Change.Conflict = Conflict; + /*! * Setup the extended model. */ @@ -149,13 +155,34 @@ Change.findOrCreate = function(modelName, modelId, callback) { Change.prototype.rectify = function(cb) { var change = this; - this.prev = this.rev; - // get the current revision - this.currentRevision(function(err, rev) { - if(err) return Change.handleError(err, cb); - change.rev = rev; + var tasks = [ + updateRevision, + updateCheckpoint + ]; + + if(this.rev) this.prev = this.rev; + + async.parallel(tasks, function(err) { + if(err) return cb(err); change.save(cb); }); + + function updateRevision(cb) { + // get the current revision + change.currentRevision(function(err, rev) { + if(err) return Change.handleError(err, cb); + change.rev = rev; + cb(); + }); + } + + function updateCheckpoint(cb) { + change.constructor.getCheckpointModel().current(function(err, checkpoint) { + if(err) return Change.handleError(err); + change.checkpoint = ++checkpoint; + cb(); + }); + } } /** @@ -233,7 +260,7 @@ Change.prototype.type = function() { Change.prototype.getModelCtor = function() { // todo - not sure if this works with multiple data sources - return this.constructor.modelBuilder.models[this.modelName]; + return loopback.getModel(this.modelName); } /** @@ -306,7 +333,10 @@ Change.diff = function(modelName, since, remoteChanges, callback) { if(err) return callback(err); var deltas = []; var conflicts = []; + var localModelIds = []; + localChanges.forEach(function(localChange) { + localModelIds.push(localChange.modelId); var remoteChange = remoteChangeIndex[localChange.modelId]; if(!localChange.equals(remoteChange)) { if(remoteChange.isBasedOn(localChange)) { @@ -317,9 +347,91 @@ Change.diff = function(modelName, since, remoteChanges, callback) { } }); + modelIds.forEach(function(id) { + if(localModelIds.indexOf(id) === -1) { + deltas.push(remoteChangeIndex[id]); + } + }); + callback(null, { deltas: deltas, conflicts: conflicts }); }); } + +/** + * Correct all change list entries. + * @param {Function} callback + */ + +Change.rectifyAll = function(cb) { + // this should be optimized + this.find(function(err, changes) { + if(err) return cb(err); + changes.forEach(function(change) { + change.rectify(); + }); + }); +} + +/** + * Get the checkpoint model. + * @return {Checkpoint} + */ + +Change.getCheckpointModel = function() { + var checkpointModel = this.Checkpoint; + if(checkpointModel) return checkpointModel; + this.checkpoint = checkpointModel = require('./checkpoint').extend('checkpoint'); + checkpointModel.attachTo(this.dataSource); + return checkpointModel; +} + + +/** + * When two changes conflict a conflict is created. + * + * **Note: call `conflict.fetch()` to get the `target` and `source` models. + * + * @param {Change} sourceChange The change object for the source model + * @param {Change} targetChange The conflicting model's change object + * @property {Model} source The source model instance + * @property {Model} target The target model instance + */ + +function Conflict(sourceChange, targetChange) { + this.sourceChange = sourceChange; + this.targetChange = targetChange; +} + +Conflict.prototype.fetch = function(cb) { + var conflict = this; + var tasks = [ + getSourceModel, + getTargetModel + ]; + + async.parallel(tasks, cb); + + function getSourceModel(change, cb) { + conflict.sourceModel.getModel(function(err, model) { + if(err) return cb(err); + conflict.source = model; + cb(); + }); + } + + function getTargetModel(cb) { + conflict.targetModel.getModel(function(err, model) { + if(err) return cb(err); + conflict.target = model; + cb(); + }); + } +} + +Conflict.prototype.resolve = function(cb) { + this.sourceChange.prev = this.targetChange.rev; + this.sourceChange.save(cb); +} diff --git a/lib/models/checkpoint.js b/lib/models/checkpoint.js index f5ff74236..22248bcb4 100644 --- a/lib/models/checkpoint.js +++ b/lib/models/checkpoint.js @@ -48,8 +48,9 @@ Checkpoint.current = function(cb) { this.find({ limit: 1, sort: 'id DESC' - }, function(err, checkpoint) { + }, function(err, checkpoints) { if(err) return cb(err); + var checkpoint = checkpoints[0] || {id: 0}; cb(null, checkpoint.id); }); } diff --git a/lib/models/model.js b/lib/models/model.js index 6e024a7d4..8c08d5d00 100644 --- a/lib/models/model.js +++ b/lib/models/model.js @@ -4,13 +4,76 @@ var loopback = require('../loopback'); var ModelBuilder = require('loopback-datasource-juggler').ModelBuilder; var modeler = new ModelBuilder(); +var async = require('async'); var assert = require('assert'); /** - * The built in loopback.Model. + * The base class for **all models**. * + * **Inheriting from `Model`** + * + * ```js + * var properties = {...}; + * var options = {...}; + * var MyModel = loopback.Model.extend('MyModel', properties, options); + * ``` + * + * **Options** + * + * - `trackChanges` - If true, changes to the model will be tracked. **Required + * for replication.** + * + * **Events** + * + * #### Event: `changed` + * + * Emitted after a model has been successfully created, saved, or updated. + * + * ```js + * MyModel.on('changed', function(inst) { + * console.log('model with id %s has been changed', inst.id); + * // => model with id 1 has been changed + * }); + * ``` + * + * #### Event: `deleted` + * + * Emitted after an individual model has been deleted. + * + * ```js + * MyModel.on('deleted', function(inst) { + * console.log('model with id %s has been deleted', inst.id); + * // => model with id 1 has been deleted + * }); + * ``` + * + * #### Event: `deletedAll` + * + * Emitted after an individual model has been deleted. + * + * ```js + * MyModel.on('deletedAll', function(where) { + * if(where) { + * console.log('all models where', where, 'have been deleted'); + * // => all models where + * // => {price: {gt: 100}} + * // => have been deleted + * } + * }); + * ``` + * + * #### Event: `attached` + * + * Emitted after a `Model` has been attached to an `app`. + * + * #### Event: `dataSourceAttached` + * + * Emitted after a `Model` has been attached to a `DataSource`. + * * @class * @param {Object} data + * @property {String} modelName The name of the model + * @property {DataSource} dataSource */ var Model = module.exports = modeler.define('Model'); @@ -23,6 +86,7 @@ Model.shared = true; Model.setup = function () { var ModelCtor = this; + var options = this.settings; ModelCtor.sharedCtor = function (data, id, fn) { if(typeof data === 'function') { @@ -108,6 +172,13 @@ Model.setup = function () { ModelCtor.sharedCtor.returns = {root: true}; + ModelCtor.once('dataSourceAttached', function() { + // enable change tracking (usually for replication) + if(options.trackChanges) { + ModelCtor.enableChangeTracking(); + } + }); + return ModelCtor; }; @@ -219,7 +290,7 @@ Model.diff = function(since, remoteChanges, callback) { */ Model.changes = function(since, filter, callback) { - var idName = this.idName(); + var idName = this.dataSource.idName(this.modelName); var Change = this.getChangeModel(); var model = this; @@ -235,15 +306,16 @@ Model.changes = function(since, filter, callback) { }, function(err, changes) { if(err) return cb(err); var ids = changes.map(function(change) { - return change.modelId; + return change.modelId.toString(); }); filter.where[idName] = {inq: ids}; model.find(filter, function(err, models) { if(err) return cb(err); var modelIds = models.map(function(m) { - return m[idName]; + return m[idName].toString(); }); callback(null, changes.filter(function(ch) { + if(ch.type() === Change.DELETE) return true; return modelIds.indexOf(ch.modelId) > -1; })); }); @@ -257,7 +329,7 @@ Model.changes = function(since, filter, callback) { */ Model.checkpoint = function(cb) { - var Checkpoint = this.getChangeModel().Checkpoint; + var Checkpoint = this.getChangeModel().getCheckpointModel(); this.getSourceId(function(err, sourceId) { if(err) return cb(err); Checkpoint.create({ @@ -283,18 +355,29 @@ Model.replicate = function(since, targetModel, options, callback) { var sourceModel = this; var diff; var updates; + var Change = this.getChangeModel(); + var TargetChange = targetModel.getChangeModel(); var tasks = [ getLocalChanges, getDiffFromTarget, createSourceUpdates, bulkUpdate, - sourceModel.checkpoint.bind(sourceModel) + checkpoint ]; async.waterfall(tasks, function(err) { if(err) return callback(err); - callback(null, diff.conflicts); + var conflicts = diff.conflicts.map(function(change) { + var sourceChange = new Change({ + modelName: sourceModel.modelName, + modelId: change.modelId + }); + var targetChange = new TargetChange(change); + return new Change.Conflict(sourceChange, targetChange); + }); + + callback(null, conflicts); }); function getLocalChanges(cb) { @@ -307,12 +390,18 @@ Model.replicate = function(since, targetModel, options, callback) { function createSourceUpdates(_diff, cb) { diff = _diff; + diff.conflicts = diff.conflicts || []; sourceModel.createUpdates(diff.deltas, cb); } function bulkUpdate(updates, cb) { targetModel.bulkUpdate(updates, cb); } + + function checkpoint() { + var cb = arguments[arguments.length - 1]; + sourceModel.checkpoint(cb); + } } /** @@ -328,10 +417,10 @@ Model.createUpdates = function(deltas, cb) { var updates = []; var Model = this; var tasks = []; - var type = change.type(); deltas.forEach(function(change) { - change = new Change(change); + var change = new Change(change); + var type = change.type(); var update = {type: type, change: change}; switch(type) { case Change.CREATE: @@ -339,7 +428,11 @@ Model.createUpdates = function(deltas, cb) { tasks.push(function(cb) { Model.findById(change.modelId, function(err, inst) { if(err) return cb(err); - update.data = inst; + if(inst.toObject) { + update.data = inst.toObject(); + } else { + update.data = inst; + } updates.push(update); cb(); }); @@ -369,16 +462,22 @@ Model.createUpdates = function(deltas, cb) { Model.bulkUpdate = function(updates, callback) { var tasks = []; var Model = this; - var idName = Model.idName(); + var idName = this.dataSource.idName(this.modelName); var Change = this.getChangeModel(); updates.forEach(function(update) { switch(update.type) { case Change.UPDATE: case Change.CREATE: - tasks.push(Model.upsert.bind(Model, update.data)); + // var model = new Model(update.data); + // tasks.push(model.save.bind(model)); + tasks.push(function(cb) { + var model = new Model(update.data); + debugger; + model.save(cb); + }); break; - case: Change.DELETE: + case Change.DELETE: var data = {}; data[idName] = update.change.modelId; var model = new Model(data); @@ -391,5 +490,58 @@ Model.bulkUpdate = function(updates, callback) { } Model.getChangeModel = function() { - + var changeModel = this.Change; + if(changeModel) return changeModel; + this.Change = changeModel = require('./change').extend(this.modelName + '-change'); + changeModel.attachTo(this.dataSource); + return changeModel; +} + +Model.getSourceId = function(cb) { + cb(null, 'foo') +} + +/** + * Enable the tracking of changes made to the model. Usually for replication. + */ + +Model.enableChangeTracking = function() { + var Model = this; + var Change = Model.getChangeModel(); + var cleanupInterval = Model.settings.changeCleanupInterval || 30000; + + Model.on('changed', function(obj) { + Change.track(Model.modelName, [obj.id], function(err) { + if(err) { + console.error(Model.modelName + ' Change Tracking Error:'); + console.error(err); + } + }); + }); + + Model.on('deleted', function(obj) { + Change.track(Model.modelName, [obj.id], function(err) { + if(err) { + console.error(Model.modelName + ' Change Tracking Error:'); + console.error(err); + } + }); + }); + + Model.on('deletedAll', cleanup); + + // initial cleanup + cleanup(); + + // cleanup + setInterval(cleanup, cleanupInterval); + + function cleanup() { + Change.rectifyAll(function(err) { + if(err) { + console.error(Model.modelName + ' Change Cleanup Error:'); + console.error(err); + } + }); + } }