RxJS in its version 6 is a core part of the Angular framework so it's worth to learn it. This being said it's hard to learn — this challenge will teach you some parts of it to bring you one step nearer to understand RxJS.
We will look at the following things in RxJS:
- RxJS 1: Debugging
- RxJS 2: About Dollar Signs
- RxJS 3: Cold vs Hot Observables
- RxJS 4: Make Cold Observables Hot
- RxJS 5: RxJS in the wild
- RxJS 6: Testing
Branch rxjs/debug
I see myself using tap a lot to debug rxjs code. It's an operator that can look at what data is passing by without touching it.
fromEvent(window, 'keydown')
.pipe(
tap(event => console.log('key pressed'))
).subscribe();
You can use tap to debug or to create side effects like calling another method.
You might find rxjs-spy useful.
RxJS Spy can hook into any RxJS stream by placing a pipeable tag
. It won't interact with the stream— just like the rxjs operator tap
. After the tag is registered you will get infos about everything that's happening with the Observable like subscriptions, unsubscribes and of course next, errors and complete.
import { tag } from 'rxjs-spy/operators/tag';
import { create } from 'rxjs-spy';
const spy = create();
spy.log();
fromEvent(window, 'keydown')
.pipe(
tag('🎹 Key Pressed'),
take(2)
).subscribe();
Your console log will look like this after pressing key A and B
Tag = 🎹 Key Pressed; notification = subscribe; matching /.+/
Tag = 🎹 Key Pressed; notification = next; value = KeyboardEvent {key: "a", code: "KeyA" …}
Tag = 🎹 Key Pressed; notification = next; value = KeyboardEvent {key: "b", code: "KeyB" …}
🎹 Key Pressed; notification = unsubscribe; matching /.+/
This means you are perfectly informed about what happens in your stream and what it contains. I love this way of debugging a rxjs stream.
Have you encountered a dollar sign in source files using rxjs ? Angular has a small description about it. Dollar signs help to separate Observables from values more easily.
Actually the dollar sign usually stands for pluralization as an observables streams multiple values. It's called the Finnish notation.
const click$ = Observable.fromEvent(button, 'click');
But it's no rule or a styleguide thing. I see myself not using it that strict.
Branch rxjs/cold-hot
I will cite Ben Lesh here from his excellent article on the topic.
A cold observable creates its producer on each subscription, a hot observables closes over an already existing instance.
This means:
Cold Observables only start producing data with a subscription present and each subscription get its own stream of values. That's called unicast. Example:
timer()
andinterval()
observables are cold.
Hot Observables are streaming values regardless of the amount of subscribers and each one gets the same data. That's multicast.
fromEvent()
produces data without a subscription.
Here another brief example.
// COLD
var cold = new Observable((observer) => {
var producer = new Producer();
producer.listen(() => {
observer.next()
});
});
// HOT
var producer = new Producer();
var hot = new Observable((observer) => {
producer.listen(() => {
observer.next()
});
});
It's important to understand the difference otherwise you create resource intensive producers with each new subscription. You also need the knowledge about hot & cold when dealing with operators like switchMap
and mergeMap
where the difference is when the inner observable is unsubscribed.
Can we make a cold observable hot? Can we make it multicast despites it's only producing values for a single observer ? Yes! It's our good old friend Subject that is tailored for this purpose.
Let's get a little mit more concrete, we use an interval — which is cold by default.
const myInterval = interval(500).pipe(
take(5),
tap(value => console.log('interval produced a value'))
);
You would create a new setInterval
with each subscription and each subscription would receive its own stream of numbers.
myInterval.subscribe(value => console.log('received a value', value));
myInterval.subscribe(value => console.log('received a value', value));
myInterval.subscribe(value => console.log('received a value', value));
/**
interval produced a value
received a value 0
interval produced a value
received a value 0
interval produced a value
received a value 0
interval produced a value
*/
The 0 look similar but they are produced from different intervals. We can make it multicast with a Subject. The Subject will be some kind of mediator. Receiving the value and distribute to everyone who is interested. That's multicast.
const subject = new Subject();
// 1. let this subject subscribe to the cold observable
myInterval.subscribe(subject);
// 2. now let future observables subscribe to the subject instead of the interval
subject.subscribe(value => console.log('received a value', value));
subject.subscribe(value => console.log('received a value', value));
subject.subscribe(value => console.log('received a value', value));
/**
interval produced a value
received a value 0
received a value 0
received a value 0
*/
Without changing the implementation of the given observable we were able to transform it into a multicasting stream. That's a powerful technique 💪
There are some multicast operators in RxJS like publish, publishReplay, multicast & share, they all do the same thing with a Subject under the hood.
multicast operator
You could have made your life a little bit easier with the `multicast` operator.const myHotObservable = interval(500).pipe(
take(5),
tap(value => console.log('interval produced a value')),
multicast(new Subject()) // make it a ConnectableObservable
) as ConnectableObservable<number>;
// all those subscribes will be delegated to the internal Subject by the multicast operator.
myInterval2.subscribe(value => console.log('received a value', value));
myInterval2.subscribe(value => console.log('received a value', value));
myInterval2.subscribe(value => console.log('received a value', value));
// let the internal subject subscribe to the given interval — no values are produced before this
myInterval2.connect();
Following some things I use frequently in real projects so you get confident in using them too 🙌
Branch rxjs/into-the-wild-as-observable
If you have a subject just for signaling something you usually use a Subject and expose it, right?
class YourComponent {
changed$: Subject<any> = new Subject();
notify() {
this.changed$.next()
}
}
The problem here: A subject is both an observer and observable. It can observe by subscribing (otherObservable.subscribe(subject)) and you can subscribe to it as an Observable (subject.subscribe). By exposing a subject directly you allow other parties to use its observer side. Someone could get the idea to subscribe it to some source or just use the next
method (as part of the observer pattern). You want to protect your subject from being used like this.
That's where subject.asObservable()
comes to your rescue.
private _changed: Subject<any> = new Subject();
get changed(): Observable<any> {
return this._changed.asObservable();
}
This way it's only an observable — there are no observer functions like next available.
Branch rxjs/into-the-wild-behaviour
Hot Observables can produce values without someone listening. It's pretty sad to imagine someone telling important stuff and nobody is listening. This is often a real problem in your application. Imagine a service loading some data at the application startup and you have a router navigating through your page components. Your interested page might come to life quite some time the data arrived. If you subscribe such an observable where the data might have been produced already you won't get any data when you subscribe too late.
RxJS has a special Subject that will help you. A BehaviorSubject.
const subjectA = new Subject();
const subjectB = new BehaviorSubject(null);
subjectA.next('your loaded data');
subjectB.next('your loaded data');
subjectA.subscribe(value => console.log('value from subjectA:', value));
subjectB.subscribe(value => console.log('value from subjectB:', value));
Your output is this:
value from subjectB: your loaded data
The subscription to the normal subject just missed the data — it was produced too early. The BehaviorSubject
also
produced the value before the subscription — but it's nice enough to tell every subscription about the last subscription.
A BehaviorSubject
only retains the last single value. If you want to keep all values you could use the ReplaySubject
.
Branch rxjs/into-the-wild-take-destroy
.
That's a pretty nice pattern to learn.
When you use addEvenListener
you must ensure that you call removeEventListener
too at some moment. The same is true of subscriptions,
that's the return value when you subscribe to an Observable. Otherwise you would create unintended side effects and you create memory leaks as the garbage collector can't get rid of all involved resources.
It's easy to create a dangling subscription:
class YourComponent {
initService() {
this.yourService.subscribe(data => {
// do something
})
}
}
When your YourComponent
is destroyed you have a subscription still listening to your services. The idiomatic way of unsubscribing is using a reference to the subscription and removing it when you destroy the component.
private _subscription: Subscription = Subscription.EMPTY;
initService() {
this._subscription = this.yourService.subscribe();
}
ngOnDestroy() {
this._subscription.unsubscribe();
}
What happens if you have more than one subscription? Yes this gets tedious. You can use the power of rxjs to unsubscribe automatically. The only thing you need in addition is a signal when the component is destroyed.
private _destroyed: Subject<any> = new Subject();
ngOnDestroy() {
this._destroyed.next();
}
With that at your hand you can automatically complete and unsubscribe any stream you create.
this.yourService
.pipe(takeUntil(this._destroyed))
.subscribe(data => {
// do something
})
takeUntil
will complete the event once the destroy signal arrives.
Neat isn't it? There is another good article from Ben Lesh about it.
You can collect all values from a stream with toArray
and return them once the stream completes.
The same thing is happening when you use last()
but you only return the last received element.
That's a pretty handy operator I discovered far too late. You will see it in action in one of the challenges. There I use it to block a stream until it reached a given result.
Here a simple example:
range(1, 10)
.pipe(
toArray()
).subscribe(values => console.log(values));
(10) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
And one with switchMap
involved to show you the blocking functionality.
timer(0, 5000)
.pipe(
tag('outer observable'),
// create an inner observable
switchMap(value => {
return interval(500)
.pipe(
take(2),
tag('inner observable'),
toArray() // block until it completes. It will collect the array of collected values: [0, 1]
);
}),
// here you will see only one value (always [0, 1]) arriving from the
// inner observable.
tag('after the switch'),
).subscribe();
Branch rxjs/testing
.
RxJS is synchronous by default. This means testing of your streams can be easy! Just subscribe and watch for the correct data to arrive.
If async things are involved you need to make sure that things like services are mocked (and act synchronous again)
or if there is a timer or interval involved just use the power of fakeAsync
& tick
. We will take a look at those in the next challenge about Testing.
When you create your own observables things get more complicated and you should do marble testing (see Variant C)
But in most cases your RxJS code can be tested with the following two methods (Variant A & B). It depends on the complexity of your stream and how exposed it is. There will be some tests to implement in the testing challenge coming soon 🙌
You have this component given:
export class AppComponent {
title = 'workshop-theory';
counter = 0;
change$ = new Subject();
constructor() {
this.change$.subscribe(value => {
this.counter++;
});
}
doSomething() {
this.change$.next(true);
}
}
If the stream is only used internally it's perfectly fine to test the effects only. Imagine a component like this.
it('changes should be true', () => {
const fixture = TestBed.createComponent(AppTestComponent);
fixture.detectChanges();
const app: AppComponent = fixture.componentInstance.instance;
let result = false;
app.change$.subscribe( value => {
result = value;
});
app.doSomething();
expect(result).toBe(true);
});
That's even better in a component because you don't want to test implementation details but the things being exposed.
For example, if you have a component that is incrementing a counter whenever a click occurs. You know that you are using rxjs to do it but in the end it only matters that the changes are reflected in the template. So just test the counter not the rxjs.
it('counter should increment when clicked', () => {
const fixture = TestBed.createComponent(AppTestComponent);
fixture.detectChanges();
const app: AppComponent = fixture.componentInstance.instance;
const nativeElementCounter: HTMLElement = fixture.nativeElement.querySelector('.counter');
expect(nativeElementCounter.textContent.trim()).toBe('0');
app.doSomething();
fixture.detectChanges();
// rxjs implementation is not important just test the effect
expect(nativeElementCounter.textContent.trim()).toBe('1');
});
You might have seen marble test like those:
it('generate the stream correctly', () => {
scheduler.run(helpers => {
const { cold, expectObservable, expectSubscriptions } = helpers;
const e1 = cold('-a--b--c---|');
const subs = '^----------!';
const expected = '-a-----c---|';
expectObservable(e1.pipe(throttleTime(3, scheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
That testing is useful when creating your own observables but in most cases far too much if you are only testing your streams.
Finish 🙌 You gained knowledged about the following awesome things in RxJS and you are ready for the challenge.
- RxJS 1: Debugging
- RxJS 2: About Dollar Signs
- RxJS 3: Cold vs Hot Observables
- RxJS 4: Make Cold Observables Hot
- RxJS 5: RxJS in the wild
- RxJS 9: Testing
Let's continue with Chapter 04 - RxJS (Challenge)