Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How can I use OperatorConcat<T>? #3446

Closed
fougere-mike opened this issue Oct 14, 2015 · 5 comments
Closed

How can I use OperatorConcat<T>? #3446

fougere-mike opened this issue Oct 14, 2015 · 5 comments
Labels

Comments

@fougere-mike
Copy link

I'm trying to write an operator then<R,T> which ignores all emissions from the source (except errors), and continues with a second Observable when completed.

Something like:

// Typical Case
Observable.just(1, 2, 3, 4, 5)
   .lift(new then<String, Integer>(Observable.just("Hello")))
   .subscribe(new Action1<String>() {
      @Override
      public void call(String s) {
         // Called once with "Hello"
      });

// Source Errors
Observable.<Integer>error(new RuntimeException())
   .lift(new then<String, Integer>(Observable.just("Hello")))  // <-- the second observable should never be subscribed to since the source error'd
   .subscribe(new Action1<String>() {
      @Override
      public void call(String s) {
         // Not Called

      }, new Action1<Throwable>() {
      @Override
      public void call(Throwable e) {
        System.out.println("Error: "+e); // Should be called with the RuntimeException from above

      }
   });

I've come up with an implementation using ignoreElements + map + concatWith:

    public static <R, T> Observable<? extends R> then(Observable<T> source, Observable<R> other) {
        return source
                .ignoreElements()
                .map(new Func1<T, R>() {
                    @Override
                    public R call(T integer) {
                        return null;
                    }
                }).concatWith(other);
    }

I'm quite new to writing custom operators, and I can't quite figure out how to translate that static function into an operator. I've written a few operators by composing the provided Operator* types, but I'm having trouble with this one.

Any help would be greatly appreciated :)

@JakeWharton
Copy link
Contributor

This is discussed in #3113 and implemented awaiting merge in #3443.

@fougere-mike
Copy link
Author

Ok, well that eliminates the need for my operator, but I'm still interested in how it could be correctly implemented using lift(). I'm sure I'll be writing more operators in the future so I'd like to have a better understanding of how they work.

This is what I've come up with:

/**
 * Ignores all emissions from the source observable. Once the source completes, the provided
 * observable will be subscribed to. If the source errors, the error will terminate the stream and
 * the provided observable will not be subscribed to.
 *
 * @param <T> The type of objects emitted by the source observable.
 * @param <R> The type of objects emitted by the provided `next` observable.
 */
public class then<R, T> implements Observable.Operator<R, T> {
    private final OperatorIgnoreElements<T> mIgnoreElements;
    private final OperatorMap<T, R> mMap;
    private final OperatorConcat<R> mConcat;

    private final Observable<R> mNextObservable;

    public then(Observable<R> nextObservable) {
        assert( nextObservable != null );
        mNextObservable = nextObservable;
        mIgnoreElements = OperatorIgnoreElements.instance();
        mMap = new OperatorMap<>(new Func1<T, R>() {
            @Override
            public R call(T t) {
                return null;
            }
        });
        mConcat = OperatorConcat.instance();
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> subscriber) {
        // TODO: Compose operators...
    }
}

@akarnokd akarnokd changed the title [Question] How can I use OperatorConcat<T>? How can I use OperatorConcat<T>? Oct 14, 2015
@akarnokd
Copy link
Member

This is "super easy" to implement as an Operator:

public final class Then<T, R> implements Operator<R, T> {
    final Observable<R> other;

    public Then(Observable<R> other) {
        this.other = other;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> t) {
        MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
        t.add(mas);

        Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onNext(T t) {
                // ignored
            }

            @Override
            public void onError(Throwable e) {
                t.onError(e);
            }

            @Override
            public void onCompleted() {
                Subscriber<R> tr = Subscribers.wrap(t);
                mas.set(tr);
                other.subscribe(tr);
            }
        };

        mas.set(parent);

        return parent;
    }
}

@fougere-mike
Copy link
Author

Awesome, thanks!

I have a few questions about your solution:

  1. Shouldn't it check if the subscriber has unsubscribed before calling t.onError(e)?
  2. Why is the MultipleAssignmentSubscription needed? I'm a bit of an rx newbie, so it's not clear to me why you can't just return the parent subscriber directly.

Cheers!

@akarnokd
Copy link
Member

akarnokd commented Feb 9, 2016

  1. unnecessary
  2. so the unsubscription from the child can target both the first parent and the second subscriber.

@akarnokd akarnokd closed this as completed Feb 9, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants