Skip to content

Commit

Permalink
Merge pull request #26 from ncthbrt/feature/snapshotting
Browse files Browse the repository at this point in the history
Feature/snapshotting
  • Loading branch information
ncthbrt authored Dec 3, 2017
2 parents 3d56554 + 745c646 commit b9343da
Show file tree
Hide file tree
Showing 14 changed files with 378 additions and 72 deletions.
34 changes: 23 additions & 11 deletions lib/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,29 @@ class Actor {
if (!shutdown.duration) {
throw new Error('Shutdown should be specified as a duration. It is recommended to use the after() function to do this');
}
this.shutdownPeriod = shutdown.duration;
this.timeout = setTimeout(() => this.stop(), this.shutdownPeriod);
this.shutdownPeriod = Actor.getSafeTimeout(shutdown.duration);
this.setTimeout();
}
}

static serializeErr (err) {
return JSON.stringify(err, Object.getOwnPropertyNames(err));
}

setTimeout () {
if (this.shutdownPeriod) {
this.timeout = setTimeout(() => this.stop(), this.shutdownPeriod);
}
}

clearTimeout () {
clearTimeout(this.timeout);
}

clearImmediate () {
clearImmediate(this.immediate);
}

static getSafeTimeout (timeoutDuration) {
timeoutDuration = timeoutDuration | 0;
const MAX_TIMEOUT = 2147483647;
Expand All @@ -54,10 +68,7 @@ class Actor {

dispatch (message, sender = new Nobody(this.system)) {
this.assertNotStopped();
if (this.shutdownPeriod) {
clearTimeout(this.timeout);
setTimeout(() => this.stop(), this.shutdownPeriod);
}
this.clearTimeout();
if (!this.busy) {
this.handleMessage(message, sender);
} else {
Expand Down Expand Up @@ -90,9 +101,8 @@ class Actor {
}

stop () {
if (this.immediate) {
clearImmediate(this.immediate);
}
this.clearImmediate();
this.clearTimeout();
this.parent && this.parent.childStopped(this);
this.reference && dereference(this.reference);
delete this.reference;
Expand All @@ -106,9 +116,9 @@ class Actor {
return this.subject.asObservable();
}

processNext (next, allowUndefined = false) {
processNext (next, initial = false) {
if (!this.stopped) {
if (next !== undefined || allowUndefined) {
if (next !== undefined || initial) {
if (this.state !== next) {
this.subject.next(next);
}
Expand All @@ -118,6 +128,8 @@ class Actor {
this.handleMessage(message, sender);
} else {
this.busy = false;
// Counter is now ticking until actor is killed
this.setTimeout();
}
} else {
this.stop();
Expand Down
4 changes: 2 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
const { spawn, spawnStateless } = require('./actor');
const { stop, state$, query, dispatch } = require('./functions');
const { spawnPersistent, configurePersistence } = require('./persistence');
const { after } = require('./utils');
const utils = require('./utils');
module.exports = {
...require('./system'),
...utils,
spawn,
after,
spawnStateless,
query,
dispatch,
Expand Down
4 changes: 2 additions & 2 deletions lib/persistence/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const { spawnPersistent } = require('./persistent-actor');
const { PersistedEvent, AbstractPersistenceEngine } = require('./persistence-engine');
const { PersistedEvent, PersistedSnapshot, AbstractPersistenceEngine } = require('./persistence-engine');

const configurePersistence = (engine) => (system) => {
if (!engine) {
Expand All @@ -8,4 +8,4 @@ const configurePersistence = (engine) => (system) => {
return Object.assign(system, { persistenceEngine: engine });
};

module.exports = { configurePersistence, spawnPersistent, PersistedEvent, AbstractPersistenceEngine };
module.exports = { configurePersistence, spawnPersistent, PersistedSnapshot, PersistedEvent, AbstractPersistenceEngine };
31 changes: 30 additions & 1 deletion lib/persistence/persistence-engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,40 @@ class AbstractPersistenceEngine {
throw new Error('#events() is yet implemented');
}

latestSnapshot (persistenceKey) {
throw new Error('#latestSnapshot() is yet implemented');
}

takeSnapshot (persistedSnapshot) {
throw new Error('#takeSnapshot() is yet implemented');
}

persist (persistedEvent) {
throw new Error('#persist() is not yet implemented');
}
}

class PersistedSnapshot {
constructor (data, sequenceNumber, key, createdAt = new Date().getTime()) {
if (data === null || data === undefined) {
throw new Error('data should not be null or undefined');
}

// Sequence number should be a number.
// This is an internal error if this is not the case as this is defined by the engine and hence shouldn't
// be exposed to users of the framework
assert(typeof (sequenceNumber) === 'number');

this.data = data;
this.sequenceNumber = sequenceNumber;
this.key = key;
this.createdAt = createdAt;

// A snapshot should be immutable
freeze(this);
}
}

class PersistedEvent {
constructor (data, sequenceNumber, key, tags = [], createdAt = new Date().getTime()) {
// data should not be undefined or null
Expand Down Expand Up @@ -38,4 +67,4 @@ class PersistedEvent {
}
}

module.exports = { PersistedEvent, AbstractPersistenceEngine };
module.exports = { PersistedEvent, PersistedSnapshot, AbstractPersistenceEngine };
99 changes: 80 additions & 19 deletions lib/persistence/persistent-actor.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
require('rxjs');
const { PersistedEvent } = require('./persistence-engine');
const { PersistedEvent, PersistedSnapshot } = require('./persistence-engine');
const { Actor } = require('../actor');
const { Promise } = require('bluebird');
const freeze = require('deep-freeze-node');

class PersistentActor extends Actor {
constructor (parent, name, system, f, key, persistenceEngine, properties = {}) {
constructor (parent, name, system, f, key, persistenceEngine, { snapshot, ...properties } = {}) {
super(parent, name, system, f, properties);
if (!key) {
throw new Error('Persistence key required');
Expand All @@ -18,44 +18,105 @@ class PersistentActor extends Actor {
this.sequenceNumber = 0;
this.busy = true;
this.key = key;
setImmediate(this.recover.bind(this));

if (snapshot) {
this.snapshotDuration = snapshot.duration ? Actor.getSafeTimeout(snapshot.duration) : false;
this.snapshotMessageInterval = snapshot.messageInterval || false;
if (!this.snapshotMessageInterval && !this.snapshotDuration) {
throw new Error('Snapshot requires a duration and/or messages field. Correctly specifying the snapshot rule is most easily done using every()');
}
}

setImmediate(() => this.recover());
}

recover () {
try {
this.clearTimeout();
// Create an observable sequence of events
// Reduce this sequence by passing it into the processor function f
// Calculate for each message the sequence number
// Subscribe to the end result and start processing new messages
const initialState = undefined;
this.persistenceEngine.latestSnapshot(this.key).then((snapshot) => {
let sequenceNumber = 0;
let initialState;
if (snapshot) {
initialState = snapshot.data;
sequenceNumber = snapshot.sequenceNumber;
}

this.persistenceEngine.events(this.key)
.distinct(evt => evt.sequenceNumber)
.reduce(
async (prev, msg, index) => {
this.persistenceEngine.events(this.key, sequenceNumber)
.distinct(evt => evt.sequenceNumber)
.reduce(async (prev, msg, index) => {
const [state] = await prev;
const context = { ...this.createContext(this.reference), recovering: true };
// Might not be an async function. Using promise.resolve to force it into that form
// Might not be an async function. Using promise.resolve to force it into that form
const nextState = await Promise.resolve(this.f.call(context, freeze(state), msg.data, context));
return [nextState, index + 1];
},
Promise.resolve([initialState, 0])
)
.subscribe(async (result) => {
const [state, sequenceNumber] = await result;
this.sequenceNumber = sequenceNumber;
this.processNext(state, sequenceNumber === 0);
});
return [nextState, msg.sequenceNumber, index];
}, Promise.resolve([initialState, sequenceNumber, 0]))
.subscribe(async (result) => {
// Message count can be different to sequenceNumber if events have been deleted from the database
const [state, sequenceNumber, messageCount] = await result;
this.sequenceNumber = sequenceNumber;

if (this.snapshotMessageInterval) {
this.messagesToNextSnapshot = this.snapshotMessageInterval - messageCount;
}

this.resetSnapshotInterval();
this.processNext(state, sequenceNumber === 0);
});
});
} catch (e) {
this.signalFault(e);
}
}

resetSnapshotInterval () {
if (this.snapshotDuration) {
clearInterval(this.snapshotInterval);
this.snapshotInterval = setInterval(async () => {
const snapshot = new PersistedSnapshot(this.state, this.sequenceNumber, this.key);
try {
await this.persistenceEngine.takeSnapshot(snapshot);
} catch (e) {
console.error(`Failed to save snapshot ${e}`);
}
}, this.snapshotDuration);
}
}

async processNext (next, initial = false) {
if (!this.stopped && this.snapshotMessageInterval && !initial) {
--this.messagesToNextSnapshot;
if (this.messagesToNextSnapshot <= 0) {
this.resetSnapshotInterval();
this.messagesToNextSnapshot = this.snapshotMessageInterval;
await this.takeSnapshot(next);
}
}
super.processNext(next, initial);
}

async takeSnapshot (state) {
try {
const snapshot = new PersistedSnapshot(state, this.sequenceNumber, this.key);
await this.persistenceEngine.takeSnapshot(snapshot);
} catch (e) {
console.error(`Failed to take snapshot ${e}`);
}
}

async persist (msg, tags = []) {
const persistedEvent = new PersistedEvent(msg, ++this.sequenceNumber, this.key, tags);
return (await (this.persistenceEngine.persist(persistedEvent))).data;
}

stop () {
super.stop();
clearInterval(this.snapshotInterval);
}

createContext () {
return { ...super.createContext.apply(this, arguments), persist: this.persist.bind(this) };
}
Expand All @@ -67,7 +128,7 @@ const spawnPersistent = (reference, f, key, name, properties) =>
reference,
parent => applyOrThrowIfStopped(
parent.system,
system => new PersistentActor(parent, name, parent.system, f, key, system.persistenceEngine)
system => new PersistentActor(parent, name, parent.system, f, key, system.persistenceEngine, properties)
).reference
);

Expand Down
41 changes: 35 additions & 6 deletions lib/utils.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,72 @@
class AfterValue {
constructor (value) {
Object.assign(this, value);
}

or (amount) {
const { or, and, ...value } = this;
return new After(amount, value, false);
}

and (amount) {
const { or, and, ...value } = this;
return new After(amount, value, true);
}
}

class After {
constructor (amount) {
constructor (amount, chain = {}, addPreviousDuration) {
this._amount = amount;
this._chain = chain;
this._previousDuration = (addPreviousDuration && this._chain.duration) ? this._chain.duration : 0;
Object.freeze(this);
}

get hours () {
return { duration: (this._amount * 60 * 60 * 1000) | 0 };
return new AfterValue({ ...this._chain, duration: (this._amount * 60 * 60 * 1000 + this._previousDuration) | 0 });
}

get hour () {
return this.hours;
}

get minutes () {
return { duration: (this._amount * 60 * 1000) | 0 };
return new AfterValue({ ...this._chain, duration: (this._amount * 60 * 1000 + this._previousDuration) | 0 });
}

get minute () {
return this.minutes;
}

get seconds () {
return { duration: (this._amount * 1000) | 0 };
return new AfterValue({ ...this._chain, duration: (this._amount * 1000 + this._previousDuration) | 0 });
}

get second () {
return this.seconds;
}

get milliseconds () {
return { duration: this._amount | 0 };
return new AfterValue({ ...this._chain, duration: (this._amount + this._previousDuration) | 0 });
}

get millisecond () {
return this.milliseconds;
}

get messages () {
return new AfterValue({ ...this._chain, messageInterval: this._amount });
}

get message () {
return this.messages;
}
}

const after = (amount) => new After(amount);
const every = (amount) => new After(amount);

module.exports = {
after
after,
every
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nact",
"version": "3.2.0",
"version": "4.0.0",
"description": "nact ⇒ node.js + actors = your services have never been so µ",
"main": "lib/index.js",
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion test/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ describe('Actor', function () {
});

it('should throw if timeout does not include a duration field', async function () {
(() => spawnStateless(system, ignore, 'test1', { shutdown: {} })).should.throw();
(() => spawnStateless(system, ignore, 'test1', { shutdown: {} })).should.throw(Error);
});
});

Expand Down
Loading

0 comments on commit b9343da

Please sign in to comment.