From 7709f19d0fef2aba6f3c0d71778deda78e58048d Mon Sep 17 00:00:00 2001 From: Nick Cuthbert Date: Sun, 3 Dec 2017 16:29:29 +0200 Subject: [PATCH 1/4] Making progress with snapshotting --- lib/actor.js | 3 + lib/index.js | 4 +- lib/persistence/persistence-engine.js | 31 ++++++++++- lib/persistence/persistent-actor.js | 61 ++++++++++++++++++--- lib/utils.js | 35 ++++++++++-- test/broken-persistence-engine.js | 4 ++ test/mock-persistence-engine.js | 9 ++- test/partially-broken-persistence-engine.js | 4 ++ test/utils.js | 14 ++++- 9 files changed, 145 insertions(+), 20 deletions(-) diff --git a/lib/actor.js b/lib/actor.js index 086e698..d506bf7 100644 --- a/lib/actor.js +++ b/lib/actor.js @@ -93,6 +93,9 @@ class Actor { if (this.immediate) { clearImmediate(this.immediate); } + if (this.timeout) { + clearTimeout(this.timeout); + } this.parent && this.parent.childStopped(this); this.reference && dereference(this.reference); delete this.reference; diff --git a/lib/index.js b/lib/index.js index 9e3fe70..87204e3 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,11 +1,11 @@ const { spawn, spawnStateless } = require('./actor'); const { stop, state$, query, dispatch } = require('./functions'); const { spawnPersistent, configurePersistence } = require('./persistence'); -const { after } = require('./utils'); +const utils = require('./utils'); module.exports = { ...require('./system'), + ...utils, spawn, - after, spawnStateless, query, dispatch, diff --git a/lib/persistence/persistence-engine.js b/lib/persistence/persistence-engine.js index 0fb257a..d0c3131 100644 --- a/lib/persistence/persistence-engine.js +++ b/lib/persistence/persistence-engine.js @@ -6,11 +6,40 @@ class AbstractPersistenceEngine { throw new Error('#events() is yet implemented'); } + latestSnapshot (persistenceKey) { + throw new Error('#latestSnapshot() is yet implemented'); + } + + takeSnapshot (persistanceSnapshot) { + throw new Error('#takeSnapshot() is yet implemented'); + } + persist (persistedEvent) { throw new Error('#persist() is not yet implemented'); } } +class PersistedSnapshot { + constructor (data, sequenceNumber, key, createdAt = new Date().getTime()) { + if (data === null || data === undefined) { + throw new Error('data should not be null or undefined'); + } + + // Sequence number should be a number. + // This is an internal error if this is not the case as this is defined by the engine and hence shouldn't + // be exposed to users of the framework + assert(typeof (sequenceNumber) === 'number'); + + this.data = data; + this.sequenceNumber = sequenceNumber; + this.key = key; + this.createdAt = createdAt; + + // A snapshot should be immutable + freeze(this); + } +} + class PersistedEvent { constructor (data, sequenceNumber, key, tags = [], createdAt = new Date().getTime()) { // data should not be undefined or null @@ -38,4 +67,4 @@ class PersistedEvent { } } -module.exports = { PersistedEvent, AbstractPersistenceEngine }; +module.exports = { PersistedEvent, PersistedSnapshot, AbstractPersistenceEngine }; diff --git a/lib/persistence/persistent-actor.js b/lib/persistence/persistent-actor.js index 8d6e1da..0035bec 100644 --- a/lib/persistence/persistent-actor.js +++ b/lib/persistence/persistent-actor.js @@ -1,11 +1,12 @@ require('rxjs'); -const { PersistedEvent } = require('./persistence-engine'); +const Rx = require('rxjs'); +const { PersistedEvent, PersistedSnapshot } = require('./persistence-engine'); const { Actor } = require('../actor'); const { Promise } = require('bluebird'); const freeze = require('deep-freeze-node'); class PersistentActor extends Actor { - constructor (parent, name, system, f, key, persistenceEngine, properties = {}) { + constructor (parent, name, system, f, key, persistenceEngine, { snapshot, ...properties } = {}) { super(parent, name, system, f, properties); if (!key) { throw new Error('Persistence key required'); @@ -18,7 +19,21 @@ class PersistentActor extends Actor { this.sequenceNumber = 0; this.busy = true; this.key = key; - setImmediate(this.recover.bind(this)); + if (snapshot) { + if (snapshot.duration) { + this.snapshotDuration = snapshot.duration; + } + + if (snapshot.messages) { + this.snapshotMessageInterval = snapshot.messages; + this.messagesToNextSnapshot = this.snapshotMessageInterval; + } + + if (!snapshot.messages && !snapshot.duration) { + throw new Error('Snapshot requires a duration and/or messages field. Correctly specifying the snapshot rule is most easily done using every()'); + } + } + setImmediate(() => this.recover()); } recover () { @@ -27,25 +42,45 @@ class PersistentActor extends Actor { // Reduce this sequence by passing it into the processor function f // Calculate for each message the sequence number // Subscribe to the end result and start processing new messages - const initialState = undefined; + this.persistenceEngine.latestSnapshot(this.key).then((snapshot) => { + let sequenceNumber = 0; + let initialState; + if (snapshot) { + initialState = snapshot.data; + sequenceNumber = snapshot.sequenceNumber; + } - this.persistenceEngine.events(this.key) + this.persistenceEngine.events(this.key, sequenceNumber) .distinct(evt => evt.sequenceNumber) .reduce( async (prev, msg, index) => { const [state] = await prev; const context = { ...this.createContext(this.reference), recovering: true }; - // Might not be an async function. Using promise.resolve to force it into that form + // Might not be an async function. Using promise.resolve to force it into that form const nextState = await Promise.resolve(this.f.call(context, freeze(state), msg.data, context)); - return [nextState, index + 1]; + return [nextState, msg.sequenceNumber, index]; }, - Promise.resolve([initialState, 0]) + Promise.resolve([initialState, sequenceNumber, 0]) ) .subscribe(async (result) => { - const [state, sequenceNumber] = await result; + const [state, sequenceNumber, messageCount] = await result; this.sequenceNumber = sequenceNumber; + this.messagesToNextSnapshot = this.snapshotMessageInterval - messageCount; + if (this.snapshotDuration) { + // If instructed to snapshot at regular intervals, create an interval observable which emits until actor is stopped + this.stopObservableSubject = new Rx.Subject(); + Rx.Observable + .interval(this.snapshotDuration) + .takeUntil(this.stopObservableSubject) + .subscribe(async () => { + const sequenceNumber = this.sequenceNumber; + const state = this.state; + await this.persistenceEngine.takeSnapshot(new PersistedSnapshot(state, sequenceNumber, this.key)); + }); + } this.processNext(state, sequenceNumber === 0); }); + }); } catch (e) { this.signalFault(e); } @@ -56,6 +91,14 @@ class PersistentActor extends Actor { return (await (this.persistenceEngine.persist(persistedEvent))).data; } + stop () { + super.stop(); + if (this.stopObservableSubject) { + this.stopObservableSubject.next(false); + this.stopObservableSubject.complete(); + } + } + createContext () { return { ...super.createContext.apply(this, arguments), persist: this.persist.bind(this) }; } diff --git a/lib/utils.js b/lib/utils.js index 55849a1..8e0ad34 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,11 +1,23 @@ +class AfterValue { + constructor (value) { + Object.assign(this, value); + } + + or (amount) { + const { or, ...value } = this; + return new After(amount, value); + } +} + class After { - constructor (amount) { + constructor (amount, chain = {}) { this._amount = amount; + this._chain = chain; Object.freeze(this); } get hours () { - return { duration: (this._amount * 60 * 60 * 1000) | 0 }; + return new AfterValue({ ...this._chain, duration: (this._amount * 60 * 60 * 1000) | 0 }); } get hour () { @@ -13,7 +25,7 @@ class After { } get minutes () { - return { duration: (this._amount * 60 * 1000) | 0 }; + return new AfterValue({ ...this._chain, duration: (this._amount * 60 * 1000) | 0 }); } get minute () { @@ -21,7 +33,7 @@ class After { } get seconds () { - return { duration: (this._amount * 1000) | 0 }; + return new AfterValue({ ...this._chain, duration: (this._amount * 1000) | 0 }); } get second () { @@ -29,15 +41,26 @@ class After { } get milliseconds () { - return { duration: this._amount | 0 }; + return new AfterValue({ ...this._chain, duration: this._amount | 0 }); } + get millisecond () { return this.milliseconds; } + + get messages () { + return new AfterValue({ ...this._chain, messages: this._amount }); + } + + get message () { + return this.messages; + } } const after = (amount) => new After(amount); +const every = (amount) => new After(amount); module.exports = { - after + after, + every }; diff --git a/test/broken-persistence-engine.js b/test/broken-persistence-engine.js index b43caa5..0dbb5c6 100644 --- a/test/broken-persistence-engine.js +++ b/test/broken-persistence-engine.js @@ -5,6 +5,10 @@ class BrokenPersistenceEngine extends AbstractPersistenceEngine { throw new Error('Elvis has left the building'); } + latestSnapshot (persistenceKey) { + throw new Error('#latestSnapshot() is yet implemented'); + } + persist (persistedEvent) {} } diff --git a/test/mock-persistence-engine.js b/test/mock-persistence-engine.js index 45385b8..5ec4be8 100644 --- a/test/mock-persistence-engine.js +++ b/test/mock-persistence-engine.js @@ -2,9 +2,16 @@ const { AbstractPersistenceEngine } = require('../lib/persistence'); const { Observable } = require('rxjs'); class MockPersistenceEngine extends AbstractPersistenceEngine { - constructor (events = {}) { + constructor (events = {}, snapshots = {}) { super(); this._events = events; + this._snapshots = snapshots; + } + + latestSnapshot (persistenceKey) { + const snapshots = (this._snapshots[persistenceKey] || []); + const snapshot = snapshots.length > 0 ? snapshots[snapshots.length - 1] : undefined; + return Promise.resolve(snapshot); } events (persistenceKey, offset = 0, limit, tags) { diff --git a/test/partially-broken-persistence-engine.js b/test/partially-broken-persistence-engine.js index 2598e4e..61e5c60 100644 --- a/test/partially-broken-persistence-engine.js +++ b/test/partially-broken-persistence-engine.js @@ -26,6 +26,10 @@ class PartiallyBrokenPersistenceEngine extends AbstractPersistenceEngine { }); } + latestSnapshot (persistenceKey) { + throw new Error('#latestSnapshot() is yet implemented'); + } + persist (persistedEvent) { const prev = this._events.get(persistedEvent.key) || []; this._events.set(persistedEvent.key, [...prev, persistedEvent]); diff --git a/test/utils.js b/test/utils.js index 11505ca..78aeb2f 100644 --- a/test/utils.js +++ b/test/utils.js @@ -2,7 +2,7 @@ /* eslint-disable no-unused-expressions */ const chai = require('chai'); chai.should(); -const { after } = require('../lib'); +const { after, every } = require('../lib'); describe('#after', function () { it('should correctly calculate milliseconds', function () { @@ -10,6 +10,7 @@ describe('#after', function () { after(1).millisecond.duration.should.equal(1); after(0).milliseconds.duration.should.equal(0); }); + it('should correctly calculate seconds', function () { after(1).second.duration.should.equal(1000); after(0).seconds.duration.should.equal(0); @@ -26,4 +27,15 @@ describe('#after', function () { after(1).hour.duration.should.equal(3600000); after(0).hours.duration.should.equal(0); }); + + it('should correctly set messages', function () { + after(1).message.messages.should.equal(1); + every(10).messages.messages.should.equal(10); + }); + + it('should allow the combination of duration and messages', function () { + let value = every(1).hours.or(5).messages; + value.duration.should.equal(3600000); + value.messages.should.equal(5); + }); }); From efc8904873ea7bf986f5f52f3be896f805e6eb3c Mon Sep 17 00:00:00 2001 From: Nick Cuthbert Date: Mon, 4 Dec 2017 00:25:15 +0200 Subject: [PATCH 2/4] Completed snapshotting feature --- lib/actor.js | 24 +++-- lib/persistence/index.js | 4 +- lib/persistence/persistence-engine.js | 2 +- lib/persistence/persistent-actor.js | 83 ++++++++------- lib/utils.js | 22 ++-- package.json | 2 +- test/actor.js | 2 +- test/mock-persistence-engine.js | 13 ++- test/persistence-engine.js | 42 +++++++- test/persistent-actor.js | 143 +++++++++++++++++++++----- test/utils.js | 11 +- 11 files changed, 262 insertions(+), 86 deletions(-) diff --git a/lib/actor.js b/lib/actor.js index d506bf7..46467c4 100644 --- a/lib/actor.js +++ b/lib/actor.js @@ -36,7 +36,7 @@ class Actor { throw new Error('Shutdown should be specified as a duration. It is recommended to use the after() function to do this'); } this.shutdownPeriod = shutdown.duration; - this.timeout = setTimeout(() => this.stop(), this.shutdownPeriod); + this.setTimeout(); } } @@ -44,6 +44,18 @@ class Actor { return JSON.stringify(err, Object.getOwnPropertyNames(err)); } + setTimeout () { + if (this.shutdownPeriod) { + this.timeout = setTimeout(() => this.stop(), this.shutdownPeriod); + } + } + + clearTimeout () { + if (this.shutdownPeriod) { + clearTimeout(this.timeout); + } + } + static getSafeTimeout (timeoutDuration) { timeoutDuration = timeoutDuration | 0; const MAX_TIMEOUT = 2147483647; @@ -54,10 +66,7 @@ class Actor { dispatch (message, sender = new Nobody(this.system)) { this.assertNotStopped(); - if (this.shutdownPeriod) { - clearTimeout(this.timeout); - setTimeout(() => this.stop(), this.shutdownPeriod); - } + this.clearTimeout(); if (!this.busy) { this.handleMessage(message, sender); } else { @@ -109,9 +118,9 @@ class Actor { return this.subject.asObservable(); } - processNext (next, allowUndefined = false) { + processNext (next, initial = false) { if (!this.stopped) { - if (next !== undefined || allowUndefined) { + if (next !== undefined || initial) { if (this.state !== next) { this.subject.next(next); } @@ -121,6 +130,7 @@ class Actor { this.handleMessage(message, sender); } else { this.busy = false; + this.setTimeout(); } } else { this.stop(); diff --git a/lib/persistence/index.js b/lib/persistence/index.js index a676091..566e305 100644 --- a/lib/persistence/index.js +++ b/lib/persistence/index.js @@ -1,5 +1,5 @@ const { spawnPersistent } = require('./persistent-actor'); -const { PersistedEvent, AbstractPersistenceEngine } = require('./persistence-engine'); +const { PersistedEvent, PersistedSnapshot, AbstractPersistenceEngine } = require('./persistence-engine'); const configurePersistence = (engine) => (system) => { if (!engine) { @@ -8,4 +8,4 @@ const configurePersistence = (engine) => (system) => { return Object.assign(system, { persistenceEngine: engine }); }; -module.exports = { configurePersistence, spawnPersistent, PersistedEvent, AbstractPersistenceEngine }; +module.exports = { configurePersistence, spawnPersistent, PersistedSnapshot, PersistedEvent, AbstractPersistenceEngine }; diff --git a/lib/persistence/persistence-engine.js b/lib/persistence/persistence-engine.js index d0c3131..4fb566e 100644 --- a/lib/persistence/persistence-engine.js +++ b/lib/persistence/persistence-engine.js @@ -10,7 +10,7 @@ class AbstractPersistenceEngine { throw new Error('#latestSnapshot() is yet implemented'); } - takeSnapshot (persistanceSnapshot) { + takeSnapshot (persistedSnapshot) { throw new Error('#takeSnapshot() is yet implemented'); } diff --git a/lib/persistence/persistent-actor.js b/lib/persistence/persistent-actor.js index 0035bec..c951539 100644 --- a/lib/persistence/persistent-actor.js +++ b/lib/persistence/persistent-actor.js @@ -19,20 +19,15 @@ class PersistentActor extends Actor { this.sequenceNumber = 0; this.busy = true; this.key = key; - if (snapshot) { - if (snapshot.duration) { - this.snapshotDuration = snapshot.duration; - } - if (snapshot.messages) { - this.snapshotMessageInterval = snapshot.messages; - this.messagesToNextSnapshot = this.snapshotMessageInterval; - } - - if (!snapshot.messages && !snapshot.duration) { + if (snapshot) { + this.snapshotDuration = snapshot.duration || false; + this.snapshotMessageInterval = snapshot.messageInterval || false; + if (!this.snapshotMessageInterval && !this.snapshotDuration) { throw new Error('Snapshot requires a duration and/or messages field. Correctly specifying the snapshot rule is most easily done using every()'); } } + setImmediate(() => this.recover()); } @@ -51,8 +46,8 @@ class PersistentActor extends Actor { } this.persistenceEngine.events(this.key, sequenceNumber) - .distinct(evt => evt.sequenceNumber) - .reduce( + .distinct(evt => evt.sequenceNumber) + .reduce( async (prev, msg, index) => { const [state] = await prev; const context = { ...this.createContext(this.reference), recovering: true }; @@ -61,31 +56,49 @@ class PersistentActor extends Actor { return [nextState, msg.sequenceNumber, index]; }, Promise.resolve([initialState, sequenceNumber, 0]) - ) - .subscribe(async (result) => { - const [state, sequenceNumber, messageCount] = await result; - this.sequenceNumber = sequenceNumber; - this.messagesToNextSnapshot = this.snapshotMessageInterval - messageCount; - if (this.snapshotDuration) { - // If instructed to snapshot at regular intervals, create an interval observable which emits until actor is stopped - this.stopObservableSubject = new Rx.Subject(); - Rx.Observable - .interval(this.snapshotDuration) - .takeUntil(this.stopObservableSubject) - .subscribe(async () => { - const sequenceNumber = this.sequenceNumber; - const state = this.state; - await this.persistenceEngine.takeSnapshot(new PersistedSnapshot(state, sequenceNumber, this.key)); - }); - } - this.processNext(state, sequenceNumber === 0); - }); + ) + .subscribe(async (result) => { + const [state, sequenceNumber, messageCount] = await result; + this.sequenceNumber = sequenceNumber; + if (this.snapshotMessageInterval) { + this.messagesToNextSnapshot = this.snapshotMessageInterval - messageCount; + } + if (this.snapshotDuration) { + // If instructed to snapshot at regular intervals, create an interval observable which emits until actor is stopped + this.stop$ = new Rx.Subject(); + Rx.Observable + .interval(this.snapshotDuration) + .takeUntil(this.stop$) + .subscribe(async () => { + const sequenceNumber = this.sequenceNumber; + const state = this.state; + await this.persistenceEngine.takeSnapshot(new PersistedSnapshot(state, sequenceNumber, this.key)); + }); + } + this.processNext(state, sequenceNumber === 0); + }); }); } catch (e) { this.signalFault(e); } } + async processNext (next, initial = false) { + if (!this.stopped && this.snapshotMessageInterval && !initial) { + --this.messagesToNextSnapshot; + if (this.messagesToNextSnapshot <= 0) { + this.messagesToNextSnapshot = this.snapshotMessageInterval; + const snapshot = new PersistedSnapshot(next, this.sequenceNumber, this.key); + try { + await this.persistenceEngine.takeSnapshot(snapshot); + } catch (e) { + console.error(`Failed to take snapshot ${e}`); + } + } + } + super.processNext(next, initial); + } + async persist (msg, tags = []) { const persistedEvent = new PersistedEvent(msg, ++this.sequenceNumber, this.key, tags); return (await (this.persistenceEngine.persist(persistedEvent))).data; @@ -93,9 +106,9 @@ class PersistentActor extends Actor { stop () { super.stop(); - if (this.stopObservableSubject) { - this.stopObservableSubject.next(false); - this.stopObservableSubject.complete(); + if (this.stop$) { + this.stop$.next(false); + this.stop$.complete(); } } @@ -110,7 +123,7 @@ const spawnPersistent = (reference, f, key, name, properties) => reference, parent => applyOrThrowIfStopped( parent.system, - system => new PersistentActor(parent, name, parent.system, f, key, system.persistenceEngine) + system => new PersistentActor(parent, name, parent.system, f, key, system.persistenceEngine, properties) ).reference ); diff --git a/lib/utils.js b/lib/utils.js index 8e0ad34..49325ba 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -4,20 +4,26 @@ class AfterValue { } or (amount) { - const { or, ...value } = this; - return new After(amount, value); + const { or, and, ...value } = this; + return new After(amount, value, false); + } + + and (amount) { + const { or, and, ...value } = this; + return new After(amount, value, true); } } class After { - constructor (amount, chain = {}) { + constructor (amount, chain = {}, addPreviousDuration) { this._amount = amount; this._chain = chain; + this._previousDuration = (addPreviousDuration && this._chain.duration) ? this._chain.duration : 0; Object.freeze(this); } get hours () { - return new AfterValue({ ...this._chain, duration: (this._amount * 60 * 60 * 1000) | 0 }); + return new AfterValue({ ...this._chain, duration: (this._amount * 60 * 60 * 1000 + this._previousDuration) | 0 }); } get hour () { @@ -25,7 +31,7 @@ class After { } get minutes () { - return new AfterValue({ ...this._chain, duration: (this._amount * 60 * 1000) | 0 }); + return new AfterValue({ ...this._chain, duration: (this._amount * 60 * 1000 + this._previousDuration) | 0 }); } get minute () { @@ -33,7 +39,7 @@ class After { } get seconds () { - return new AfterValue({ ...this._chain, duration: (this._amount * 1000) | 0 }); + return new AfterValue({ ...this._chain, duration: (this._amount * 1000 + this._previousDuration) | 0 }); } get second () { @@ -41,7 +47,7 @@ class After { } get milliseconds () { - return new AfterValue({ ...this._chain, duration: this._amount | 0 }); + return new AfterValue({ ...this._chain, duration: (this._amount + this._previousDuration) | 0 }); } get millisecond () { @@ -49,7 +55,7 @@ class After { } get messages () { - return new AfterValue({ ...this._chain, messages: this._amount }); + return new AfterValue({ ...this._chain, messageInterval: this._amount }); } get message () { diff --git a/package.json b/package.json index 3baf11c..9e09429 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nact", - "version": "3.2.0", + "version": "4.0.0", "description": "nact ⇒ node.js + actors = your services have never been so µ", "main": "lib/index.js", "scripts": { diff --git a/test/actor.js b/test/actor.js index 05baa91..f775928 100644 --- a/test/actor.js +++ b/test/actor.js @@ -220,7 +220,7 @@ describe('Actor', function () { }); it('should throw if timeout does not include a duration field', async function () { - (() => spawnStateless(system, ignore, 'test1', { shutdown: {} })).should.throw(); + (() => spawnStateless(system, ignore, 'test1', { shutdown: {} })).should.throw(Error); }); }); diff --git a/test/mock-persistence-engine.js b/test/mock-persistence-engine.js index 5ec4be8..9bc6128 100644 --- a/test/mock-persistence-engine.js +++ b/test/mock-persistence-engine.js @@ -2,10 +2,11 @@ const { AbstractPersistenceEngine } = require('../lib/persistence'); const { Observable } = require('rxjs'); class MockPersistenceEngine extends AbstractPersistenceEngine { - constructor (events = {}, snapshots = {}) { + constructor (events = {}, snapshots = {}, takeSnapshotIsWorking = true) { super(); this._events = events; this._snapshots = snapshots; + this.takeSnapshotIsWorking = takeSnapshotIsWorking; } latestSnapshot (persistenceKey) { @@ -14,6 +15,16 @@ class MockPersistenceEngine extends AbstractPersistenceEngine { return Promise.resolve(snapshot); } + takeSnapshot (persistedSnapshot) { + if (this.takeSnapshotIsWorking) { + const prev = this._snapshots[persistedSnapshot.key] || []; + this._snapshots[persistedSnapshot.key] = [...prev, persistedSnapshot]; + return Promise.resolve(persistedSnapshot); + } else { + throw new Error('Elvis has left the building'); + } + } + events (persistenceKey, offset = 0, limit, tags) { const persistedEvents = (this._events[persistenceKey] || []); const slice = persistedEvents.slice(offset, limit ? offset + limit : undefined); diff --git a/test/persistence-engine.js b/test/persistence-engine.js index 9170eac..99a99ef 100644 --- a/test/persistence-engine.js +++ b/test/persistence-engine.js @@ -2,7 +2,44 @@ /* eslint-disable no-unused-expressions */ const chai = require('chai'); chai.should(); -const { PersistedEvent, AbstractPersistenceEngine } = require('../lib/persistence'); +const { PersistedEvent, PersistedSnapshot, AbstractPersistenceEngine } = require('../lib/persistence'); + +describe('PersistedSnapshot', function () { + it('should be immutable', function () { + const event = new PersistedSnapshot('123', 1, 'test-key'); + event.sequenceNumber = 2; + event.sequenceNumber.should.equal(1); + event.data = '234'; + event.data.should.equal('123'); + event.key = 'another-test-key'; + event.key.should.equal('test-key'); + }); + + describe('#data', function () { + it('should disallow null values', function () { + (() => new PersistedSnapshot(null, 1, 'test-key')).should.throw(Error); + }); + it('should disallow undefined data values', function () { + (() => new PersistedSnapshot(undefined, 1, 'test-key')).should.throw(Error); + }); + it('should disallow non-number sequenceNums', function () { + (() => new PersistedSnapshot({ msg: 'test' }, '1', 'test-key')).should.throw(Error); + }); + }); + + describe('#createdAt', function () { + it('should be able to be explicitely set', function () { + new PersistedSnapshot({ msg: 'test' }, 1, 'test-key', 123456).createdAt.should.equal(123456); + }); + + it('should default to the current time', function () { + const oldGetTime = global.Date.prototype.getTime; + global.Date.prototype.getTime = () => 123456; + new PersistedSnapshot({ msg: 'test' }, 1, 'test-key').createdAt.should.equal(123456); + global.Date.prototype.getTime = oldGetTime; + }); + }); +}); describe('PersistedEvent', function () { it('should be immutable', function () { @@ -61,8 +98,11 @@ describe('PersistedEvent', function () { describe('AbstractPersistenceEngine', function () { it('should throw when functions are invoked', function () { const event = new PersistedEvent({ msg: '234' }, 1, 'test-key', []); + const snapshot = new PersistedSnapshot('234', 1, 'test-key'); const abstractEngine = new AbstractPersistenceEngine(); (() => abstractEngine.events('123', 1)).should.throw(Error); (() => abstractEngine.persist(event)).should.throw(Error); + (() => abstractEngine.latestSnapshot('123')).should.throw(Error); + (() => abstractEngine.takeSnapshot(snapshot)).should.throw(Error); }); }); diff --git a/test/persistent-actor.js b/test/persistent-actor.js index 5991ace..947e4ad 100644 --- a/test/persistent-actor.js +++ b/test/persistent-actor.js @@ -5,8 +5,8 @@ chai.should(); const { MockPersistenceEngine } = require('./mock-persistence-engine'); const { BrokenPersistenceEngine } = require('./broken-persistence-engine'); const { PartiallyBrokenPersistenceEngine } = require('./partially-broken-persistence-engine'); -const { start, dispatch, query, stop } = require('../lib'); -const { PersistedEvent, spawnPersistent, configurePersistence } = require('../lib/persistence'); +const { start, dispatch, query, stop, every } = require('../lib'); +const { PersistedEvent, PersistedSnapshot, spawnPersistent, configurePersistence } = require('../lib/persistence'); const chaiAsPromised = require('chai-as-promised'); chai.use(chaiAsPromised); const { Promise } = require('bluebird'); @@ -22,7 +22,7 @@ const isStopped = (reference) => { }; // Begin helpers -const ignore = () => {}; +const ignore = () => { }; const retry = async (assertion, remainingAttempts, retryInterval = 0) => { if (remainingAttempts <= 1) { @@ -96,10 +96,10 @@ describe('PersistentActor', () => { const persistenceEngine = new MockPersistenceEngine({ test: events }); system = start(configurePersistence(persistenceEngine)); const actor = spawnPersistent( - system, - concatenativeFunction(''), - 'test' - ); + system, + concatenativeFunction(''), + 'test' + ); dispatch(actor, '1'); dispatch(actor, '2'); dispatch(actor, '3'); @@ -111,10 +111,10 @@ describe('PersistentActor', () => { const persistenceEngine = new MockPersistenceEngine(); system = start(configurePersistence(persistenceEngine)); const actor = spawnPersistent( - system, - concatenativeFunction('', (state, msg, ctx) => !ctx.recovering && ctx.persist(msg)), - 'test' - ); + system, + concatenativeFunction('', (state, msg, ctx) => !ctx.recovering && ctx.persist(msg)), + 'test' + ); dispatch(actor, 'a'); dispatch(actor, 'b'); dispatch(actor, 'c'); @@ -127,10 +127,10 @@ describe('PersistentActor', () => { const persistenceEngine = new BrokenPersistenceEngine(); system = start(configurePersistence(persistenceEngine)); const actor = spawnPersistent( - system, - concatenativeFunction(''), - 'test' - ); + system, + concatenativeFunction(''), + 'test' + ); await retry(() => isStopped(actor).should.be.true, 5, 10); }); @@ -154,19 +154,110 @@ describe('PersistentActor', () => { const persistenceEngine = new MockPersistenceEngine({ iceland: previousEvents }); system = start(configurePersistence(persistenceEngine)); const actor = spawnPersistent( - system, - concatenativeFunction('', (state, msg, ctx) => { - if (!ctx.recovering) { - console.log('persisting'); - return ctx.persist(msg); - } - }), - 'iceland' - ); + system, + concatenativeFunction('', async (state, msg, ctx) => { + if (!ctx.recovering) { + console.log('persisting'); + await ctx.persist(msg); + } + }), + 'iceland' + ); dispatch(actor, ', very cold indeed'); await retry(() => persistenceEngine._events['iceland'].map((evt, i) => evt.sequenceNumber === i + 1) - .should.deep.equal(new Array(previousState.length + 1).fill(true)) - , 5, 20); + .should.deep.equal(new Array(previousState.length + 1).fill(true)) + , 5, 20); + }); + + it('should be able to restore a snapshot and replay events exluding those that were persisted before the snapshot', async () => { + const previousState = 'icelandiscold'; + const expectedState = 'greenlandiscold'; + const previousEvents = [...previousState].map((evt, i) => new PersistedEvent(evt, i + 1, 'iceland')); + const persistenceEngine = new MockPersistenceEngine({ iceland: previousEvents }, { iceland: [new PersistedSnapshot('green', 3, 'iceland')] }); + system = start(configurePersistence(persistenceEngine)); + const actor = spawnPersistent( + system, + concatenativeFunction(''), + 'iceland' + ); + (await query(actor, '', 30)).should.equal(expectedState); + }); + + it('should be able to restore a snapshot and replay events exluding those that were persisted before the snapshot', async () => { + const previousState = 'icelandiscold'; + const expectedState = 'greenlandiscold'; + const previousEvents = [...previousState].map((evt, i) => new PersistedEvent(evt, i + 1, 'iceland')); + const persistenceEngine = new MockPersistenceEngine({ iceland: previousEvents }, { iceland: [new PersistedSnapshot('green', 3, 'iceland')] }); + system = start(configurePersistence(persistenceEngine)); + const actor = spawnPersistent( + system, + concatenativeFunction(''), + 'iceland' + ); + (await query(actor, '', 30)).should.equal(expectedState); + }); + + it('should be able to persist a snapshot after a given number of messages', async () => { + const persistenceEngine = new MockPersistenceEngine(); + system = start(configurePersistence(persistenceEngine)); + const actor = spawnPersistent( + system, + concatenativeFunction(''), + 'iceland', + 'test', + { snapshot: every(5).messages } + ); + const expectedResult = 'iceland is cold'; + expectedResult.split('').forEach(msg => { + dispatch(actor, msg); + }); + (await query(actor, '!', 30)); + const snapshots = persistenceEngine._snapshots['iceland']; + snapshots.length.should.equal(3); + snapshots[snapshots.length - 1].data.should.equal(expectedResult); + }); + + it('should be able to persist a snapshot after a specified duration', async () => { + const persistenceEngine = new MockPersistenceEngine(); + system = start(configurePersistence(persistenceEngine)); + const actor = spawnPersistent( + system, + concatenativeFunction(''), + 'iceland', + 'test', + { snapshot: every(40).milliseconds } + ); + const expectedResult = 'iceland is cold'; + expectedResult.split('').forEach(msg => { + dispatch(actor, msg); + }); + await delay(50); + const snapshots = persistenceEngine._snapshots['iceland']; + snapshots[snapshots.length - 1].data.should.equal(expectedResult); + }); + + it('should be able to continue processing messages even after failing to save a snapshot', async () => { + console.error = ignore; + const persistenceEngine = new MockPersistenceEngine(undefined, undefined, false); // Disable takeSnapshot + system = start(configurePersistence(persistenceEngine)); + const actor = spawnPersistent( + system, + concatenativeFunction(''), + 'iceland', + 'test', + { snapshot: every(5).messages } + ); + const expectedResult = 'iceland is cold'; + expectedResult.split('').forEach(msg => { + dispatch(actor, msg); + }); + (await query(actor, '', 30)).should.equal(expectedResult); + }); + + it('should throw if timeout does not include a duration field', async function () { + const persistenceEngine = new MockPersistenceEngine(); // Disable takeSnapshot + system = start(configurePersistence(persistenceEngine)); + (() => spawnPersistent(system, ignore, 'test1', undefined, { snapshot: {} })).should.throw(Error); }); }); diff --git a/test/utils.js b/test/utils.js index 78aeb2f..e6aec73 100644 --- a/test/utils.js +++ b/test/utils.js @@ -28,14 +28,19 @@ describe('#after', function () { after(0).hours.duration.should.equal(0); }); + it('should correctly calculat duration when using and operator', function () { + after(1).hours.and(5).milliseconds.duration.should.equal(3600005); + after(0).hours.and(3).minutes.duration.should.equal(180000); + }); + it('should correctly set messages', function () { - after(1).message.messages.should.equal(1); - every(10).messages.messages.should.equal(10); + after(1).message.messageInterval.should.equal(1); + every(10).messages.messageInterval.should.equal(10); }); it('should allow the combination of duration and messages', function () { let value = every(1).hours.or(5).messages; value.duration.should.equal(3600000); - value.messages.should.equal(5); + value.messageInterval.should.equal(5); }); }); From 9114783ae57d68eae507186aa34e271398f70c8b Mon Sep 17 00:00:00 2001 From: Nick Cuthbert Date: Mon, 4 Dec 2017 00:57:52 +0200 Subject: [PATCH 3/4] Refactored the code a fair bit to reduce complexity --- lib/actor.js | 19 +++++----- lib/persistence/persistent-actor.js | 56 ++++++++++++++++------------- test/persistent-actor.js | 7 ++-- 3 files changed, 45 insertions(+), 37 deletions(-) diff --git a/lib/actor.js b/lib/actor.js index 46467c4..cec0966 100644 --- a/lib/actor.js +++ b/lib/actor.js @@ -35,7 +35,7 @@ class Actor { if (!shutdown.duration) { throw new Error('Shutdown should be specified as a duration. It is recommended to use the after() function to do this'); } - this.shutdownPeriod = shutdown.duration; + this.shutdownPeriod = Actor.getSafeTimeout(shutdown.duration); this.setTimeout(); } } @@ -51,9 +51,11 @@ class Actor { } clearTimeout () { - if (this.shutdownPeriod) { - clearTimeout(this.timeout); - } + clearTimeout(this.timeout); + } + + clearImmediate () { + clearImmediate(this.immediate); } static getSafeTimeout (timeoutDuration) { @@ -99,12 +101,8 @@ class Actor { } stop () { - if (this.immediate) { - clearImmediate(this.immediate); - } - if (this.timeout) { - clearTimeout(this.timeout); - } + this.clearImmediate(); + this.clearTimeout(); this.parent && this.parent.childStopped(this); this.reference && dereference(this.reference); delete this.reference; @@ -130,6 +128,7 @@ class Actor { this.handleMessage(message, sender); } else { this.busy = false; + // Counter is now ticking until actor is killed this.setTimeout(); } } else { diff --git a/lib/persistence/persistent-actor.js b/lib/persistence/persistent-actor.js index c951539..0731735 100644 --- a/lib/persistence/persistent-actor.js +++ b/lib/persistence/persistent-actor.js @@ -1,5 +1,4 @@ require('rxjs'); -const Rx = require('rxjs'); const { PersistedEvent, PersistedSnapshot } = require('./persistence-engine'); const { Actor } = require('../actor'); const { Promise } = require('bluebird'); @@ -21,7 +20,7 @@ class PersistentActor extends Actor { this.key = key; if (snapshot) { - this.snapshotDuration = snapshot.duration || false; + this.snapshotDuration = snapshot.duration ? Actor.getSafeTimeout(snapshot.duration) : false; this.snapshotMessageInterval = snapshot.messageInterval || false; if (!this.snapshotMessageInterval && !this.snapshotDuration) { throw new Error('Snapshot requires a duration and/or messages field. Correctly specifying the snapshot rule is most easily done using every()'); @@ -33,6 +32,7 @@ class PersistentActor extends Actor { recover () { try { + this.clearTimeout(); // Create an observable sequence of events // Reduce this sequence by passing it into the processor function f // Calculate for each message the sequence number @@ -58,23 +58,15 @@ class PersistentActor extends Actor { Promise.resolve([initialState, sequenceNumber, 0]) ) .subscribe(async (result) => { + // Message count can be different to sequenceNumber if events have been deleted from the database const [state, sequenceNumber, messageCount] = await result; this.sequenceNumber = sequenceNumber; + if (this.snapshotMessageInterval) { this.messagesToNextSnapshot = this.snapshotMessageInterval - messageCount; } - if (this.snapshotDuration) { - // If instructed to snapshot at regular intervals, create an interval observable which emits until actor is stopped - this.stop$ = new Rx.Subject(); - Rx.Observable - .interval(this.snapshotDuration) - .takeUntil(this.stop$) - .subscribe(async () => { - const sequenceNumber = this.sequenceNumber; - const state = this.state; - await this.persistenceEngine.takeSnapshot(new PersistedSnapshot(state, sequenceNumber, this.key)); - }); - } + + this.resetSnapshotInterval(); this.processNext(state, sequenceNumber === 0); }); }); @@ -83,22 +75,41 @@ class PersistentActor extends Actor { } } + resetSnapshotInterval () { + if (this.snapshotDuration) { + clearInterval(this.snapshotInterval); + this.snapshotInterval = setInterval(async () => { + const snapshot = new PersistedSnapshot(this.state, this.sequenceNumber, this.key); + try { + await this.persistenceEngine.takeSnapshot(snapshot); + } catch (e) { + console.error(`Failed to save snapshot ${e}`); + } + }, this.snapshotDuration); + } + } + async processNext (next, initial = false) { if (!this.stopped && this.snapshotMessageInterval && !initial) { --this.messagesToNextSnapshot; if (this.messagesToNextSnapshot <= 0) { + this.resetSnapshotInterval(); this.messagesToNextSnapshot = this.snapshotMessageInterval; - const snapshot = new PersistedSnapshot(next, this.sequenceNumber, this.key); - try { - await this.persistenceEngine.takeSnapshot(snapshot); - } catch (e) { - console.error(`Failed to take snapshot ${e}`); - } + await this.takeSnapshot(next); } } super.processNext(next, initial); } + async takeSnapshot (state) { + try { + const snapshot = new PersistedSnapshot(state, this.sequenceNumber, this.key); + await this.persistenceEngine.takeSnapshot(snapshot); + } catch (e) { + console.error(`Failed to take snapshot ${e}`); + } + } + async persist (msg, tags = []) { const persistedEvent = new PersistedEvent(msg, ++this.sequenceNumber, this.key, tags); return (await (this.persistenceEngine.persist(persistedEvent))).data; @@ -106,10 +117,7 @@ class PersistentActor extends Actor { stop () { super.stop(); - if (this.stop$) { - this.stop$.next(false); - this.stop$.complete(); - } + clearInterval(this.snapshotInterval); } createContext () { diff --git a/test/persistent-actor.js b/test/persistent-actor.js index 947e4ad..05838fc 100644 --- a/test/persistent-actor.js +++ b/test/persistent-actor.js @@ -237,7 +237,7 @@ describe('PersistentActor', () => { snapshots[snapshots.length - 1].data.should.equal(expectedResult); }); - it('should be able to continue processing messages even after failing to save a snapshot', async () => { + it('should be able to continue processing messages even after failing to save a snapshot when snapshotting', async () => { console.error = ignore; const persistenceEngine = new MockPersistenceEngine(undefined, undefined, false); // Disable takeSnapshot system = start(configurePersistence(persistenceEngine)); @@ -246,16 +246,17 @@ describe('PersistentActor', () => { concatenativeFunction(''), 'iceland', 'test', - { snapshot: every(5).messages } + { snapshot: every(5).messages.and(30).milliseconds } ); const expectedResult = 'iceland is cold'; expectedResult.split('').forEach(msg => { dispatch(actor, msg); }); + await delay(50); (await query(actor, '', 30)).should.equal(expectedResult); }); - it('should throw if timeout does not include a duration field', async function () { + it('should throw if snapshot does not include a duration field', async function () { const persistenceEngine = new MockPersistenceEngine(); // Disable takeSnapshot system = start(configurePersistence(persistenceEngine)); (() => spawnPersistent(system, ignore, 'test1', undefined, { snapshot: {} })).should.throw(Error); From 745c6463f97712c5aacf2b53717c5b5bdebbc0ba Mon Sep 17 00:00:00 2001 From: Nick Cuthbert Date: Mon, 4 Dec 2017 01:04:44 +0200 Subject: [PATCH 4/4] Refactored the code a fair bit to reduce complexity --- lib/persistence/persistent-actor.js | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/persistence/persistent-actor.js b/lib/persistence/persistent-actor.js index 0731735..909f729 100644 --- a/lib/persistence/persistent-actor.js +++ b/lib/persistence/persistent-actor.js @@ -47,16 +47,13 @@ class PersistentActor extends Actor { this.persistenceEngine.events(this.key, sequenceNumber) .distinct(evt => evt.sequenceNumber) - .reduce( - async (prev, msg, index) => { + .reduce(async (prev, msg, index) => { const [state] = await prev; const context = { ...this.createContext(this.reference), recovering: true }; // Might not be an async function. Using promise.resolve to force it into that form const nextState = await Promise.resolve(this.f.call(context, freeze(state), msg.data, context)); return [nextState, msg.sequenceNumber, index]; - }, - Promise.resolve([initialState, sequenceNumber, 0]) - ) + }, Promise.resolve([initialState, sequenceNumber, 0])) .subscribe(async (result) => { // Message count can be different to sequenceNumber if events have been deleted from the database const [state, sequenceNumber, messageCount] = await result;