diff --git a/lib/actor.js b/lib/actor.js index 541585b..55c78a3 100644 --- a/lib/actor.js +++ b/lib/actor.js @@ -5,7 +5,6 @@ const { ActorReference, TemporaryReference, Nobody } = require('./references'); const Queue = require('denque'); const assert = require('assert'); const freeze = require('./freeze'); -const { Subject } = require('rxjs'); const { stop } = require('./functions'); const { defaultSupervisionPolicy, SupervisionActions } = require('./supervision'); @@ -29,7 +28,6 @@ class Actor { this.children = new Map(); this.childReferences = new Map(); this.busy = false; - this.subject = new Subject(); this.mailbox = new Queue(); this.immediate = undefined; this.parent.childSpawned(this); @@ -123,20 +121,11 @@ class Actor { delete this.parent; [...this.children.values()].forEach(stop); this.stopped = true; - this.subject.complete(); - } - - get state$ () { - console.error('nact deprecation notice: state$ is deprecated'); - return this.subject.asObservable(); } processNext (next, initial = false) { if (!this.stopped) { if (next !== undefined || initial) { - if (this.state !== next) { - this.subject.next(next); - } this.state = next; if (!this.mailbox.isEmpty()) { let { message, sender } = this.mailbox.shift(); diff --git a/lib/functions.js b/lib/functions.js index 949dae1..6a89a79 100644 --- a/lib/functions.js +++ b/lib/functions.js @@ -26,15 +26,8 @@ const dispatch = (actor, msg, sender) => { concreteActor.dispatch(msg, sender); }; -// Deprecated -const state$ = (actor) => { - let concreteActor = systemMap.find(actor.system.name, actor); - return concreteActor && concreteActor.state$; -}; - module.exports = { stop, query, - dispatch, - state$ + dispatch }; diff --git a/lib/index.js b/lib/index.js index 69a59b3..dc36309 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,5 +1,5 @@ const { spawn, spawnStateless } = require('./actor'); -const { stop, state$, query, dispatch } = require('./functions'); +const { stop, query, dispatch } = require('./functions'); const { spawnPersistent, configurePersistence } = require('./persistence'); const { configureLogging, logNothing, logToConsole } = require('./monitoring'); const time = require('./time'); @@ -12,7 +12,6 @@ module.exports = { query, dispatch, stop, - state$, spawnPersistent, configurePersistence, configureLogging, diff --git a/lib/persistence/persistent-actor.js b/lib/persistence/persistent-actor.js index 0aa6949..75323e4 100644 --- a/lib/persistence/persistent-actor.js +++ b/lib/persistence/persistent-actor.js @@ -1,4 +1,3 @@ -require('rxjs'); const { PersistedEvent, PersistedSnapshot } = require('./persistence-engine'); const { Actor } = require('../actor'); const freeze = require('../freeze'); @@ -44,8 +43,8 @@ class PersistentActor extends Actor { sequenceNumber = snapshot.sequenceNumber; } - this.persistenceEngine.events(this.key, sequenceNumber) - .distinct(evt => evt.sequenceNumber) + this.persistenceEngine + .events(this.key, sequenceNumber) .reduce(async (prev, msg, index) => { const [state] = await prev; const context = { ...this.createContext(this.reference), recovering: true }; @@ -53,7 +52,7 @@ class PersistentActor extends Actor { const nextState = await Promise.resolve(this.f.call(context, freeze(state), msg.data, context)); return [nextState, msg.sequenceNumber, index]; }, Promise.resolve([initialState, sequenceNumber, 0])) - .subscribe(async (result) => { + .then(async (result) => { // Message count can be different to sequenceNumber if events have been deleted from the database const [state, sequenceNumber, messageCount] = await result; this.sequenceNumber = sequenceNumber; diff --git a/package.json b/package.json index 8277e19..30836c3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nact", - "version": "5.0.0", + "version": "6.0.0", "description": "nact ⇒ node.js + actors = your services have never been so µ", "main": "lib/index.js", "scripts": { @@ -36,8 +36,7 @@ "url": "http://github.com/ncthbrt/nact/issues" }, "dependencies": { - "denque": "^1.2.2", - "rxjs": "^5.4.3" + "denque": "^1.2.2" }, "devDependencies": { "chai": "^4.1.1", diff --git a/test/actor.js b/test/actor.js index 6cad357..ddbce02 100644 --- a/test/actor.js +++ b/test/actor.js @@ -4,7 +4,7 @@ const chai = require('chai'); const chaiAsPromised = require('chai-as-promised'); chai.use(chaiAsPromised); chai.should(); -const { start, spawn, spawnStateless, dispatch, stop, query, state$, milliseconds } = require('../lib'); +const { start, spawn, spawnStateless, dispatch, stop, query, milliseconds } = require('../lib'); const delay = (duration) => new Promise((resolve, reject) => setTimeout(() => resolve(), duration)); const { ActorPath } = require('../lib/paths'); const { applyOrThrowIfStopped } = require('../lib/system-map'); @@ -537,49 +537,4 @@ describe('Actor', function () { result2.should.equal(1); }); }); - - describe('#state$', function () { - let system; - beforeEach(() => { system = start(); }); - afterEach(() => stop(system)); - - it('should allow subscription to state changes', async function () { - let actor = spawn(system, (state, msg) => msg); - let arr = []; - state$(actor).subscribe(value => { - arr = [...arr, value]; - }); - dispatch(actor, 1); - dispatch(actor, 2); - dispatch(actor, 3); - await retry(() => arr.should.deep.equal([1, 2, 3]), 5, 10); - }); - - it('should allow only emit when state has changed', async function () { - let state1 = { hello: 'world' }; - let state2 = { it_has: 'been fun' }; - - let actor = spawn(system, (state, msg) => msg); - let arr = []; - state$(actor).subscribe(value => { - arr = [...arr, value]; - }); - dispatch(actor, state1); - dispatch(actor, state1); - dispatch(actor, state2); - - await retry(() => arr.should.deep.equal([state1, state2]), 5, 10); - }); - - it('should emit done when the actor is stopped', async function () { - let actor = spawn(system, (state, msg) => msg); - let observableClosed = false; - state$(actor).last().subscribe(x => { observableClosed = true; }); - dispatch(actor, 1); - dispatch(actor, 2); - await delay(10); - stop(actor); - await retry(() => observableClosed.should.be.true, 5, 10); - }); - }); }); diff --git a/test/mock-persistence-engine.js b/test/mock-persistence-engine.js index f893128..13bfff7 100644 --- a/test/mock-persistence-engine.js +++ b/test/mock-persistence-engine.js @@ -1,5 +1,4 @@ const { AbstractPersistenceEngine } = require('../lib/persistence'); -const { Observable } = require('rxjs'); class MockPersistenceEngine extends AbstractPersistenceEngine { constructor (events = {}, snapshots = {}, takeSnapshotIsWorking = true) { @@ -28,7 +27,7 @@ class MockPersistenceEngine extends AbstractPersistenceEngine { events (persistenceKey, offset = 0, limit, tags) { const persistedEvents = (this._events[persistenceKey] || []); const slice = persistedEvents.slice(offset, limit ? offset + limit : undefined); - return Observable.from(slice); + return slice; } persist (persistedEvent) { diff --git a/test/partially-broken-persistence-engine.js b/test/partially-broken-persistence-engine.js index 61e5c60..303c504 100644 --- a/test/partially-broken-persistence-engine.js +++ b/test/partially-broken-persistence-engine.js @@ -1,6 +1,4 @@ const { AbstractPersistenceEngine } = require('../lib/persistence'); -const Rx = require('rxjs'); -const { Observable } = Rx; class PartiallyBrokenPersistenceEngine extends AbstractPersistenceEngine { constructor (events = new Map(), failIndex, maxFailures) { @@ -14,7 +12,7 @@ class PartiallyBrokenPersistenceEngine extends AbstractPersistenceEngine { events (persistenceKey, offset = 0, limit, tags) { const persistedEvents = (this._events[persistenceKey] || []); const slice = persistedEvents.slice(offset, limit ? offset + limit : undefined); - return Observable.from(slice).map((item, index) => { + return slice.map((item, index) => { if (index < this.failIndex) { return item; } diff --git a/yarn.lock b/yarn.lock index fd13717..c1fbf65 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2330,12 +2330,6 @@ rx-lite@^3.1.2: version "3.1.2" resolved "https://registry.yarnpkg.com/rx-lite/-/rx-lite-3.1.2.tgz#19ce502ca572665f3b647b10939f97fd1615f102" -rxjs@^5.4.3: - version "5.4.3" - resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.4.3.tgz#0758cddee6033d68e0fd53676f0f3596ce3d483f" - dependencies: - symbol-observable "^1.0.1" - safe-buffer@^5.0.1, safe-buffer@^5.1.1, safe-buffer@~5.1.0, safe-buffer@~5.1.1: version "5.1.1" resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.1.tgz#893312af69b2123def71f57889001671eeb2c853" @@ -2572,10 +2566,6 @@ supports-color@^4.4.0: dependencies: has-flag "^2.0.0" -symbol-observable@^1.0.1: - version "1.0.4" - resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.0.4.tgz#29bf615d4aa7121bdd898b22d4b3f9bc4e2aa03d" - table@^3.7.8: version "3.8.3" resolved "https://registry.yarnpkg.com/table/-/table-3.8.3.tgz#2bbc542f0fda9861a755d3947fefd8b3f513855f"