Step-by-step guide for writing a custom operator? #5862

Closed
RobLewis opened this issue Feb 23, 2018 · 6 comments

Comments

@RobLewis

I've been trying, with limited success, to understand the discussion of creating custom 2.x operators. I think I get the section "Operator by extending a base reactive class", but what would really help for the "Operator targeting lift()" discussion is a complete, design-through-usage example of creating an operator. Is this possible?

Suggestion: a simple operator that transforms its input type, perhaps something like .countCharacters() that would accept a String and emit an Integer with the number of characters in the String. If possible, versions with and without backpressure would be extra helpful.

@akarnokd
Member

This shouldn't be difficult because the two modes are only a straightforward transformation away:

You write the operator:

final class SomeOp<T> implements FlowableSubscriber<T>, Subscription {
   // etc
}

Optionally turn it into a liftable expression:

FlowableOperator<T, T> op = downstream -> new SomeOp<>(downstream, params);

Flowable.range(1, 5).lift(op).subscribe(System.out::println);

Or turn it into a transformer:

FlowableTransformer<T, T> ft = upstream -> new Flowable<T>() {
    @Override public void subscribeActual(Subscriber<? super T> downstream) {
        upstream.subscribe(new SomeOp<>(downstream, params));
    }
};

Flowable.range(1, 5).compose(ft).subscribe(System.out::println);

Or use lifting and a transformer:

FlowableTransformer<T, T> ft2 = upstream -> upstream.lift(op);

Flowable.range(1, 5).compose(ft2).subscribe(System.out::println);
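For the .countCharacters() idea suggested in the question, the pattern above might be filled in roughly as follows. This is only a minimal sketch, assuming RxJava 2.x on the classpath; the class names CountCharacters and CountCharactersDemo are hypothetical, and the operator skips the defensive checks a production operator would need (double onSubscribe, crashes in the operator logic, signals after termination).

```java
import io.reactivex.Flowable;
import io.reactivex.FlowableOperator;
import io.reactivex.FlowableSubscriber;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.List;

// Hypothetical operator: receives Strings from upstream and emits their lengths downstream.
final class CountCharacters implements FlowableSubscriber<String>, Subscription {
    private final Subscriber<? super Integer> downstream;
    private Subscription upstream;

    CountCharacters(Subscriber<? super Integer> downstream) {
        this.downstream = downstream;
    }

    @Override public void onSubscribe(Subscription s) {
        upstream = s;
        // Hand this object to downstream as its Subscription so that
        // request() and cancel() pass through the operator.
        downstream.onSubscribe(this);
    }

    @Override public void onNext(String value) {
        downstream.onNext(value.length()); // one item in, one item out
    }

    @Override public void onError(Throwable t) { downstream.onError(t); }

    @Override public void onComplete() { downstream.onComplete(); }

    // The mapping is 1:1, so the requested amount is forwarded unchanged.
    @Override public void request(long n) { upstream.request(n); }

    @Override public void cancel() { upstream.cancel(); }
}

final class CountCharactersDemo {
    static List<Integer> run() {
        // FlowableOperator<Downstream, Upstream>: Integer goes out, String comes in.
        FlowableOperator<Integer, String> op = CountCharacters::new;
        return Flowable.just("hello", "rx")
                .lift(op)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [5, 2]
    }
}
```

Because every upstream item produces exactly one downstream item, backpressure is handled just by passing request(n) through. An operator that drops or multiplies items would have to adjust the requested amount itself, which is where most of the complexity comes from.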

@akarnokd
Member

You could also read my blog and study the RxJava source code.

@RobLewis
Author

Thanks, I will certainly study. But sometimes, you know, there's just no substitute for a fully worked-out example.

A large part of my problem is that the documentation of .lift() is essentially incomprehensible to me (it's remarkable that the javadoc literally begins with a warning to the reader against trying to understand it!). And this just isn't much help:

Lifts a function to the current Publisher and returns a new Publisher that when subscribed to will pass the values of the current Publisher through the Operator function.
In other words, this allows chaining Subscribers together on a Publisher for acting on the values within the Publisher.

Could someone please try to explain in simple terms what lift() actually does and how to use it?

Final question: once I have created an operator, how do I "insert" it into the Flowable or other class so that I can reference it like the built-in operators?

@akarnokd
Member

lift() allows you to access the downstream's Subscriber during the subscription phase and provide a new Subscriber that will be used in the subscription process going further upstream. Generally, this new Subscriber wraps the downstream's Subscriber and forwards the onNext, onError and onComplete events either directly or according to the emission pattern the operator's business logic needs. In addition, such an operator can intercept the flow-control calls cancel and request that would otherwise have travelled upstream and perform additional actions, again depending on the business logic. The difficulty is expressing such business logic properly. Operators can differ so much that, for example, knowing how map is implemented won't help you implement observeOn.
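To make the "subscription phase" part concrete, here is a sketch of the idea behind lift() written against the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror Reactive Streams. It is not RxJava's actual implementation, and LiftSketch, lift, just and lengthOf are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

final class LiftSketch {

    // Hypothetical stand-in for lift(): returns a new Publisher that, when subscribed to,
    // asks the operator function for a wrapping Subscriber and subscribes that one upstream.
    static <T, R> Flow.Publisher<R> lift(
            Flow.Publisher<T> upstream,
            Function<Flow.Subscriber<? super R>, Flow.Subscriber<? super T>> operator) {
        return downstream -> upstream.subscribe(operator.apply(downstream));
    }

    // Tiny synchronous source that emits the given items as they are requested.
    static Flow.Publisher<String> just(String... items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int index;
            boolean done;

            @Override public void request(long n) {
                while (n-- > 0 && index < items.length && !done) {
                    subscriber.onNext(items[index++]);
                }
                if (index == items.length && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }

    // The operator logic: wrap the downstream Subscriber and transform onNext.
    static Flow.Subscriber<String> lengthOf(Flow.Subscriber<? super Integer> downstream) {
        return new Flow.Subscriber<String>() {
            // Passing the upstream Subscription straight through means request()
            // and cancel() travel upstream without being intercepted.
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(String item) { downstream.onNext(item.length()); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        };
    }

    static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();
        Flow.Publisher<Integer> lengths =
                LiftSketch.<String, Integer>lift(just("lift", "me"), LiftSketch::lengthOf);
        lengths.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { seen.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return seen; // [4, 2]
    }
}
```

Nothing happens when lift(...) is called; the operator function only runs once the final Subscriber subscribes. An operator that needs to intercept request or cancel would hand downstream its own Subscription in onSubscribe instead of passing the upstream one through.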

it's remarkable that the javadoc literally begins with a warning to the reader against trying to understand it!

No, it warns the user about the complexities of using it and then suggests using existing operators, such as map for your .countCharacters(): map(string -> string.length()).

I have created an operator, how do I "insert" it into the Flowable or other class so that I can reference it like the built-in operators?

You can't add a new operator to Flowable or the other base classes from outside RxJava because the Java language lacks the features to do so. You'd need extension methods, which are available in Kotlin, for example.

Therefore, you either write FlowableTransformers or FlowableOperators and use compose and lift respectively to use your custom operator in a sequence. The RxJava 2 Extensions project demonstrates a lot of custom operators and their usage.
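As a rough illustration of the transformer route, a static factory method can stand in for the missing instance method. This is a sketch assuming RxJava 2.x; StringOperators and countCharacters() are hypothetical names, not part of RxJava, and the transformer simply reuses the existing map operator.

```java
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;

import java.util.List;

final class StringOperators {
    private StringOperators() { }

    // Hypothetical helper: packages the logic as a reusable transformer
    // built from an existing operator rather than a hand-written Subscriber.
    static FlowableTransformer<String, Integer> countCharacters() {
        return upstream -> upstream.map(String::length);
    }

    static List<Integer> run() {
        return Flowable.just("compose", "it")
                .compose(countCharacters()) // reads almost like a built-in operator
                .toList()
                .blockingGet();             // [7, 2]
    }
}
```

With a static import of countCharacters, the call site becomes source.compose(countCharacters()), which is about as close to source.countCharacters() as plain Java allows.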

As a final note, I don't suggest you write operators if you are just learning RxJava, the target platform, the Java language or programming itself.

@akarnokd
Member

Please see PR #5863 for the suggested expansion of the JavaDoc of lift.

@akarnokd
Member

akarnokd commented Mar 2, 2018

Closing via #5863.

@akarnokd akarnokd closed this as completed Mar 2, 2018