Skip to content

Commit

Permalink
Removed rxjs dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
ncthbrt committed Apr 4, 2018
1 parent 0cd4065 commit a744d6a
Show file tree
Hide file tree
Showing 9 changed files with 10 additions and 89 deletions.
11 changes: 0 additions & 11 deletions lib/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const { ActorReference, TemporaryReference, Nobody } = require('./references');
const Queue = require('denque');
const assert = require('assert');
const freeze = require('./freeze');
const { Subject } = require('rxjs');
const { stop } = require('./functions');
const { defaultSupervisionPolicy, SupervisionActions } = require('./supervision');

Expand All @@ -29,7 +28,6 @@ class Actor {
this.children = new Map();
this.childReferences = new Map();
this.busy = false;
this.subject = new Subject();
this.mailbox = new Queue();
this.immediate = undefined;
this.parent.childSpawned(this);
Expand Down Expand Up @@ -123,20 +121,11 @@ class Actor {
delete this.parent;
[...this.children.values()].forEach(stop);
this.stopped = true;
this.subject.complete();
}

get state$ () {
console.error('nact deprecation notice: state$ is deprecated');
return this.subject.asObservable();
}

processNext (next, initial = false) {
if (!this.stopped) {
if (next !== undefined || initial) {
if (this.state !== next) {
this.subject.next(next);
}
this.state = next;
if (!this.mailbox.isEmpty()) {
let { message, sender } = this.mailbox.shift();
Expand Down
9 changes: 1 addition & 8 deletions lib/functions.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,8 @@ const dispatch = (actor, msg, sender) => {
concreteActor.dispatch(msg, sender);
};

// Deprecated
const state$ = (actor) => {
let concreteActor = systemMap.find(actor.system.name, actor);
return concreteActor && concreteActor.state$;
};

module.exports = {
stop,
query,
dispatch,
state$
dispatch
};
3 changes: 1 addition & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const { spawn, spawnStateless } = require('./actor');
const { stop, state$, query, dispatch } = require('./functions');
const { stop, query, dispatch } = require('./functions');
const { spawnPersistent, configurePersistence } = require('./persistence');
const { configureLogging, logNothing, logToConsole } = require('./monitoring');
const time = require('./time');
Expand All @@ -12,7 +12,6 @@ module.exports = {
query,
dispatch,
stop,
state$,
spawnPersistent,
configurePersistence,
configureLogging,
Expand Down
7 changes: 3 additions & 4 deletions lib/persistence/persistent-actor.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
require('rxjs');
const { PersistedEvent, PersistedSnapshot } = require('./persistence-engine');
const { Actor } = require('../actor');
const freeze = require('../freeze');
Expand Down Expand Up @@ -44,16 +43,16 @@ class PersistentActor extends Actor {
sequenceNumber = snapshot.sequenceNumber;
}

this.persistenceEngine.events(this.key, sequenceNumber)
.distinct(evt => evt.sequenceNumber)
this.persistenceEngine
.events(this.key, sequenceNumber)
.reduce(async (prev, msg, index) => {
const [state] = await prev;
const context = { ...this.createContext(this.reference), recovering: true };
// Might not be an async function. Using promise.resolve to force it into that form
const nextState = await Promise.resolve(this.f.call(context, freeze(state), msg.data, context));
return [nextState, msg.sequenceNumber, index];
}, Promise.resolve([initialState, sequenceNumber, 0]))
.subscribe(async (result) => {
.then(async (result) => {
// Message count can be different to sequenceNumber if events have been deleted from the database
const [state, sequenceNumber, messageCount] = await result;
this.sequenceNumber = sequenceNumber;
Expand Down
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nact",
"version": "5.0.0",
"version": "6.0.0",
"description": "nact ⇒ node.js + actors = your services have never been so µ",
"main": "lib/index.js",
"scripts": {
Expand Down Expand Up @@ -36,8 +36,7 @@
"url": "http://github.com/ncthbrt/nact/issues"
},
"dependencies": {
"denque": "^1.2.2",
"rxjs": "^5.4.3"
"denque": "^1.2.2"
},
"devDependencies": {
"chai": "^4.1.1",
Expand Down
47 changes: 1 addition & 46 deletions test/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const chai = require('chai');
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
chai.should();
const { start, spawn, spawnStateless, dispatch, stop, query, state$, milliseconds } = require('../lib');
const { start, spawn, spawnStateless, dispatch, stop, query, milliseconds } = require('../lib');
const delay = (duration) => new Promise((resolve, reject) => setTimeout(() => resolve(), duration));
const { ActorPath } = require('../lib/paths');
const { applyOrThrowIfStopped } = require('../lib/system-map');
Expand Down Expand Up @@ -537,49 +537,4 @@ describe('Actor', function () {
result2.should.equal(1);
});
});

describe('#state$', function () {
let system;
beforeEach(() => { system = start(); });
afterEach(() => stop(system));

it('should allow subscription to state changes', async function () {
let actor = spawn(system, (state, msg) => msg);
let arr = [];
state$(actor).subscribe(value => {
arr = [...arr, value];
});
dispatch(actor, 1);
dispatch(actor, 2);
dispatch(actor, 3);
await retry(() => arr.should.deep.equal([1, 2, 3]), 5, 10);
});

it('should allow only emit when state has changed', async function () {
let state1 = { hello: 'world' };
let state2 = { it_has: 'been fun' };

let actor = spawn(system, (state, msg) => msg);
let arr = [];
state$(actor).subscribe(value => {
arr = [...arr, value];
});
dispatch(actor, state1);
dispatch(actor, state1);
dispatch(actor, state2);

await retry(() => arr.should.deep.equal([state1, state2]), 5, 10);
});

it('should emit done when the actor is stopped', async function () {
let actor = spawn(system, (state, msg) => msg);
let observableClosed = false;
state$(actor).last().subscribe(x => { observableClosed = true; });
dispatch(actor, 1);
dispatch(actor, 2);
await delay(10);
stop(actor);
await retry(() => observableClosed.should.be.true, 5, 10);
});
});
});
3 changes: 1 addition & 2 deletions test/mock-persistence-engine.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const { AbstractPersistenceEngine } = require('../lib/persistence');
const { Observable } = require('rxjs');

class MockPersistenceEngine extends AbstractPersistenceEngine {
constructor (events = {}, snapshots = {}, takeSnapshotIsWorking = true) {
Expand Down Expand Up @@ -28,7 +27,7 @@ class MockPersistenceEngine extends AbstractPersistenceEngine {
events (persistenceKey, offset = 0, limit, tags) {
const persistedEvents = (this._events[persistenceKey] || []);
const slice = persistedEvents.slice(offset, limit ? offset + limit : undefined);
return Observable.from(slice);
return slice;
}

persist (persistedEvent) {
Expand Down
4 changes: 1 addition & 3 deletions test/partially-broken-persistence-engine.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
const { AbstractPersistenceEngine } = require('../lib/persistence');
const Rx = require('rxjs');
const { Observable } = Rx;

class PartiallyBrokenPersistenceEngine extends AbstractPersistenceEngine {
constructor (events = new Map(), failIndex, maxFailures) {
Expand All @@ -14,7 +12,7 @@ class PartiallyBrokenPersistenceEngine extends AbstractPersistenceEngine {
events (persistenceKey, offset = 0, limit, tags) {
const persistedEvents = (this._events[persistenceKey] || []);
const slice = persistedEvents.slice(offset, limit ? offset + limit : undefined);
return Observable.from(slice).map((item, index) => {
return slice.map((item, index) => {
if (index < this.failIndex) {
return item;
}
Expand Down
10 changes: 0 additions & 10 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2330,12 +2330,6 @@ rx-lite@^3.1.2:
version "3.1.2"
resolved "https://registry.yarnpkg.com/rx-lite/-/rx-lite-3.1.2.tgz#19ce502ca572665f3b647b10939f97fd1615f102"

rxjs@^5.4.3:
version "5.4.3"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.4.3.tgz#0758cddee6033d68e0fd53676f0f3596ce3d483f"
dependencies:
symbol-observable "^1.0.1"

safe-buffer@^5.0.1, safe-buffer@^5.1.1, safe-buffer@~5.1.0, safe-buffer@~5.1.1:
version "5.1.1"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.1.tgz#893312af69b2123def71f57889001671eeb2c853"
Expand Down Expand Up @@ -2572,10 +2566,6 @@ supports-color@^4.4.0:
dependencies:
has-flag "^2.0.0"

symbol-observable@^1.0.1:
version "1.0.4"
resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.0.4.tgz#29bf615d4aa7121bdd898b22d4b3f9bc4e2aa03d"

table@^3.7.8:
version "3.8.3"
resolved "https://registry.yarnpkg.com/table/-/table-3.8.3.tgz#2bbc542f0fda9861a755d3947fefd8b3f513855f"
Expand Down

0 comments on commit a744d6a

Please sign in to comment.