4.x: possible new operators based on RxJava's feature set #489

Open
akarnokd opened this issue May 4, 2018 · 18 comments
Labels: [area] Rx, discussion, feature request (Request for new features or operators.)

Comments

@akarnokd
Collaborator

akarnokd commented May 4, 2018

I've collected the operators, and overloads of operators, that are available in RxJava but (likely) not yet in Rx.NET, to enable discussion about synchronizing the feature sets.

Operators

| Operator in RxJava | Description |
| --- | --- |
| combineLatestDelayError | Keep combining sources as long as possible, then signal the aggregate exception at the end. |
| concatDelayError | Keep concatenating and signal the aggregate exception at the end. |
| concatEager | Run many or all source observables concurrently, but emit the items from the sources in their original order. |
| concatEagerDelayError | Run many or all source observables concurrently, emit items in order and signal the aggregate exception at the end. |
| error(Callable) | Signal an onError with the generated exception per observer. |
| fromCallable | Invoke a function to generate a single item and completion per observer. |
| generate | Generate events in a callback fashion with an observer abstraction and per-observer state. |
| intervalRange | Produce a range of numbers: the first after an initial delay, the rest periodically. |
| mergeDelayError | Merge some or all sources into one sequence and signal an aggregate exception when all of them have terminated. |
| switchOnNextDelayError | Switch to newer sources and signal the aggregate exception at the end. |
| zip(delayError) | Zip sources into one sequence and signal the error after the last row could be combined. |
| blockingIterable | Turn a sequence into a blocking enumerable sequence. |
| blockingSubscribe | Observe on the caller thread in a blocking fashion. |
| cache | Consume the upstream once, cache all items and replay them to observers. |
| collect | Aggregate items into a single custom container per observer with the help of a collector action, and signal the container as a single result. |
| compose | Apply a transformation function from observable to observable when the sequence is constructed. |
| concatMap | Map the upstream items into observables and concatenate them in order; an in-sequence version of concat. Can be implemented more efficiently than concat(map(source, f)). |
| concatMapDelayError | Map the upstream items into observables and concatenate them in order, signaling an aggregate exception at the end. |
| concatMapEager | Map the upstream items into observables, run some or all of them eagerly but emit their items in order; an in-sequence version of concatEager. |
| concatMapEagerDelayError | Map the upstream items into observables, run some or all of them eagerly but emit their items in order, signaling an aggregate exception at the end. |
| concatMapIterable | Map the upstream items into enumerables and emit their items in order; more efficient than concatMap over an Iterable turned into an observable. |
| doAfterNext | Call an action with the current item after it has been emitted to the downstream but before the next value arrives from upstream. |
| doFinally | Perform an action exactly once per observer when the sequence completes normally, fails with an error or gets disposed. |
| doOnDispose | Perform an action when the sequence gets disposed. |
| doOnSubscribe | Perform an action when the subscription happens but before items start flowing. |
| flatMap(delayError) | Map the upstream items into observables, merge them into one sequence and signal the aggregate error at the end. Can be implemented more efficiently than merge(map(source, f)). |
| forEachWhile | Consume the sequence with a predicate; when the predicate returns false, the sequence is disposed. |
| reduceWith | Start with a per-observer initial accumulator value, apply a function that takes the accumulator and the upstream item and returns a new accumulator value, then emit the last accumulator value as a single result. |
| repeat(BooleanSupplier) | Resubscribe to the source each time it completes, as long as the boolean predicate allows it. |
| repeatWhen | Resubscribe to the source when, after the current completion, a secondary per-observer sequence signals an item. |
| retry(Predicate) | Retry a failed sequence if the predicate returns true for the exception. |
| retry(BiPredicate) | Retry a failed sequence if the predicate returns true for the exception and the total retry count so far. |
| retryUntil(BooleanSupplier) | Retry a failed sequence if the boolean predicate allows it. |
| retryWhen | Retry a failed sequence when the secondary per-observer sequence signals an item in response to the current failure. |
| sample(emitLast) | Sample the source sequence and optionally emit the very last unsampled item when the sequence completes before the sampler signals again. |
| scanWith | Start with a per-observer generated initial accumulator value and emit the combination of the accumulator and each upstream item, which also becomes the new accumulator value. |
| switchIfEmpty | If the main sequence turns out to be empty, stream events from a fallback sequence instead. |
| switchMap | Map the upstream item into an observable and keep switching to newer ones as they arrive, disposing the previous ones. Can be implemented more efficiently than switchOnNext(map(source, f)). |
| switchMapDelayError | Map the upstream item into an observable and keep switching to newer ones as they arrive, disposing the previous ones and signaling the aggregate exception at the end. |
| takeUntil(Predicate) | Emit items from the upstream and, after each one, run a predicate to determine whether the sequence should stop right after the current item. |
| observeOn(delayError) | Observe items on another thread and optionally keep the total event order or let errors cut ahead. |
| unsubscribeOn | When the sequence is disposed, execute the dispose call traveling upstream on a scheduler. |
| withLatestFrom(Observable...) | Combine the items of the main source with the latest items of multiple other sources. |
| autoConnect | Automatically connect to a ConnectableObservable once, after the specified number of observers have subscribed. |
| refCount(n, time) | Connect to a ConnectableObservable after the specified number of observers have subscribed and/or disconnect after a grace period (under review). |
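To make one row of the table concrete, here is a minimal C# sketch of what a switchIfEmpty-style operator could look like. It is written only against the BCL's IObservable<T>/IObserver<T>, so it needs no NuGet package. The helper types (AnonymousObservable<T>, AnonymousObserver<T>, ActionDisposable) and the SwitchIfEmpty extension are hypothetical stand-ins, not Rx.NET's API, and the sketch assumes serialized, single-threaded signals.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: an IObservable<T> whose Subscribe logic is a delegate.
public sealed class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;
    public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

// Hypothetical helper: an IObserver<T> built from three callbacks.
public sealed class AnonymousObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

// Hypothetical helper: runs an action when disposed.
public sealed class ActionDisposable : IDisposable
{
    private readonly Action _dispose;
    public ActionDisposable(Action dispose) => _dispose = dispose;
    public void Dispose() => _dispose();
}

public static class SwitchIfEmptySketch
{
    // Relay the source; if it completes without a single OnNext,
    // subscribe the same observer to the fallback instead.
    public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> source, IObservable<T> fallback)
    {
        return new AnonymousObservable<T>(observer =>
        {
            bool hasValue = false;
            IDisposable fallbackSubscription = null;

            IDisposable mainSubscription = source.Subscribe(new AnonymousObserver<T>(
                value => { hasValue = true; observer.OnNext(value); },
                observer.OnError,
                () =>
                {
                    if (hasValue) observer.OnCompleted();
                    else fallbackSubscription = fallback.Subscribe(observer);
                }));

            // Dispose whichever subscriptions have been made so far.
            return new ActionDisposable(() =>
            {
                mainSubscription.Dispose();
                fallbackSubscription?.Dispose();
            });
        });
    }
}
```

Both subscriptions are kept and disposed together because, with a synchronous source, the fallback can be subscribed before the main Subscribe call even returns. Usage might read `cachedItems.SwitchIfEmpty(loadFromServer)`, where both names are hypothetical.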

Subjects

RxJava has one additional subject type: UnicastSubject. It buffers items until one observer subscribes some time later and then drains the buffer into the observer.
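The UnicastSubject behavior described above can be sketched in C# roughly as follows. This is a single-threaded illustration under stated assumptions, not RxJava's or Rx.NET's code: UnicastSubjectSketch<T> and RecordingObserver<T> are hypothetical names, reentrancy is handled only for the simple case, and concurrency is ignored. Following RxJava, a second observer is rejected with an error signal rather than an exception.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-threaded unicast subject: buffers signals until the one
// allowed observer subscribes, drains the buffer, then relays live signals.
public sealed class UnicastSubjectSketch<T> : IObservable<T>, IObserver<T>
{
    private readonly Queue<T> _buffer = new Queue<T>();
    private IObserver<T> _observer;   // set only after the buffer has been drained
    private bool _subscribed;
    private bool _disposed;
    private bool _done;
    private Exception _error;

    public void OnNext(T value)
    {
        if (_done || _disposed) return;
        if (_observer != null) _observer.OnNext(value);
        else _buffer.Enqueue(value);
    }

    public void OnError(Exception error)
    {
        if (_done || _disposed) return;
        _done = true;
        _error = error;
        _observer?.OnError(error);
    }

    public void OnCompleted()
    {
        if (_done || _disposed) return;
        _done = true;
        _observer?.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (_subscribed)
        {
            // Reject additional observers with an error signal.
            observer.OnError(new InvalidOperationException("Only a single observer is allowed."));
            return new Subscription(null);
        }
        _subscribed = true;

        // Drain items that arrived before anyone was listening.
        while (_buffer.Count > 0) observer.OnNext(_buffer.Dequeue());
        _observer = observer;

        // Replay a terminal signal that happened before (or during) the drain.
        if (_done)
        {
            if (_error != null) observer.OnError(_error);
            else observer.OnCompleted();
        }
        return new Subscription(this);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly UnicastSubjectSketch<T> _parent;
        public Subscription(UnicastSubjectSketch<T> parent) => _parent = parent;

        public void Dispose()
        {
            if (_parent == null) return;
            _parent._disposed = true;
            _parent._buffer.Clear();
        }
    }
}

// Hypothetical helper for trying the sketch: records everything it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<T> Items { get; } = new List<T>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(T value) => Items.Add(value);
    public void OnError(Exception error) => Error = error;
    public void OnCompleted() => Completed = true;
}
```

Assigning `_observer` only after the drain loop means any OnNext that arrives during draining is queued behind the buffered items, so the original order is preserved.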

In addition, RxJava subjects have extra state-peeking methods; some are available on all variants, others only on specific ones:

| Method | Description |
| --- | --- |
| hasObservers() | (all subjects) true if there are currently any observers subscribed to the subject. |
| hasComplete() | (all subjects) true if the subject terminated normally. |
| hasThrowable() | (all subjects) true if the subject terminated exceptionally. |
| getThrowable() | (all subjects) Returns the exception if the subject terminated exceptionally. |
| toSerialized() | Makes the onXXX methods thread-safe to call from any thread. |
| hasValue() | true if a BehaviorSubject or ReplaySubject has items in its buffer. |
| getValue() | Returns the current item of a BehaviorSubject. |
| getValues() | Returns the currently buffered items in a ReplaySubject. |

I'm not sure how interface evolution works in C#. Assuming that adding members would break users of System.Reactive.Interfaces, the methods could be added to a new interface derived from ISubject, or just to the various Rx subject types.

@ghuntley
Member

ghuntley commented May 4, 2018

Here's a bookmark for some of the side-convo that is going on in the Slack channel.

https://reactivex.slack.com/archives/C02B9R3QA/p1525433788000119

I'd appreciate it if, over time as we figure out how to work together, fewer discussions happened on Slack and more on GitHub. It's early days of rebuilding the community, so right now we don't need much structure, but progressing something like this will require a more formal structure.

We definitely want to be conservative when adding operators, and each one will need to be debated on its merits in its own separate issue. We aren't ready for separate issues yet, though. Let's use this thread as the baseline for parity discussions: whether we even need parity (vs. creating separate libraries), how it could be done, and when would be best to do it.

You are doing a fantastic job. Thank you for your help so far, and thank you for doing this audit, @akarnokd.

@RivaCode

RivaCode commented May 4, 2018

I would like to propose yet another operator from RxJS, which I find highly useful in UI applications!
exhaustMap: a higher-order operator that, unlike switch (and switchMap), ignores any new values pushed from the upstream until the current inner observable has completed.
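To pin down the semantics described above, here is a minimal C# sketch of an exhaustMap-style operator built only on the BCL's IObservable<T>/IObserver<T>. ExhaustMap is a hypothetical name mirroring RxJS, not an existing Rx.NET method; the helper types are hypothetical too. The sketch assumes serialized, single-threaded signals, and an error from one side does not dispose the other.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: an IObservable<T> whose Subscribe logic is a delegate.
public sealed class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;
    public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

// Hypothetical helper: an IObserver<T> built from three callbacks.
public sealed class AnonymousObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

// Hypothetical helper: runs an action when disposed.
public sealed class ActionDisposable : IDisposable
{
    private readonly Action _dispose;
    public ActionDisposable(Action dispose) => _dispose = dispose;
    public void Dispose() => _dispose();
}

public static class ExhaustMapSketch
{
    // Map each upstream value to an inner observable, but ignore upstream
    // values that arrive while the previous inner observable is still running.
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
    {
        return new AnonymousObservable<TResult>(observer =>
        {
            bool innerActive = false;
            bool outerDone = false;
            IDisposable innerSubscription = null;

            IDisposable outerSubscription = source.Subscribe(new AnonymousObserver<TSource>(
                value =>
                {
                    if (innerActive) return;   // busy: drop this value
                    innerActive = true;
                    innerSubscription = selector(value).Subscribe(new AnonymousObserver<TResult>(
                        observer.OnNext,
                        observer.OnError,
                        () =>
                        {
                            innerActive = false;
                            if (outerDone) observer.OnCompleted();
                        }));
                },
                observer.OnError,
                () =>
                {
                    // Complete only once no inner sequence is still running.
                    outerDone = true;
                    if (!innerActive) observer.OnCompleted();
                }));

            return new ActionDisposable(() =>
            {
                outerSubscription.Dispose();
                innerSubscription?.Dispose();
            });
        });
    }
}
```

In the button scenario, something like `clicks.ExhaustMap(_ => CreateResource())` (both names hypothetical) would ignore further clicks until the current creation finishes.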

@akarnokd
Collaborator Author

akarnokd commented May 4, 2018

@RivaCode, we have it as flatMapDrop or flatMapLatest in the RxJava 2 Extensions project. I didn't include those because they look rare to esoteric, even in RxJava's quite liberal eyes.

@RivaCode

RivaCode commented May 4, 2018

Can't say I agree.

For example, when clicking a button creates a resource and you must make sure that operation is atomic, exhaustMap (or flatMapDrop/flatMapLatest) comes in quite handy!

@akarnokd
Collaborator Author

akarnokd commented May 4, 2018

As I mentioned in the Slack channel, thanks to C# extension methods, these or any other operators can live in any third-party library outside of Rx.NET. The drawbacks are that such operators may be hard to discover, may be distrusted as non-standard, and may have naming conflicts with other third-party extension methods.

@akarnokd
Collaborator Author

Added section about subjects in RxJava.

@quinmars
Contributor

quinmars commented Jun 4, 2018

Would it be possible to apply the performance improvements of concatMap behind the scenes, so that writing Concat().Select(x => ...) uses the improved implementation? Similar to how Skip(4).Skip(2) is transposed to Skip(6).
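The Skip(4).Skip(2) into Skip(6) transformation mentioned above is a peephole fusion: the operator checks at assembly time whether its source is already a node of the same kind. A minimal C# sketch of the idea, using only the BCL, might look like this. SkipObservable<T>, SkipFused, RangeObservable and RecordingObserver<T> are hypothetical names, not Rx.NET's internals; the sketch is single-threaded and does not guard against count overflow.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical Skip node that fuses with an upstream Skip node of the same element type.
public sealed class SkipObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    public int Count { get; }

    private SkipObservable(IObservable<T> source, int count)
    {
        _source = source;
        Count = count;
    }

    public static IObservable<T> Create(IObservable<T> source, int count)
    {
        // Peephole rule: Skip(a).Skip(b) behaves like Skip(a + b), so build one node.
        if (source is SkipObservable<T> upstream)
            return new SkipObservable<T>(upstream._source, upstream.Count + count);
        return new SkipObservable<T>(source, count);
    }

    public IDisposable Subscribe(IObserver<T> observer) =>
        _source.Subscribe(new SkipObserver(observer, Count));

    private sealed class SkipObserver : IObserver<T>
    {
        private readonly IObserver<T> _downstream;
        private int _remaining;

        public SkipObserver(IObserver<T> downstream, int remaining)
        {
            _downstream = downstream;
            _remaining = remaining;
        }

        public void OnNext(T value)
        {
            if (_remaining > 0) _remaining--;   // still skipping
            else _downstream.OnNext(value);
        }

        public void OnError(Exception error) => _downstream.OnError(error);
        public void OnCompleted() => _downstream.OnCompleted();
    }
}

public static class SkipFusionSketch
{
    public static IObservable<T> SkipFused<T>(this IObservable<T> source, int count) =>
        SkipObservable<T>.Create(source, count);
}

// Hypothetical helper: emits 0, 1, ..., count - 1 synchronously, then completes.
public sealed class RangeObservable : IObservable<int>
{
    private readonly int _count;
    public RangeObservable(int count) => _count = count;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < _count; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical helper: records everything it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<T> Items { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) => Items.Add(value);
    public void OnError(Exception error) => throw error;
    public void OnCompleted() => Completed = true;
}
```

The check is easy here because the upstream node type is known exactly. Fusing Select(...).Concat() is harder because the rule would have to match a Select node whose result type is itself an IObservable<R>.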

@akarnokd
Collaborator Author

akarnokd commented Jun 5, 2018

It's Select(x => IObservable).Concat(), but you'd have to type-match the Select instance against the appropriate Func<T, IObservable<R>> signature.

@glopesdev

I am hesitant about introducing the vocabulary of other Rx variants into the dotnet repo, since it will introduce ambiguity. For example, flatMap already exists, it's called SelectMany, and I don't think introducing aliases will help .NET developers understand the duality between IEnumerable and IObservable.

I think concatMap and switchMap are really useful, giving you more intuition of the possible ways to reduce the sequence, so I think they do have a place in this library. I would align their names with SelectMany though, and call them ConcatMany and SwitchMany.

@quinmars
Contributor

@glopesdev It probably should be ConcatSelect, because the counterpart of map is Select. Or SelectConcat, because flatMap translates to SelectMany and not ManySelect.

@akarnokd I see. So you would need a type variable in the type check, which is not possible, like this (pseudocode):

if (source is Select<x, y>.Selector && y is IObservable) ...

@glopesdev

glopesdev commented Jun 24, 2018

@quinmars ConcatSelect breaks the correspondence as you pointed out, so it doesn't really help users immediately relate to these variants. SelectConcat could be a reasonable option, but it doesn't sound right because it doesn't preserve the original flavour of SelectMany, which comes from SQL:

"from x select one" turned into
"from x select (many)".

In the case of Concat, it feels more intuitive to read:
"from x concat (many)".

@glopesdev

Another way to think about this:

Select doesn't really translate cleanly to Map. It comes from SQL, where it originally meant selecting which columns of the data frame to include in the query result. Gradually it was realized it could be much more powerful and extended to include derived columns created from "mapping" or "transforming" the columns of each row.

In a similar way, the Many in SelectMany doesn't really mean Merge or Flat, but simply indicates that you are selecting multiple rows rather than just one row. What to do with these rows is left up to the imagination. In the database "pull" world, there is usually only one way to think about it, because time is irrelevant, so the only thing you can do with Many is "select" them.

In the reactive world, there are many more things you can do with Many: you can ConcatMany or SwitchMany, among other possibilities.

@quinmars
Contributor

I'm aware of the meaning of SelectMany. But as you said, it's called SelectMany because you select not only one element but many. With Switch you switch to the latest observable, but with SwitchMany you do not switch to many observables; the only difference is that you pass a selector to select the latest observable. IMO, Many does not make any sense here. The same goes for Concat: you always concatenate many observables, even though the method is only called Concat. ConcatMany would just be confusing.

I see that concatenating two verbs, as in ConcatSelect or SelectConcat, is not ideal, but SwitchMany feels simply wrong. Maybe SelectingSwitch and SelectingConcat? Or do we really need an extra name? Couldn't an additional overload do the same work? Like .Switch(v => v.Observable) or .Concat(async x => await GetAsync(x)).
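The overload idea can be sketched as an extension method: a Switch overload that takes a selector and is intended to behave like source.Select(selector).Switch() in Rx.NET. This Switch(selector) overload is hypothetical (not an existing Rx.NET signature), as are the helper types; to stay dependency-free the sketch re-implements switching by hand on the BCL's IObservable<T>/IObserver<T> and assumes serialized, single-threaded signals.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: an IObservable<T> whose Subscribe logic is a delegate.
public sealed class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;
    public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

// Hypothetical helper: an IObserver<T> built from three callbacks.
public sealed class AnonymousObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

// Hypothetical helper: runs an action when disposed.
public sealed class ActionDisposable : IDisposable
{
    private readonly Action _dispose;
    public ActionDisposable(Action dispose) => _dispose = dispose;
    public void Dispose() => _dispose();
}

public static class SwitchSelectorSketch
{
    // Hypothetical overload in the spirit of .Switch(v => v.Observable).
    public static IObservable<TResult> Switch<TSource, TResult>(
        this IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
    {
        return new AnonymousObservable<TResult>(observer =>
        {
            long generation = 0;          // identifies the current inner sequence
            bool innerActive = false;
            bool outerDone = false;
            IDisposable current = null;

            IDisposable outer = source.Subscribe(new AnonymousObserver<TSource>(
                value =>
                {
                    current?.Dispose();   // drop the previous inner sequence
                    long id = ++generation;
                    innerActive = true;
                    current = selector(value).Subscribe(new AnonymousObserver<TResult>(
                        item => { if (id == generation) observer.OnNext(item); },
                        error => { if (id == generation) observer.OnError(error); },
                        () =>
                        {
                            if (id != generation) return;
                            innerActive = false;
                            if (outerDone) observer.OnCompleted();
                        }));
                },
                observer.OnError,
                () =>
                {
                    outerDone = true;
                    if (!innerActive) observer.OnCompleted();
                }));

            return new ActionDisposable(() =>
            {
                outer.Dispose();
                current?.Dispose();
            });
        });
    }
}
```

The generation counter is the main design choice: even if an inner source keeps signaling after being disposed, its signals are ignored once a newer inner sequence has taken over. A Concat(selector) overload could follow the same shape, queuing upstream values instead of dropping the current inner.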

@glopesdev

Couldn't an additional overload do the same work? Like .Switch(v => v.Observable) or .Concat(async x => await GetAsync(x))

I think this really nails it 👍 I very much support the overload idea; there's no need to confuse people with more names that seem to imply new concepts.

@glopesdev

Hmmm, although I still agree with @quinmars solution of adding an overload, I wanted to present another perspective into using the Many suffix.

For me, what Many represents is really the selector, as was pointed out. Specifically, Many in this case indicates a constraint on the selector: it cannot be just any selector, in fact it has to be a selector "from one to many", it has to return an IObservable or IEnumerable.

Basically Many represents the qualification on the return type of the selector, while the verb itself represents what you are doing with those Many: in the case of SelectMany you are simply emitting all of them, which is why Merge is used; in the case of ConcatMany or SwitchMany you are applying the Concat or Switch operator to the result of the selector.

I think it's equally valid to think about things this way, and as long as this perspective is made clear, and used consistently, I would still accept ConcatMany and SwitchMany.

The selector overload idea is not too bad, but it will beg the question of why SelectMany is not simply an overload of Merge...

@bartdesmet
Collaborator

I think it'd be good to consider new operators on a case-by-case basis. So, I'd be all for seeing a set of issues that propose addition of operators in a more fine-grained manner.

A few answers:

  • We shouldn't change interfaces, e.g. for the ISubject<T> operators, but a base class could be introduced if we feel really strongly about such universal additions. However, we could also await C# 8.0's introduction of default interface members and ponder the pros and cons of that approach (and evaluate whether it would do the job, of course).
  • Overloads to existing operators with extra selector functions and whatnot may be reasonable on a case-by-case basis, provided the default is easy to explain (e.g. SelectMany's shorter overloads really correspond to calling the larger overload with identity or similarly trivial functions).
  • Let's not add vocabulary aliases for operators. This was attempted before with System.Reactive.Aliases and was a nightmare and highly undesirable (it only adds confusion for people already in the .NET ecosystem, who see many more overloads that turn out to be aliases and feel outright unfamiliar). This is a .NET project, and it should align with .NET-isms, vocabulary, and style.
  • Prefer composition over introducing new operators that are merely macros over popular combinations of operators. There's possibly no end to that slew of additional implementations, and they only have performance benefits if the user wrote the code that way (which may not be the case if operators happen to be juxtaposed through composition over IObservable<T> objects at runtime). I'd rather invest in peephole optimizations similar to the fusion of Where and Select, or the coalescing of Skip and Take operators in LINQ to Objects.

@weitzhandler
Contributor

weitzhandler commented Nov 13, 2019

Just ran into the need for a SwitchIfEmpty operator.
It has been asked on Stack Overflow too.

Posted separately: #1076.

@akarnokd akarnokd added [area] Rx discussion feature request Request for new features or operators. labels Nov 13, 2019
@Liero

Liero commented Mar 2, 2022

What we really need, to move forward with adding new extensions, is an analysis of the code in existing repositories to find out which operators are used most.

BTW, I would love to see feature parity with RxJS rather than RxJava, as it is used much more often.


8 participants