Skip to content

Commit

Permalink
Merge pull request #11 from ncthbrt/feature/actor-observable
Browse files Browse the repository at this point in the history
You can now subscribe to state changes on an actor using RxJS
  • Loading branch information
ncthbrt authored Oct 23, 2017
2 parents 72c1988 + aa3306f commit cb3f50e
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 11 deletions.
14 changes: 12 additions & 2 deletions lib/actor.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
const { Nobody } = require('./nobody');
const { Deferred } = require('./deferred');
require('bluebird');
const { ActorReference, TemporaryReference, applyOrThrowIfStopped } = require('./references');
const { ActorReference, TemporaryReference, applyOrThrowIfStopped, dereference } = require('./references');
const Queue = require('denque');
const assert = require('assert');
const freeze = require('deep-freeze-node');
const { Subject } = require('rxjs');

class Actor {
constructor (parent, name, system, f) {
Expand All @@ -25,6 +26,7 @@ class Actor {
this.children = new Map();
this.childReferences = new Map();
this.busy = false;
this.subject = new Subject();
this.mailbox = new Queue();
this.immediate = undefined;
this.parent.childSpawned(this);
Expand Down Expand Up @@ -81,16 +83,24 @@ class Actor {
clearImmediate(this.immediate);
}
this.parent.childStopped(this);
this.reference && this.reference._dereference();
this.reference && dereference(this.reference);
delete this.reference;
delete this.parent;
[...this.children.values()].map(child => child.stop());
this.stopped = true;
this.subject.complete();
}

get state$ () {
return this.subject.asObservable();
}

processNext (next, allowUndefined = false) {
if (!this.stopped) {
if (next !== undefined || allowUndefined) {
if (this.state !== next) {
this.subject.next(next);
}
this.state = next;
if (!this.mailbox.isEmpty()) {
let { message, sender } = this.mailbox.shift();
Expand Down
19 changes: 13 additions & 6 deletions lib/references.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const privateProperties = new WeakMap();
const freeze = require('deep-freeze-node');
const { Observable } = require('rxjs');

const applyOrThrowIfStopped = (reference, f) => {
const actor = privateProperties.get(reference).actor;
Expand All @@ -12,7 +13,8 @@ const applyOrThrowIfStopped = (reference, f) => {
const fallback = {
dispatch: () => { },
query: () => Promise.reject(new Error('Actor stopped. Query can never resolve')),
stop: () => {}
stop: () => {},
state$: Observable.empty()
};

const actorOrFallback = (self) => (privateProperties.get(self).actor) || fallback;
Expand All @@ -27,6 +29,11 @@ class TemporaryReference {
}
}

const dereference = (reference) => {
const { system } = privateProperties.get(reference);
privateProperties.set(reference, { system });
};

class ActorReference {
constructor (actor) {
this.path = actor.path;
Expand All @@ -36,11 +43,6 @@ class ActorReference {
privateProperties.set(this, { actor, system: actor.system });
}

_dereference () {
const { system } = privateProperties.get(this);
privateProperties.set(this, { system });
}

dispatch (message, sender) {
return actorOrFallback(this).dispatch(message, sender);
}
Expand All @@ -52,6 +54,10 @@ class ActorReference {
stop () {
return actorOrFallback(this).stop();
}

get state$ () {
return actorOrFallback(this).state$;
}
}

class ActorSystemReference {
Expand All @@ -70,3 +76,4 @@ module.exports.ActorSystemReference = ActorSystemReference;
module.exports.ActorReference = ActorReference;
module.exports.applyOrThrowIfStopped = applyOrThrowIfStopped;
module.exports.TemporaryReference = TemporaryReference;
module.exports.dereference = dereference;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nact",
"version": "2.0.3",
"version": "2.1.0",
"description": "Mulithreaded implementation of the actor model for node",
"main": "lib/index.js",
"scripts": {
Expand Down
45 changes: 45 additions & 0 deletions test/actor.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint-env mocha */
/* eslint-disable no-unused-expressions */
const chai = require('chai');
// onCompleted
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
chai.should();
Expand Down Expand Up @@ -307,4 +308,48 @@ describe('Actor', function () {
result.should.equal('hello from child2');
});
});

describe('#state$', function () {
let system;
beforeEach(() => { system = start(); });
afterEach(() => system.stop());

it('should allow subscription to state changes', async function () {
let actor = spawn(system, (state, msg) => msg);
let arr = [];
actor.state$.subscribe(value => {
arr = [...arr, value];
});
actor.dispatch(1);
actor.dispatch(2);
actor.dispatch(3);
await retry(() => arr.should.deep.equal([1, 2, 3]), 5, 10);
});

it('should allow only emit when state has changed', async function () {
let state1 = { hello: 'world' };
let state2 = { it_has: 'been fun' };

let actor = spawn(system, (state, msg) => msg);
let arr = [];
actor.state$.subscribe(value => {
arr = [...arr, value];
});
actor.dispatch(state1);
actor.dispatch(state1);
actor.dispatch(state2);
await retry(() => arr.should.deep.equal([state1, state2]), 5, 10);
});

it('should emit done when the actor is stopped', async function () {
let actor = spawn(system, (state, msg) => msg);
let observableClosed = false;
actor.state$.last().subscribe(x => { observableClosed = true; });
actor.dispatch(1);
actor.dispatch(2);
await delay(10);
actor.stop();
await retry(() => observableClosed.should.be.true, 5, 10);
});
});
});
5 changes: 3 additions & 2 deletions test/persistent-actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ chai.should();
const { MockPersistenceEngine } = require('./mock-persistence-engine');
const { BrokenPersistenceEngine } = require('./broken-persistence-engine');
const { PartiallyBrokenPersistenceEngine } = require('./partially-broken-persistence-engine');
const { start } = require('../lib');
const { spawnPersistent, configurePersistence, PersistedEvent } = require('../lib/extensions/persistence');
const { start, spawnPersistent, configurePersistence } = require('../lib');
const { PersistedEvent } = require('../lib/extensions/persistence');
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
const { Promise } = require('bluebird');
Expand All @@ -20,6 +20,7 @@ const isStopped = (reference) => {
return true;
}
};

// Begin helpers
const ignore = () => {};

Expand Down

0 comments on commit cb3f50e

Please sign in to comment.