Fre is an exercise in repeating and expanding on a proven experiment.
The core of Fre is almost the same as RxJS and serves the same purpose: Fre is a modular library that enables Functional Reactive Programming (FRP), with a slight alteration to the Observer/Subscriber signature and a more liberal multicasting interface.
In RxJS, if you wanted to hook into the unsubscribe event you would add a teardown to the returned Subscription after subscribing. Fre rolls "unsubscribe" into the core Observer signature and adds additional .unsubscribe methods to all Operators (usually supplied as the final argument), allowing finer control over shared artifacts (this goes against the functional programming manifesto, but can be useful when used with restraint). Where RxJS's Observer exposes .next, .error and .complete, Fre's Observer exposes .next, .error, .complete and .unsubscribe.
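To make the four-callback signature concrete, here is a minimal, self-contained sketch. MiniObservable, the on* parameter names and the log array are hypothetical and exist only for illustration; this is not Fre's source, and Fre may guard or order these calls differently.

```javascript
// Hypothetical sketch: NOT Fre's implementation, just the idea of an
// Observer signature that carries an "unsubscribe" callback.
const log = [];

class MiniObservable {
    constructor(publisher) {
        this.publisher = publisher;
    }

    subscribe(onNext, onError, onComplete, onUnsubscribe) {
        let closed = false;
        const subscriber = {
            next(value) {
                if (!closed && onNext) onNext(value);
            },
            error(err) {
                if (closed) return;
                if (onError) onError(err);
                subscriber.unsubscribe();
            },
            complete() {
                if (closed) return;
                if (onComplete) onComplete();
                subscriber.unsubscribe();
            },
            unsubscribe() {
                if (closed) return;
                closed = true;
                // the fourth callback fires exactly once, however we got here
                if (onUnsubscribe) onUnsubscribe();
            }
        };
        this.publisher(subscriber);
        return subscriber;
    }
}

const source = new MiniObservable((subscriber) => {
    subscriber.next("message");
    subscriber.complete();
    subscriber.next("ignored"); // dropped: the subscriber is already closed
});

source.subscribe(
    (message) => log.push(message),
    (err) => log.push("error: " + err),
    () => log.push("completed"),
    () => log.push("unsubscribed")
);

console.log(log);
```

In this sketch the consumer learns about the end of the subscription through the same call that registered the other handlers, rather than by attaching a teardown to a returned object afterwards.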
Fre's multicast Operators (share, publish, etc.) support options for the following additional methods: .onConnect(), .onDisconnect(), .onReconnect(), .onSubscribe() and .onUnsubscribe(), which expose the multicasting lifecycle to the outside. They can be used to mirror the refCount procedure so that the consumer can manage its own clean-up. refCount has also been remounted as an option, as opposed to a separate Operator, to make usage more direct and explicit.
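The lifecycle described above can be sketched without Fre at all. The miniShare helper below is hypothetical: it assumes ref-counted behaviour (connect on the first subscriber, disconnect when the last one leaves) and assumes onReconnect fires in place of onConnect after a previous disconnect. Fre's actual option semantics may differ, so check the tests before relying on any of this.

```javascript
// Hypothetical sketch of lifecycle hooks around a ref-counted shared source.
// "connect" receives a push function and returns a disconnect function.
function miniShare(connect, hooks = {}) {
    const listeners = new Set();
    let disconnect = null;
    let connectedBefore = false;

    return function subscribe(listener) {
        listeners.add(listener);
        if (hooks.onSubscribe) hooks.onSubscribe(listeners.size);
        if (listeners.size === 1) {
            // first subscriber: open the single shared connection
            if (connectedBefore) {
                if (hooks.onReconnect) hooks.onReconnect();
            } else if (hooks.onConnect) {
                hooks.onConnect();
            }
            connectedBefore = true;
            disconnect = connect((value) => listeners.forEach((l) => l(value)));
        }
        return function unsubscribe() {
            if (!listeners.delete(listener)) return; // already removed
            if (hooks.onUnsubscribe) hooks.onUnsubscribe(listeners.size);
            if (listeners.size === 0) {
                // last subscriber left: close the shared connection
                disconnect();
                disconnect = null;
                if (hooks.onDisconnect) hooks.onDisconnect();
            }
        };
    };
}

const events = [];
let emit = null;
const shared = miniShare(
    (push) => {
        emit = push;
        return () => { emit = null; };
    },
    {
        onConnect: () => events.push("connect"),
        onReconnect: () => events.push("reconnect"),
        onDisconnect: () => events.push("disconnect"),
        onSubscribe: (count) => events.push("subscribe:" + count),
        onUnsubscribe: (count) => events.push("unsubscribe:" + count)
    }
);

const stopA = shared((value) => events.push("a:" + value));
const stopB = shared((value) => events.push("b:" + value));
emit("x");
stopA();
stopB();
const stopC = shared(() => {});
stopC();

console.log(events);
```

Exposing the hooks as plain options keeps the clean-up decision with the consumer, which is the point the paragraph above makes about mirroring refCount.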
Fre is written in JavaScript (as a feature) rather than TypeScript and covers just a subset of the Operators that RxJS covers (see the lists below for what is included). However, because the signature is virtually the same, Fre should be able to consume RxJS Operators without the need for any changes.
Please check the tests for more specific implementation details.
npm install @gdixon/fre --save
// import Observable from the root of fre
import { Observable } from "@gdixon/fre";
// create a new Observable and define its Publisher
const observable = new Observable((subscriber) => {
    // emit message to the subscriber
    subscriber.next("message");
    // complete the subscriber inside the publisher
    subscriber.complete();
});
// subscribe an Observer to the Observable (which will pass the Observer (wrapped in a Subscriber) through the Publisher)
observable.subscribe((message) => {
    console.log(message);
}, (err) => {
    console.log(err);
}, () => {
    console.log("completed");
}, () => {
    console.log("unsubscribed");
});
/*
    logs:
    $ message
    $ completed
    $ unsubscribed
*/
// import Subject from the root of fre
import { Subject } from "@gdixon/fre";
// create a new Subject
const subject = new Subject();
// Publisher can (optionally) be defined after the Observable/Subject has been constructed...
subject.setPublisher(function (subscriber) {
    // return a teardown fn to be added to the subscriber's teardowns
    return () => {
        console.log("subscriber teardown");
    };
});
// any messages sent before subscription will be lost
subject.next("lost");
// subscribe an Observer to the Subject
subject.subscribe((message) => {
    console.log(message);
}, (err) => {
    console.log(err);
}, () => {
    console.log("completed");
}, () => {
    console.log("unsubscribed");
});
// emit message to the subject (and all subscribers)
subject.next("message");
// complete all subscribers which are subscribed to the subject
subject.complete();
/*
    logs:
    $ message
    $ completed
    $ subscriber teardown
    $ unsubscribed
*/
// import BehaviourSubject from the root of fre
import { BehaviourSubject } from "@gdixon/fre";
// create a BehaviourSubject with an initial value of "message"
const subject = new BehaviourSubject("message");
// Setting a Publisher will override the internal Publisher and any other previously set Publishers using the *decorator pattern
// *(each new publisher in the chain should invoke or replace the previous one)
subject.setPublisher(function (subscriber, publisher) {
    // call to the original publisher (to invoke BehaviourSubject's internal publisher)
    publisher.call(this, subscriber);
    // return a teardown fn to be added to the subscriber's teardowns
    return () => {
        console.log("subscriber teardown");
    };
});
// subscribing to the BehaviourSubject will emit the last value the BehaviourSubject received to the new Subscriber
subject.subscribe((message) => {
    console.log(message);
}, (err) => {
    console.log(err);
}, () => {
    console.log("completed");
}, () => {
    console.log("unsubscribed");
});
// complete all subscribers which are subscribed to the subject
subject.complete();
/*
    logs:
    $ message
    $ completed
    $ subscriber teardown
    $ unsubscribed
*/
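The decorator pattern mentioned in the comments above can be shown in isolation. PublisherChain and its run method are hypothetical names for this sketch; the only detail taken from the example above is that the newly set publisher receives the previous one as its second argument.

```javascript
// Hypothetical sketch of chaining publishers with the decorator pattern.
// This is not Fre's source; it only mirrors the (subscriber, publisher) shape.
class PublisherChain {
    constructor(publisher) {
        // fall back to a no-op so the first decorator always has something to call
        this.publisher = publisher || function () {};
    }

    setPublisher(next) {
        const previous = this.publisher;
        // wrap: the new publisher decides whether to invoke or replace "previous"
        this.publisher = function (subscriber) {
            return next.call(this, subscriber, previous);
        };
        return this;
    }

    run(subscriber) {
        return this.publisher.call(this, subscriber);
    }
}

const calls = [];
const chain = new PublisherChain(function (subscriber) {
    calls.push("internal");
    subscriber.next("initial");
});

chain.setPublisher(function (subscriber, publisher) {
    calls.push("decorator");
    // invoke the publisher that was in place before this one
    publisher.call(this, subscriber);
    return () => calls.push("teardown");
});

const teardown = chain.run({ next: (value) => calls.push("next:" + value) });
teardown();

console.log(calls);
```

Leaving out the publisher.call(...) line would replace the internal behaviour entirely instead of extending it.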
// import ReplaySubject from the root of fre
import { ReplaySubject } from "@gdixon/fre";
// create a ReplaySubject (with no buffer invalidation rules)
const subject = new ReplaySubject();
// Setting a Publisher will override the internal Publisher and any other previously set Publishers using the *decorator pattern
// *(each new publisher in the chain should invoke or replace the previous one)
subject.setPublisher(function (subscriber, publisher) {
    // call to the original publisher (to invoke ReplaySubject's internal publisher)
    publisher.call(this, subscriber);
    // return a teardown fn to be added to the subscriber's teardowns
    return () => {
        console.log("subscriber teardown");
    };
});
// subscribing to the ReplaySubject would replay any messages already received (but nothing has been received yet)
subject.subscribe((message) => {
    console.log("sub1", message);
}, (err) => {
    console.log(err);
}, () => {
    console.log("completed1");
}, () => {
    console.log("unsubscribed1");
});
// emit message to the subject (and all subscribers) buffering the message for future subscriptions
subject.next("message1");
// Publisher will emit the buffered message
subject.subscribe((message) => {
    console.log("sub2", message);
}, (err) => {
    console.log(err);
}, () => {
    console.log("completed2");
}, () => {
    console.log("unsubscribed2");
});
// emit message to the subject (and all subscribers) buffering a second message
subject.next("message2");
// complete all subscribers which are subscribed to the subject
subject.complete();
/*
    logs:
    $ sub1 message1
    $ sub2 message1
    $ sub1 message2
    $ sub2 message2
    $ completed1
    $ subscriber teardown
    $ unsubscribed1
    $ completed2
    $ subscriber teardown
    $ unsubscribed2
*/
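The replay ordering in the logs above follows from buffering each message and flushing the buffer to late subscribers before they join the live stream. MiniReplay below is a hypothetical, unbounded-buffer sketch of that idea only; it has no completion, teardown or buffer invalidation, and it is not how Fre implements ReplaySubject.

```javascript
// Hypothetical sketch of a replay buffer (no invalidation rules, no completion).
class MiniReplay {
    constructor() {
        this.buffer = [];
        this.listeners = [];
    }

    next(value) {
        // remember the value for future subscribers, then deliver it live
        this.buffer.push(value);
        this.listeners.forEach((listener) => listener(value));
    }

    subscribe(listener) {
        // replay everything seen so far before joining the live listeners
        this.buffer.forEach((value) => listener(value));
        this.listeners.push(listener);
    }
}

const seen = [];
const replay = new MiniReplay();

replay.subscribe((value) => seen.push("sub1 " + value));
replay.next("message1");
replay.subscribe((value) => seen.push("sub2 " + value));
replay.next("message2");

console.log(seen);
```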
Operators always start with a lowercase letter and Observable constructors always start with an uppercase letter.
// import Of from Observable creation methods
import { Of } from "@gdixon/fre/observable";
// import map and reduce as Operators
import { map, reduce } from "@gdixon/fre/operator";
// create an Observable which will emit 1-4
const observable = Of(1, 2, 3, 4);
// pipe the values through Operators to build a new message stream
const sumOfPlusOne = observable.pipe(map((value) => {
    return value + 1;
}), reduce((carr, value) => {
    return carr + value;
}));
// subscribe to the computed stream
sumOfPlusOne.subscribe((value) => {
    console.log(value);
});
/*
    logs:
    $ 14
*/
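For readers new to pipeable Operators, the example above can be read as plain function composition: each Operator takes a source and returns a new source. The sketch below assumes a much simpler model than Fre's (a source is just a function that pushes into an observer), and miniOf, miniMap, miniReduce and miniPipe are hypothetical names, not Fre exports.

```javascript
// Hypothetical sketch: a "source" is a function that pushes values into
// an observer of the shape { next, complete }.
const miniOf = (...values) => (observer) => {
    values.forEach((value) => observer.next(value));
    observer.complete();
};

// an operator takes a source and returns a new source
const miniMap = (project) => (source) => (observer) =>
    source({
        next: (value) => observer.next(project(value)),
        complete: () => observer.complete()
    });

const miniReduce = (reducer) => (source) => (observer) => {
    let hasValue = false;
    let acc;
    source({
        next: (value) => {
            // with no seed, the first value becomes the initial accumulator
            acc = hasValue ? reducer(acc, value) : value;
            hasValue = true;
        },
        complete: () => {
            // emit the single reduced value only once the source completes
            if (hasValue) observer.next(acc);
            observer.complete();
        }
    });
};

// pipe applies the operators left to right
const miniPipe = (source, ...operators) =>
    operators.reduce((current, operator) => operator(current), source);

const results = [];
const sumOfPlusOneSketch = miniPipe(
    miniOf(1, 2, 3, 4),
    miniMap((value) => value + 1),
    miniReduce((carry, value) => carry + value)
);

sumOfPlusOneSketch({
    next: (value) => results.push(value),
    complete: () => results.push("done")
});

console.log(results);
```

The values 1-4 become 2, 3, 4 and 5 after the map step, and the reduce step emits their sum, 14, on completion.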
map, filter and reduce are also aliased on the Observable itself, so they can be used without the pipe method:
// import Of from Observable creation methods
import { Of } from "@gdixon/fre/observable";
// create an Observable which will emit 1-4
const observable = Of(1, 2, 3, 4);
// chain the aliased Operators directly on the Observable to build a new message stream
const sumOfPlusOne = observable.map((value) => {
    return value + 1;
}).reduce((carr, value) => {
    return carr + value;
});
// subscribe to the computed stream
sumOfPlusOne.subscribe((value) => {
    console.log(value);
});
/*
    logs:
    $ 14
*/
- Observable
- Subject
- ReplaySubject
- BehaviourSubject
- Observer
- Subscriber
- Subscription
- Scheduler
- bucket
- concat
- concatAll
- concatMap
- concatMapTo
- delay
- filter
- first
- groupBy
- last
- map
- mapTo
- merge
- mergeAll
- mergeMap
- mergeMapTo
- multicast
- operator
- pairwise
- publisher
- publishBehaviour
- publishReplay
- reduce
- scan
- share
- shareBehaviour
- shareReplay
- skip
- skipUntil
- skipWhile
- skipWith
- switchFor
- switchAll
- switchMap
- switchMapTo
- take
- takeUntil
- takeWhile
- tap
- toArray
- CombineLatest
- Concat
- Connectable
- ForkJoin
- From
- FromArray
- FromAsyncIterable
- FromCallback
- FromEvent
- FromIterable
- FromObservable
- FromPromise
- Interval
- Merge
- Of
- Switch
- Zip
- Animation
- Asap
- Async
- Queue
npm run test[:watch]
npm run coverage[:watch]
npm run lint[:fix]
npm run build
- We use SemVer for versioning. For available versions, see the tags on this repository.
- Graham Dixon - Initial work - GDixon
See also the list of contributors who participated in this project.
- This project is licensed under the MIT License - see the license file for details
- RxJS - A reactive programming library for JavaScript (with thanks to Ben Lesh and all RxJS contributors)