Skip to content

Commit

Permalink
Update DESIGN.md (#6033)
Browse files Browse the repository at this point in the history
I've just read the DESIGN.md and noticed some things that I could do to improve the quality of the DESIGN.md. So as a result of my "proofreading" I mainly:
- Added periods at the ending of some sentences.
- Did case matching of certain types and terms. e.g. `OnSubscribe` -> `onSubscribe` OR flowable -> `Flowable`.

Hope it helps! 😄
  • Loading branch information
fjoshuajr authored and akarnokd committed Jun 1, 2018
1 parent e8156d5 commit f3c8862
Showing 1 changed file with 69 additions and 69 deletions.
138 changes: 69 additions & 69 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ Producer is in charge. Consumer has to do whatever it needs to keep up.

Examples:

- `Observable` (RxJS, Rx.Net, RxJava v1.x without backpressure, RxJava v2)
- Callbacks (the producer calls the function at its convenience)
- IRQ, mouse events, IO interrupts
- 2.x `Flowable` (with `request(n)` credit always granted faster or in larger quantity than producer)
- Reactive Streams `Publisher` (with `request(n)` credit always granted faster or in larger quantity than producer)
- Java 9 `Flow.Publisher` (with `request(n)` credit always granted faster than or in larger quantity producer)
- `Observable` (RxJS, Rx.Net, RxJava v1.x without backpressure, RxJava v2).
- Callbacks (the producer calls the function at its convenience).
- IRQ, mouse events, IO interrupts.
- 2.x `Flowable` (with `request(n)` credit always granted faster or in larger quantity than producer).
- Reactive Streams `Publisher` (with `request(n)` credit always granted faster or in larger quantity than producer).
- Java 9 `Flow.Publisher` (with `request(n)` credit always granted faster than or in larger quantity than producer).


##### Synchronous Interactive/Pull
Expand All @@ -52,11 +52,11 @@ Consumer is in charge. Producer has to do whatever it needs to keep up.

Examples:

- `Iterable`
- 2.x/1.x `Observable` (without concurrency, producer and consumer on the same thread)
- 2.x `Flowable` (without concurrency, producer and consumer on the same thread)
- Reactive Streams `Publisher` (without concurrency, producer and consumer on the same thread)
- Java 9 `Flow.Publisher` (without concurrency, producer and consumer on the same thread)
- `Iterable`.
- 2.x/1.x `Observable` (without concurrency, producer and consumer on the same thread).
- 2.x `Flowable` (without concurrency, producer and consumer on the same thread).
- Reactive Streams `Publisher` (without concurrency, producer and consumer on the same thread).
- Java 9 `Flow.Publisher` (without concurrency, producer and consumer on the same thread).


##### Async Pull (Async Interactive)
Expand All @@ -65,24 +65,24 @@ Consumer requests data when it wishes, and the data is then pushed when the prod

Examples:

- `Future` & `Promise`
- `Single` (lazy `Future`)
- 2.x `Flowable`
- Reactive Streams `Publisher`
- Java 9 `Flow.Publisher`
- 1.x `Observable` (with backpressure)
- `AsyncEnumerable`/`AsyncIterable`
- `Future` & `Promise`.
- `Single` (lazy `Future`).
- 2.x `Flowable`.
- Reactive Streams `Publisher`.
- Java 9 `Flow.Publisher`.
- 1.x `Observable` (with backpressure).
- `AsyncEnumerable`/`AsyncIterable`.

There is an overhead (performance and mental) for achieving this, which is why we also have the 2.x `Observable` without backpressure.


##### Flow Control

Flow control is any mitigation strategies that a consumer applies to reduce the flow of data.
Flow control is any mitigation strategy that a consumer applies to reduce the flow of data.

Examples:

- Controlling the production of data, such as with `Iterator.next` or `Subscription.request(n)`
- Controlling the production of data, such as with `Iterator.next` or `Subscription.request(n)`.
- Preventing the delivery of data, such as buffer, drop, sample/throttle, and debounce.


Expand Down Expand Up @@ -112,14 +112,14 @@ Stream that supports async and synchronous push. It does *not* support interacti

Usable for:

- sync or async
- push
- 0, 1, many or infinite items
- Sync or async.
- Push.
- 0, 1, many or infinite items.

Flow control support:

- buffering, sampling, throttling, windowing, dropping, etc
- temporal and count-based strategies
- Buffering, sampling, throttling, windowing, dropping, etc.
- Temporal and count-based strategies.

*Type Signature*

Expand Down Expand Up @@ -147,23 +147,23 @@ Stream that supports async and synchronous push and pull. It supports interactiv

Usable for:

- pull sources
- push Observables with backpressure strategy (ie. `Observable.toFlowable(onBackpressureStrategy)`)
- sync or async
- 0, 1, many or infinite items
- Pull sources.
- Push Observables with backpressure strategy (i.e. `Observable.toFlowable(onBackpressureStrategy)`).
- Sync or async.
- 0, 1, many or infinite items.

Flow control support:

- buffering, sampling, throttling, windowing, dropping, etc
- temporal and count-based strategies
- `request(n)` consumer demand signal
- for pull-based sources, this allows batched "async pull"
- for push-based sources, this allows backpressure signals to conditionally apply strategies (i.e. drop, first, buffer, sample, fail, etc)
- Buffering, sampling, throttling, windowing, dropping, etc.
- Temporal and count-based strategies.
- `request(n)` consumer demand signal:
- For pull-based sources, this allows batched "async pull".
- For push-based sources, this allows backpressure signals to conditionally apply strategies (i.e. drop, first, buffer, sample, fail, etc.).

You get a flowable from:
You get a `Flowable` from:

- Converting a Observable with a backpressure strategy
- Create from sync/async OnSubscribe API (which participate in backpressure semantics)
- Converting a Observable with a backpressure strategy.
- Create from sync/async `onSubscribe` API (which participate in backpressure semantics).

*Type Signature*

Expand Down Expand Up @@ -191,14 +191,14 @@ Lazy representation of a single response (lazy equivalent of `Future`/`Promise`)

Usable for:

- pull sources
- push sources being windowed or flow controlled (such as `window(1)` or `take(1)`)
- sync or async
- 1 item
- Pull sources.
- Push sources being windowed or flow controlled (such as `window(1)` or `take(1)`).
- Sync or async.
- 1 item.

Flow control:

- Not applicable (don't subscribe if the single response is not wanted)
- Not applicable (don't subscribe if the single response is not wanted).

*Type Signature*

Expand All @@ -219,15 +219,15 @@ interface SingleSubscriber<T> {

##### Completable

Lazy representation of a unit of work that can complete or fail
Lazy representation of a unit of work that can complete or fail.

- Semantic equivalent of `Observable.empty().doOnSubscribe()`.
- Alternative for scenarios often represented with types such as `Single<Void>` or `Observable<Void>`.

Usable for:

- sync or async
- 0 items
- Sync or async.
- 0 items.

*Type Signature*

Expand Down Expand Up @@ -325,9 +325,9 @@ In the addition of the previous rules, an operator for `Flowable`:

### Creation

Unlike RxJava 1.x, 2.x base classes are to be abstract, stateless and generally no longer wrap an `OnSubscribe` callback - this saves allocation in assembly time without limiting the expressiveness. Operator methods and standard factories still live as final on the base classes.
Unlike RxJava 1.x, 2.x base classes are to be abstract, stateless and generally no longer wrap an `onSubscribe` callback - this saves allocation in assembly time without limiting the expressiveness. Operator methods and standard factories still live as final on the base classes.

Instead of the indirection of an `OnSubscribe` and `lift`, operators are to be implemented by extending the base classes. For example, the `map`
Instead of the indirection of an `onSubscribe` and `lift`, operators are to be implemented by extending the base classes. For example, the `map`
operator will look like this:

```java
Expand All @@ -353,36 +353,36 @@ public final class FlowableMap<T, R> extends Flowable<R> {
}
```

Since Java still doesn't have extension methods, "adding" more operators can only happen through helper methods such as `lift(C -> C)` and `compose(R -> P)` where `C` is the default consumer type (i.e., `rs.Subscriber`), `R` is the base type (i.e., `Flowable`) and `P` is the base interface (i.e., `rs.Publisher`). As before, the library itself may gain or lose standard operators and/or overloads through the same community process.
Since Java still doesn't have extension methods, "adding" more operators can only happen through helper methods such as `lift(C -> C)` and `compose(R -> P)` where `C` is the default consumer type (i.e. `rs.Subscriber`), `R` is the base type (i.e. `Flowable`) and `P` is the base interface (i.e. `rs.Publisher`). As before, the library itself may gain or lose standard operators and/or overloads through the same community process.

In concert, `create(OnSubscribe)` will not be available; standard operators extend the base types directly. The conversion of other RS-based libraries will happen through the `Flowable.wrap(Publisher<T>)` static method.
In concert, `create(onSubscribe)` will not be available; standard operators extend the base types directly. The conversion of other RS-based libraries will happen through the `Flowable.wrap(Publisher<T>)` static method.

(*The unfortunate effect of `create` in 1.x was the ignorance of the Observable contract and beginner's first choice as an entry point. We can't eliminate this path since `rs.Publisher` is a single method functional interface that can be implemented just as badly.*)

Therefore, new standard factory methods will try to address the common entry point requirements.

The `Flowable` will contain the following `create` methods:

- `create(SyncGenerator<T, S>)`: safe, synchronous generation of signals, one-by-one
- `create(AsyncOnSubscribe<T, S>)`: batch-create signals based on request patterns
- `create(Consumer<? super FlowEmitter<T>>)`: relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.).
- `createSingle(Consumer<? super SingleEmitter<T>>)`: relay a single value or error from other reactive sources (i.e., addListener callbacks)
- `createEmpty(Consumer<? super CompletionEmitter>)`: signal a completion or error from valueless reactive sources
- `create(SyncGenerator<T, S>)`: safe, synchronous generation of signals, one-by-one.
- `create(AsyncOnSubscribe<T, S>)`: batch-create signals based on request patterns.
- `create(Consumer<? super FlowEmitter<T>>)`: relay multiple values or error from multi-valued reactive-sources (i.e. button-clicks) while also give flow control options right there (buffer, drop, error, etc.).
- `createSingle(Consumer<? super SingleEmitter<T>>)`: relay a single value or error from other reactive sources (i.e. addListener callbacks).
- `createEmpty(Consumer<? super CompletionEmitter>)`: signal a completion or error from valueless reactive sources.

The `Observable` will contain the following `create` methods:

- `create(SyncGenerator<T, S>)`: safe, synchronous generation of signals, one-by-one
- `create(Consumer<? super FlowEmitter<T>>)`: relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.).
- `createSingle(Consumer<? super SingleEmitter<T>>)`: relay a single value or error from other reactive sources (i.e., addListener callbacks)
- `createEmpty(Consumer<? super CompletionEmitter>)`: signal a completion or error from valueless reactive sources
- `create(SyncGenerator<T, S>)`: safe, synchronous generation of signals, one-by-one.
- `create(Consumer<? super FlowEmitter<T>>)`: relay multiple values or error from multi-valued reactive-sources (i.e. button-clicks) while also give flow control options right there (buffer, drop, error, etc.).
- `createSingle(Consumer<? super SingleEmitter<T>>)`: relay a single value or error from other reactive sources (i.e. addListener callbacks).
- `createEmpty(Consumer<? super CompletionEmitter>)`: signal a completion or error from valueless reactive sources.

The `Single` will contain the following `create` method:

- `create(Consumer<? super SingleEmitter<T>>)`: relay a single value or error from other reactive sources (i.e., addListener callbacks)
- `create(Consumer<? super SingleEmitter<T>>)`: relay a single value or error from other reactive sources (i.e. addListener callbacks).

The `Completable` will contain the following `create` method:

- `create(Consumer<? super CompletionEmitter>)`: signal a completion or error from valueless reactive sources
- `create(Consumer<? super CompletionEmitter>)`: signal a completion or error from valueless reactive sources.


The first two `create` methods take an implementation of an interface which provides state and the generator methods:
Expand Down Expand Up @@ -509,10 +509,10 @@ There are two main levels of operator fusion: *macro* and *micro*.

Macro fusion deals with the higher level view of the operators, their identity and their combination (mostly in the form of subsequence). This is partially an internal affair of the operators, triggered by the downstream operator and may work with several cases. Given an operator application pair `a().b()` where `a` could be a source or an intermediate operator itself, when the application of `b` happens in assembly time, the following can happen:

- `b` identifies `a` and decides to not apply itself. Example: `empty().flatMap()` is functionally a no-op
- `b` identifies `a` and decides to not apply itself. Example: `empty().flatMap()` is functionally a no-op.
- `b` identifies `a` and decides to apply a different, conventional operator. Example: `just().subscribeOn()` is turned into `just().observeOn()`.
- `b` decides to apply a new custom operator, combining and inlining existing behavior. Example: `just().subscribeOn()` internally goes to `ScalarScheduledPublisher`.
- `a` is `b` and the two operator's parameter set can be combined into a single application. Example: `filter(p1).filter(p2)` combined into `filter(p1 && p2)`
- `a` is `b` and the two operator's parameter set can be combined into a single application. Example: `filter(p1).filter(p2)` combined into `filter(p1 && p2)`.

Participating in the macro-fusion externally is possible by implementing a marker interface when extending `Flowable`. Two kinds of interfaces are available:

Expand Down Expand Up @@ -540,7 +540,7 @@ Currently, two main kinds of micro-fusion opportunities are available.

###### 1) Conditional Subscriber

This extends the RS `Subscriber`interface with an extra method: `boolean tryOnNext(T value)` and can help avoiding small request amounts in case an operator didn't forward but dropped the value. The canonical use is for the `filter()` operator where if the predicate returns false, the operator has to request 1 from upstream (since the downstream doesn't know there was a value dropped and thus not request itself). Operators wanting to participate in this fusion have to implement and subscribe with an extended Subscriber interface:
This extends the RS `Subscriber`interface with an extra method: `boolean tryOnNext(T value)` and can help avoiding small request amounts in case an operator didn't forward but dropped the value. The canonical use is for the `filter()` operator where if the predicate returns false, the operator has to request 1 from upstream (since the downstream doesn't know there was a value dropped and thus not request itself). Operators wanting to participate in this fusion have to implement and subscribe with an extended `Subscriber` interface:

