Skip to content

Commit

Permalink
experiment with different method for batching subscriptions using whi…
Browse files Browse the repository at this point in the history
…ch uses the base redux subscribe.
  • Loading branch information
tappleby committed Apr 3, 2016
1 parent 4093d5c commit ad8935d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 52 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"eslint-config-airbnb": "2.1.1",
"expect": "^1.13.4",
"mocha": "^2.2.5",
"redux": "^3.3.1",
"rimraf": "^2.5.0"
}
}
31 changes: 15 additions & 16 deletions src/__tests__/batchedSubscribe-test.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
import { batchedSubscribe } from '../';
import expect from 'expect';

function createStoreShape() {
return {
dispatch: expect.createSpy(),
subscribe: expect.createSpy()
};
}
import { createStore } from 'redux';

function createBatchedStore(batch = (cb) => cb()) {
const baseStore = createStoreShape();
const createStore = () => baseStore;
const batchedStore = batchedSubscribe(batch)(createStore)();
batchedStore.base = baseStore;
const baseStore = createStore(() => ({}), {});

expect.spyOn(baseStore, 'dispatch').andCallThrough();
expect.spyOn(baseStore, 'subscribe').andCallThrough();

const createBaseStore = () => baseStore;
const batchedStore = batchedSubscribe(batch)(createBaseStore)();

batchedStore.base = baseStore;
return batchedStore;
}

Expand All @@ -33,14 +31,15 @@ describe('batchedSubscribe()', () => {

store.subscribe(subscribeCallbackSpy);
store.dispatch({ type: 'foo' });
store.dispatch({ type: 'foo' });
store.dispatch({ type: 'foo' });

expect(store.base.subscribe.calls.length).toEqual(0);
expect(subscribeCallbackSpy.calls.length).toEqual(1);
expect(subscribeCallbackSpy.calls.length).toEqual(3);
});

it('it exposes base subscribe as subscribeImmediate', () => {
const store = createBatchedStore();
store.subscribeImmediate();
store.subscribeImmediate(() => {});

expect(store.base.subscribe.calls.length).toEqual(1);
});
Expand Down Expand Up @@ -71,8 +70,8 @@ describe('batchedSubscribe()', () => {
});
store.subscribe(listenerC);

store.dispatch({});
store.dispatch({});
store.dispatch({ type: 'foo' });
store.dispatch({ type: 'foo' });

expect(listenerA.calls.length).toEqual(2);
expect(listenerB.calls.length).toEqual(1);
Expand Down
78 changes: 42 additions & 36 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,60 @@ export function batchedSubscribe(batch) {
throw new Error('Expected batch to be a function.');
}

let currentListeners = [];
let nextListeners = currentListeners;
return next => (...args) => {
const store = next(...args);
const subscribeImmediate = store.subscribe;
let subscriptions = [];

function ensureCanMutateNextListeners() {
if (nextListeners === currentListeners) {
nextListeners = currentListeners.slice();
}
}
function subscribe(listener) {
if (typeof listener !== 'function') {
throw new Error('Expected listener to be a function.');
}

function subscribe(listener) {
if (typeof listener !== 'function') {
throw new Error('Expected listener to be a function.');
}
const subscription = {
cb: listener, trigger: false, unsubscribe: false, ready: false
};

let isSubscribed = true;
const unsubscribeImmediate = subscribeImmediate(() => {
subscription.trigger = true;
});

ensureCanMutateNextListeners();
nextListeners.push(listener);
subscriptions.push(subscription);

return function unsubscribe() {
if (!isSubscribed) {
return;
}
return function unsubscribe() {
unsubscribeImmediate();
subscription.unsubscribe = true;
};
}

isSubscribed = false;
function notifyListeners() {
let cleanupSubscriptions = false;

ensureCanMutateNextListeners();
const index = nextListeners.indexOf(listener);
nextListeners.splice(index, 1);
};
}
for (const subscription of subscriptions) {
if (subscription.ready && subscription.trigger) {
subscription.cb();
subscription.trigger = false;
}

function notifyListeners() {
const listeners = currentListeners = nextListeners;
for (let i = 0; i < listeners.length; i++) {
listeners[i]();
}
}
if (subscription.unsubscribe) {
cleanupSubscriptions = true;
}
}

function notifyListenersBatched() {
batch(notifyListeners);
}
if (cleanupSubscriptions) {
subscriptions = subscriptions.filter((subscription) => !subscription.unsubscribe);
}
}

return next => (...args) => {
const store = next(...args);
const subscribeImmediate = store.subscribe;
function notifyListenersBatched() {
batch(notifyListeners);
}

function dispatch(...dispatchArgs) {
for (const subscription of subscriptions) {
subscription.ready = subscription.unsubscribe === false;
}

const res = store.dispatch(...dispatchArgs);
notifyListenersBatched();
return res;
Expand Down

0 comments on commit ad8935d

Please sign in to comment.