From f892cc8e103869fa710fd748469a510bcd26cbae Mon Sep 17 00:00:00 2001 From: Alex Okrushko Date: Mon, 1 Jun 2020 09:58:04 -0400 Subject: [PATCH] feat(component-store): add support for side effects (#2544) --- .../spec/component-store.spec.ts | 193 +++++++++++++++++- .../component-store/src/component-store.ts | 39 ++++ 2 files changed, 230 insertions(+), 2 deletions(-) diff --git a/modules/component-store/spec/component-store.spec.ts b/modules/component-store/spec/component-store.spec.ts index 25dd42d4cf..bef4afe2f8 100644 --- a/modules/component-store/spec/component-store.spec.ts +++ b/modules/component-store/spec/component-store.spec.ts @@ -1,7 +1,21 @@ import { ComponentStore } from '@ngrx/component-store'; import { fakeSchedulers, marbles } from 'rxjs-marbles/jest'; -import { of, Subscription, ConnectableObservable, interval, timer } from 'rxjs'; -import { delayWhen, publishReplay, take, map } from 'rxjs/operators'; +import { + of, + Subscription, + ConnectableObservable, + interval, + timer, + Observable, +} from 'rxjs'; +import { + delayWhen, + publishReplay, + take, + map, + tap, + finalize, +} from 'rxjs/operators'; describe('Component Store', () => { describe('initialization', () => { @@ -627,4 +641,179 @@ describe('Component Store', () => { componentStore.ngOnDestroy(); }); }); + + describe('effect', () => { + let componentStore: ComponentStore; + + beforeEach(() => { + componentStore = new ComponentStore(); + }); + + it( + 'is run when value is provided', + marbles(m => { + const results: string[] = []; + const mockGenerator = jest.fn((origin$: Observable) => + origin$.pipe(tap(v => results.push(v))) + ); + const effect = componentStore.effect(mockGenerator); + effect('value 1'); + effect('value 2'); + + expect(results).toEqual(['value 1', 'value 2']); + }) + ); + + it( + 'is run when undefined value is provided', + marbles(m => { + const results: string[] = []; + const mockGenerator = jest.fn((origin$: Observable) => + origin$.pipe(tap(v => results.push(typeof v))) + ); + const effect = componentStore.effect(mockGenerator); + effect(undefined); + effect(); + + expect(results).toEqual(['undefined', 'undefined']); + }) + ); + + it( + 'is run when observable is provided', + marbles(m => { + const mockGenerator = jest.fn(origin$ => origin$); + const effect = componentStore.effect(mockGenerator); + + effect(m.cold('-a-b-c|')); + + m.expect(mockGenerator.mock.calls[0][0]).toBeObservable( + m.hot(' -a-b-c-') + ); + }) + ); + it( + 'is run with multiple Observables', + marbles(m => { + const mockGenerator = jest.fn(origin$ => origin$); + const effect = componentStore.effect(mockGenerator); + + effect(m.cold('-a-b-c|')); + effect(m.hot(' --d--e----f-')); + + m.expect(mockGenerator.mock.calls[0][0]).toBeObservable( + m.hot(' -adb-(ce)-f-') + ); + }) + ); + + describe('cancels effect Observable', () => { + beforeEach(() => jest.useFakeTimers()); + it( + 'by unsubscribing with returned Subscription', + fakeSchedulers(advance => { + const results: string[] = []; + const effect = componentStore.effect((origin$: Observable) => + origin$.pipe(tap(v => results.push(v))) + ); + + const observable$ = interval(10).pipe( + map(v => String(v)), + take(10) // just in case + ); + + // Update with Observable. + const subsription = effect(observable$); + + // Advance for 40 fake milliseconds and unsubscribe - should capture + // from '0' to '3' + advance(40); + subsription.unsubscribe(); + + // Advance for 20 more fake milliseconds, to check if anything else + // is captured + advance(20); + + expect(results).toEqual(['0', '1', '2', '3']); + }) + ); + it( + 'could be unsubscribed from the specific Observable when multiple' + + ' are provided', + fakeSchedulers(advance => { + // Record all the values that go through state$ into an array + const results: Array<{ value: string }> = []; + const effect = componentStore.effect( + (origin$: Observable<{ value: string }>) => + origin$.pipe(tap(v => results.push(v))) + ); + + // Pass the first Observable to the effect. + const subsription = effect( + interval(10).pipe( + map(v => ({ value: 'a' + v })), + take(10) // just in case + ) + ); + + // Pass the second Observable that pushes values to effect + effect( + timer(15, 10).pipe( + map(v => ({ value: 'b' + v })), + take(10) + ) + ); + + // Advance for 40 fake milliseconds and unsubscribe - should capture + // from '0' to '3' + advance(40); + subsription.unsubscribe(); + + // Advance for 30 more fake milliseconds, to make sure that second + // Observable still emits + advance(30); + + expect(results).toEqual([ + { value: 'a0' }, + { value: 'b0' }, + { value: 'a1' }, + { value: 'b1' }, + { value: 'a2' }, + { value: 'b2' }, + { value: 'a3' }, + { value: 'b3' }, + { value: 'b4' }, + { value: 'b5' }, // second Observable continues to emit values + ]); + }) + ); + + it('completes when componentStore is destroyed', (doneFn: jest.DoneCallback) => { + componentStore.effect(origin$ => + origin$.pipe( + finalize(() => { + doneFn(); + }) + ) + )(interval(10)); + + setTimeout(() => componentStore.ngOnDestroy(), 20); + jest.advanceTimersByTime(20); + }); + + it('observable argument completes when componentStore is destroyed', (doneFn: jest.DoneCallback) => { + componentStore.effect(origin$ => origin$)( + interval(10).pipe( + finalize(() => { + doneFn(); + }) + ) + ); + + setTimeout(() => componentStore.ngOnDestroy(), 20); + + jest.advanceTimersByTime(20); + }); + }); + }); }); diff --git a/modules/component-store/src/component-store.ts b/modules/component-store/src/component-store.ts index fc1f475cfa..dc44a25c97 100644 --- a/modules/component-store/src/component-store.ts +++ b/modules/component-store/src/component-store.ts @@ -6,6 +6,7 @@ import { Subscription, throwError, combineLatest, + Subject, } from 'rxjs'; import { concatMap, @@ -17,6 +18,15 @@ import { } from 'rxjs/operators'; import { debounceSync } from './debounceSync'; +/** + * Return type of the effect, that behaves differently based on whether the + * argument is passed to the callback. + */ +interface EffectReturnFn { + (): void; + (t: T | Observable): Subscription; +} + export class ComponentStore { // Should be used only in ngOnDestroy. private readonly destroySubject$ = new ReplaySubject(1); @@ -178,4 +188,33 @@ export class ComponentStore { ); return distinctSharedObservable$; } + + /** + * Creates an effect. + * + * This effect is subscribed to for the life of the @Component. + * @param generator A function that takes an origin Observable input and + * returns an Observable. The Observable that is returned will be + * subscribed to for the life of the component. + * @return A function that, when called, will trigger the origin Observable. + */ + effect( + generator: (origin$: Observable) => Observable + ): EffectReturnFn { + const origin$ = new Subject(); + generator(origin$) + // tied to the lifecycle 👇 of ComponentStore + .pipe(takeUntil(this.destroy$)) + .subscribe(); + + return (observableOrValue?: V | Observable): Subscription => { + const observable$ = isObservable(observableOrValue) + ? observableOrValue + : of(observableOrValue); + return observable$.pipe(takeUntil(this.destroy$)).subscribe(value => { + // any new 👇 value is pushed into a stream + origin$.next(value); + }); + }; + } }