Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add materialize() and dematerialize() #6278

Merged
merged 4 commits into from
Nov 6, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,7 @@ public final Completable lift(final CompletableOperator onLift) {
* @param <T> the intended target element type of the notification
* @return the new Single instance
* @since 2.2.4 - experimental
* @see Single#dematerialize(Function)
*/
@Experimental
@CheckReturnValue
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3388,6 +3388,7 @@ public final <R> Maybe<R> map(Function<? super T, ? extends R> mapper) {
* </dl>
* @return the new Single instance
* @since 2.2.4 - experimental
* @see Single#dematerialize(Function)
*/
@Experimental
@CheckReturnValue
Expand Down
30 changes: 15 additions & 15 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2307,37 +2307,36 @@ public final Single<T> delaySubscription(long time, TimeUnit unit, Scheduler sch
* {@code onSuccess}, {@code onError} or {@code onComplete} signals as a
* {@link Maybe} source.
* <p>
* Note that {@code this} should be of type {@code Single<Notification<T>>} or
* the transformation will result in an {@code onError} signal of
* {@link ClassCastException}. Currently, the Java language doesn't allow specifying
* methods for certain type argument shapes only (unlike extension methods would),
* hence the forced casting in this operator.
* <p>
* In addition, usually the inner value type (T2) has to be expressed again via
* a type argument on this method (see example below).
* The intended use of the {@code selector} function is to perform a
* type-safe identity mapping (see example) on a source that is already of type
* {@code Notification<T>}. The Java language doesn't allow
* limiting instance methods to a certain generic argument shape, therefore,
* a function is used to ensure the conversion remains type safe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code dematerialize} does by default subscribe to the current Single
* on the {@link Scheduler} you provided, after the delay.</dd>
* <dd>{@code dematerialize} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>
* Example:
* <pre><code>
* Single.just(Notification.createOnNext(1))
* .&lt;Integer&gt;dematerialize()
* .dematerialize(notification -&gt; notification)
* .test()
* .assertResult(1);
* </code></pre>
* @param <T2> the type inside the Notification
* @param <R> the result type
* @param selector the function called with the success item and should
* return a {@link Notification} instance.
* @return the new Maybe instance
* @since 2.2.4 - experimental
* @see #materialize()
*/
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <T2> Maybe<T2> dematerialize() {
return RxJavaPlugins.onAssembly(new SingleDematerialize<T2>((Single<Object>)this));
public final <R> Maybe<R> dematerialize(Function<? super T, Notification<R>> selector) {
ObjectHelper.requireNonNull(selector, "selector is null");
return RxJavaPlugins.onAssembly(new SingleDematerialize<T, R>(this, selector));
}

/**
Expand Down Expand Up @@ -2920,6 +2919,7 @@ public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
* </dl>
* @return the new Single instance
* @since 2.2.4 - experimental
* @see #dematerialize(Function)
*/
@Experimental
@CheckReturnValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,47 @@
import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;

/**
* Maps the Notification success value of the source back to normal
* onXXX method call as a Maybe.
* @param <T> the element type of the notification and result
* Maps the success value of the source to a Notification, then
* maps it back to the corresponding signal type.
* @param <T> the element type of the source
* @param <R> the element type of the Notification and result
* @since 2.2.4 - experimental
*/
@Experimental
public final class SingleDematerialize<T> extends Maybe<T> {
public final class SingleDematerialize<T, R> extends Maybe<R> {

final Single<Object> source;
final Single<T> source;

public SingleDematerialize(Single<Object> source) {
final Function<? super T, Notification<R>> selector;

public SingleDematerialize(Single<T> source, Function<? super T, Notification<R>> selector) {
this.source = source;
this.selector = selector;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new DematerializeObserver<T>(observer));
protected void subscribeActual(MaybeObserver<? super R> observer) {
source.subscribe(new DematerializeObserver<T, R>(observer, selector));
}

static final class DematerializeObserver<T> implements SingleObserver<Object>, Disposable {
static final class DematerializeObserver<T, R> implements SingleObserver<T>, Disposable {

final MaybeObserver<? super R> downstream;

final MaybeObserver<? super T> downstream;
final Function<? super T, Notification<R>> selector;

Disposable upstream;

DematerializeObserver(MaybeObserver<? super T> downstream) {
DematerializeObserver(MaybeObserver<? super R> downstream,
Function<? super T, Notification<R>> selector) {
this.downstream = downstream;
this.selector = selector;
}

@Override
Expand All @@ -67,19 +78,22 @@ public void onSubscribe(Disposable d) {
}

@Override
public void onSuccess(Object t) {
if (t instanceof Notification) {
@SuppressWarnings("unchecked")
Notification<T> notification = (Notification<T>)t;
if (notification.isOnNext()) {
downstream.onSuccess(notification.getValue());
} else if (notification.isOnComplete()) {
downstream.onComplete();
} else {
downstream.onError(notification.getError());
}
public void onSuccess(T t) {
Notification<R> notification;

try {
notification = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Notification");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
if (notification.isOnNext()) {
downstream.onSuccess(notification.getValue());
} else if (notification.isOnComplete()) {
downstream.onComplete();
} else {
downstream.onError(new ClassCastException("io.reactivex.Notification expected but got " + t.getClass()));
downstream.onError(notification.getError());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,62 +18,90 @@
import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.Functions;
import io.reactivex.subjects.SingleSubject;

public class SingleDematerializeTest {

@Test
public void success() {
Single.just(Notification.createOnNext(1))
.<Integer>dematerialize()
.dematerialize(Functions.<Notification<Integer>>identity())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too bad the identity function is inside an internal package :(

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose most users now can use lambdas in their project so this is just v -> v for them. Only we are still so unlucky to use such helper methods.

.test()
.assertResult(1);
}

@Test
public void empty() {
Single.just(Notification.createOnComplete())
.<Integer>dematerialize()
Single.just(Notification.<Integer>createOnComplete())
.dematerialize(Functions.<Notification<Integer>>identity())
.test()
.assertResult();
}

@Test
public void error() {
Single.error(new TestException())
.<Integer>dematerialize()
Single.<Notification<Integer>>error(new TestException())
.dematerialize(Functions.<Notification<Integer>>identity())
.test()
.assertFailure(TestException.class);
}

@Test
public void errorNotification() {
Single.just(Notification.createOnError(new TestException()))
.<Integer>dematerialize()
Single.just(Notification.<Integer>createOnError(new TestException()))
.dematerialize(Functions.<Notification<Integer>>identity())
.test()
.assertFailure(TestException.class);
}

@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeSingleToMaybe(new Function<Single<Object>, MaybeSource<Object>>() {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public MaybeSource<Object> apply(Single<Object> v) throws Exception {
return v.dematerialize();
return v.dematerialize((Function)Functions.identity());
}
});
}

@Test
public void dispose() {
TestHelper.checkDisposed(SingleSubject.create().dematerialize());
TestHelper.checkDisposed(SingleSubject.<Notification<Integer>>create().dematerialize(Functions.<Notification<Integer>>identity()));
}

@Test
public void wrongType() {
Single.just(1)
.<String>dematerialize()
public void selectorCrash() {
Single.just(Notification.createOnNext(1))
.dematerialize(new Function<Notification<Integer>, Notification<Integer>>() {
@Override
public Notification<Integer> apply(Notification<Integer> v) throws Exception {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);
}

@Test
public void selectorNull() {
Single.just(Notification.createOnNext(1))
.dematerialize(Functions.justFunction((Notification<Integer>)null))
.test()
.assertFailure(NullPointerException.class);
}

@Test
public void selectorDifferentType() {
Single.just(Notification.createOnNext(1))
.dematerialize(new Function<Notification<Integer>, Notification<String>>() {
@Override
public Notification<String> apply(Notification<Integer> v) throws Exception {
return Notification.createOnNext("Value-" + 1);
}
})
.test()
.assertFailure(ClassCastException.class);
.assertResult("Value-1");
}
}