These are simple extensions to make handling some common Rx flows a bit more simple.
$ npm install --save rx-flow-extensions
The library can be used in three ways
const Rx = require('rxjs/Rx');
const RxFlowExt = require('rx-flow-extensions');
const obs = Rx.Observable.of(1);
RxFlowExt.just(obs, 2)
.then(console.log); // 2
Important! when using functions this way, the first parameter is always the observable
const Rx = require('rxjs/Rx');
const RxFlowExt = require('rx-flow-extensions');
const obs = Rx.Observable.of(1);
RxFlowExt.extend(obs);
obs.just(2)
.then(console.log); // 2
const Rx = require('rxjs/Rx');
const RxFlowExt = require('rx-flow-extensions');
RxFlowExt.extend(Rx.Observable.prototype);
const obs = Rx.Observable.of(1);
obs.just(2)
.then(console.log); // 2
You can reset an extended object (i.e. remove all the added methods) by running
const Rx = require('rxjs/Rx');
const RxFlowExt = require('rx-flow-extensions');
RxFlowExt.extend(Rx.Observable.prototype);
RxFlowExt.reset(Rx.Observable.prototype);
const obs = Rx.Observable.of(1);
obs.just(2) // obs.just is not a function
.then(console.log);
All methods can be called using the 3 options shown above, we'll assume we have extended all observables for all examples.
It will cache the last result provided by the observable for time
milliseconds.
It won't ask the observable for new values during that time
const rest = require('rest');
const mime = require('rest/interceptor/mime');
const client = rest.wrap(mime); // Using cujojs/rest
const slow = Rx.Observable.of('my/slow/service/url') // {value: 1}
.cached(10000)
.map(res => res.value);
slow.subscribe(console.log); // 1 (will call my/slow/service/url)
slow.subscribe(console.log); // 1 (won't call my/slow/service/url)
setTimeout(
() => slow.subscribe(console.log),
11000
); // 1 (after 11000 ms, will call my/slow/service/url)
It will flatten an observable of observables of values into an observable of values
Rx.Observable.range(0, 5)
.map(v => Rx.Observable.of('test'))
.flatten()
.subscribe(console.log); // test! (5 times)
It just maps the emitted observable values to the specified value
Rx.Observable.range(0, 5)
.just('test!')
.subscribe(console.log); // test! (5 times)
It will merge two observers based on a condition (similar to how two tables are merged in a RDBMS).
const obs1 = Rx.Observable.from([
{id: 1, value: 'value a'},
{id: 2, value: 'value b'},
{id: 3, value: 'value c'}
]);
const obs2 = Rx.Observable.from([
{foreignId: 3, value2: 'matches' },
{foreignId: 2, value2: 'matches' },
{foreignId: 4, value2: 'does not match' }
{foreignId: 1, value2: 'matches' },
]);
obs1.matching(obs2, a => a.id, (id, b) => id = b.foreignId)
.subscribe(console.log); /* will emmit:
[{id: 1, value: 'value a'}, {foreignId: 1, value2: 'matches' }]
[{id: 2, value: 'value b'}, {foreignId: 2, value2: 'matches' }]
[{id: 3, value: 'value c'}, {foreignId: 3, value2: 'matches' }]
It will get elements emitted by the observable, waiting interval
milliseconds.
It will stop after maxAttempts
.
This method is useful for polling a web service repeatedly until a condition is met.
const rest = require('rest');
const mime = require('rest/interceptor/mime');
const client = rest.wrap(mime); // Using cujojs/rest
Rx.Observable.of('my/polling_results/service/url') // {results: [...], finished: true|false}
.flatMap(client)
.polling(1000, 5)
.map(res => res.entity)
.takeWhileInclusive(res => !res.finished)
.flatMap(res => Rx.Observable.fromArray(results))
.distinct()
.subscribe(console.log); // result1 result2 result3... After 5 attempts or finished === true
It will take elements while the condition is true, including the element after
the condition fails for the first time (similar effect to do while
loops)
Rx.Observable.range(0, 5)
.takeWhileInclusive(x => x < 3)
.subscribe(console.log); // 0 1 2 3