```java
interface ConditionalSubscriber<T> {
Expand All @@ -562,9 +562,9 @@ protected void subscribeActual(Subscriber<? super T> s) {

###### 2) Queue-fusion

The second category is when two (or more) operators share the same underlying queue and each append activity at the exit point (i.e., poll()) of the queue. This can work in two modes: synchronous and asynchronous.
The second category is when two (or more) operators share the same underlying queue and each append activity at the exit point (i.e. `poll()`) of the queue. This can work in two modes: synchronous and asynchronous.

In synchronous mode, the elements of the sequence is already available (i.e., a fixed `range()` or `fromArray()`, or can be synchronously calculated in a pull fashion in `fromIterable`. In this mode, the requesting and regular onError-path is bypassed and is forbidden. Sources have to return null from `pull()` and false from `isEmpty()` if they have no more values and throw from these methods if they want to indicate an exceptional case.
In synchronous mode, the elements of the sequence is already available (i.e. a fixed `range()` or `fromArray()`, or can be synchronously calculated in a pull fashion in `fromIterable`. In this mode, the requesting and regular onError-path is bypassed and is forbidden. Sources have to return null from `pull()` and false from `isEmpty()` if they have no more values and throw from these methods if they want to indicate an exceptional case.

In asynchronous mode, elements may become available at any time, therefore, `pull` returning null, as with regular queue-drain, is just the indication of temporary lack of source values. Completion and error still has to go through `onComplete` and `onError` as usual, requesting still happens as usual but when a value is available in the shared queue, it is indicated by an `onNext(null)` call. This can trigger a chain of `drain` calls without moving values in or out of different queues.

Expand All @@ -588,10 +588,10 @@ For performance, the mode is an integer bitflags setup, called early during subs

Since RxJava 2.x is still JDK 6 compatible, the `QueueSubscription` can't itself default unnecessary methods and implementations are required to throw `UnsupportedOperationException` for `Queue` methods other than the following:

- `poll()`
- `isEmpty()`
- `clear()`
- `size()`
- `poll()`.
- `isEmpty()`.
- `clear()`.
- `size()`.

Even though other modern libraries also define this interface, they live in local packages and thus non-reusable without dragging in the whole library. Therefore, until externalized and standardized, cross-library micro-fusion won't happen.

Expand Down

0 comments on commit f3c8862

Please sign in to comment.