Skip to content

Commit

Permalink
feat(component-store): add support for side effects (ngrx#2544)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-okrushko authored and BioPhoton committed Jun 5, 2020
1 parent 3c092cb commit 705b3a5
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 2 deletions.
193 changes: 191 additions & 2 deletions modules/component-store/spec/component-store.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
import { ComponentStore } from '@ngrx/component-store';
import { fakeSchedulers, marbles } from 'rxjs-marbles/jest';
import { of, Subscription, ConnectableObservable, interval, timer } from 'rxjs';
import { delayWhen, publishReplay, take, map } from 'rxjs/operators';
import {
of,
Subscription,
ConnectableObservable,
interval,
timer,
Observable,
} from 'rxjs';
import {
delayWhen,
publishReplay,
take,
map,
tap,
finalize,
} from 'rxjs/operators';

describe('Component Store', () => {
describe('initialization', () => {
Expand Down Expand Up @@ -627,4 +641,179 @@ describe('Component Store', () => {
componentStore.ngOnDestroy();
});
});

describe('effect', () => {
let componentStore: ComponentStore<object>;

beforeEach(() => {
componentStore = new ComponentStore<object>();
});

it(
'is run when value is provided',
marbles(m => {
const results: string[] = [];
const mockGenerator = jest.fn((origin$: Observable<string>) =>
origin$.pipe(tap(v => results.push(v)))
);
const effect = componentStore.effect(mockGenerator);
effect('value 1');
effect('value 2');

expect(results).toEqual(['value 1', 'value 2']);
})
);

it(
'is run when undefined value is provided',
marbles(m => {
const results: string[] = [];
const mockGenerator = jest.fn((origin$: Observable<undefined>) =>
origin$.pipe(tap(v => results.push(typeof v)))
);
const effect = componentStore.effect(mockGenerator);
effect(undefined);
effect();

expect(results).toEqual(['undefined', 'undefined']);
})
);

it(
'is run when observable is provided',
marbles(m => {
const mockGenerator = jest.fn(origin$ => origin$);
const effect = componentStore.effect(mockGenerator);

effect(m.cold('-a-b-c|'));

m.expect(mockGenerator.mock.calls[0][0]).toBeObservable(
m.hot(' -a-b-c-')
);
})
);
it(
'is run with multiple Observables',
marbles(m => {
const mockGenerator = jest.fn(origin$ => origin$);
const effect = componentStore.effect(mockGenerator);

effect(m.cold('-a-b-c|'));
effect(m.hot(' --d--e----f-'));

m.expect(mockGenerator.mock.calls[0][0]).toBeObservable(
m.hot(' -adb-(ce)-f-')
);
})
);

describe('cancels effect Observable', () => {
beforeEach(() => jest.useFakeTimers());
it(
'by unsubscribing with returned Subscription',
fakeSchedulers(advance => {
const results: string[] = [];
const effect = componentStore.effect((origin$: Observable<string>) =>
origin$.pipe(tap(v => results.push(v)))
);

const observable$ = interval(10).pipe(
map(v => String(v)),
take(10) // just in case
);

// Update with Observable.
const subsription = effect(observable$);

// Advance for 40 fake milliseconds and unsubscribe - should capture
// from '0' to '3'
advance(40);
subsription.unsubscribe();

// Advance for 20 more fake milliseconds, to check if anything else
// is captured
advance(20);

expect(results).toEqual(['0', '1', '2', '3']);
})
);
it(
'could be unsubscribed from the specific Observable when multiple' +
' are provided',
fakeSchedulers(advance => {
// Record all the values that go through state$ into an array
const results: Array<{ value: string }> = [];
const effect = componentStore.effect(
(origin$: Observable<{ value: string }>) =>
origin$.pipe(tap(v => results.push(v)))
);

// Pass the first Observable to the effect.
const subsription = effect(
interval(10).pipe(
map(v => ({ value: 'a' + v })),
take(10) // just in case
)
);

// Pass the second Observable that pushes values to effect
effect(
timer(15, 10).pipe(
map(v => ({ value: 'b' + v })),
take(10)
)
);

// Advance for 40 fake milliseconds and unsubscribe - should capture
// from '0' to '3'
advance(40);
subsription.unsubscribe();

// Advance for 30 more fake milliseconds, to make sure that second
// Observable still emits
advance(30);

expect(results).toEqual([
{ value: 'a0' },
{ value: 'b0' },
{ value: 'a1' },
{ value: 'b1' },
{ value: 'a2' },
{ value: 'b2' },
{ value: 'a3' },
{ value: 'b3' },
{ value: 'b4' },
{ value: 'b5' }, // second Observable continues to emit values
]);
})
);

it('completes when componentStore is destroyed', (doneFn: jest.DoneCallback) => {
componentStore.effect(origin$ =>
origin$.pipe(
finalize(() => {
doneFn();
})
)
)(interval(10));

setTimeout(() => componentStore.ngOnDestroy(), 20);
jest.advanceTimersByTime(20);
});

it('observable argument completes when componentStore is destroyed', (doneFn: jest.DoneCallback) => {
componentStore.effect(origin$ => origin$)(
interval(10).pipe(
finalize(() => {
doneFn();
})
)
);

setTimeout(() => componentStore.ngOnDestroy(), 20);

jest.advanceTimersByTime(20);
});
});
});
});
39 changes: 39 additions & 0 deletions modules/component-store/src/component-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
Subscription,
throwError,
combineLatest,
Subject,
} from 'rxjs';
import {
concatMap,
Expand All @@ -17,6 +18,15 @@ import {
} from 'rxjs/operators';
import { debounceSync } from './debounceSync';

/**
* Return type of the effect, that behaves differently based on whether the
* argument is passed to the callback.
*/
interface EffectReturnFn<T> {
(): void;
(t: T | Observable<T>): Subscription;
}

export class ComponentStore<T extends object> {
// Should be used only in ngOnDestroy.
private readonly destroySubject$ = new ReplaySubject<void>(1);
Expand Down Expand Up @@ -178,4 +188,33 @@ export class ComponentStore<T extends object> {
);
return distinctSharedObservable$;
}

/**
* Creates an effect.
*
* This effect is subscribed to for the life of the @Component.
* @param generator A function that takes an origin Observable input and
* returns an Observable. The Observable that is returned will be
* subscribed to for the life of the component.
* @return A function that, when called, will trigger the origin Observable.
*/
effect<V, R = unknown>(
generator: (origin$: Observable<V>) => Observable<R>
): EffectReturnFn<V> {
const origin$ = new Subject<V>();
generator(origin$)
// tied to the lifecycle 👇 of ComponentStore
.pipe(takeUntil(this.destroy$))
.subscribe();

return (observableOrValue?: V | Observable<V>): Subscription => {
const observable$ = isObservable(observableOrValue)
? observableOrValue
: of(observableOrValue);
return observable$.pipe(takeUntil(this.destroy$)).subscribe(value => {
// any new 👇 value is pushed into a stream
origin$.next(value);
});
};
}
}

0 comments on commit 705b3a5

Please sign in to comment.