Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement Maybe.switchIfEmpty(Single) #5582

Merged
merged 3 commits into from
Sep 4, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -2213,7 +2213,7 @@ public final Single<Long> count() {
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> defaultIfEmpty(T defaultItem) {
ObjectHelper.requireNonNull(defaultItem, "item is null");
return switchIfEmpty(just(defaultItem));
return switchIfEmpty(Single.just(defaultItem));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this changed, the original just should still work?!

Copy link
Contributor Author

@bmaslakov bmaslakov Sep 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course the original just still works. Just thought that it would be better to wrap the T defaultItem into Single instead of Maybe because the defaultItem cannot be empty.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please restore the original state. The less change to existing and practically unrelated code regarding the PR the better.

}


Expand Down Expand Up @@ -3817,7 +3817,30 @@ public final <E extends MaybeObserver<? super T>> E subscribeWith(E observer) {
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> switchIfEmpty(MaybeSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new MaybeSwitchIfEmpty<T>(this, other));
return RxJavaPlugins.onAssembly(new MaybeSwitchIfEmptyToMaybe<T>(this, other));
}

/**
* Returns a Maybe that emits the items emitted by the source Maybe or the item of an alternate
* SingleSource if the current Maybe is empty.
* <p>
* <img width="640" height="445" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchifempty.m.png" alt="">
* <p/>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other
* the alternate SingleSource to subscribe to if the main does not emit any items
* @return a Maybe that emits the items emitted by the source Maybe or the item of an
* alternate SingleSource if the source Maybe is empty.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the appropriate experimental tags and annotations:

@since 2.1.4 - experimental

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @Experimental

public final Maybe<T> switchIfEmpty(SingleSource<? extends T> other) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue mentioned Single as return type.

ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new MaybeSwitchIfEmptyToSingle<T>(this, other));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
*
* @param <T> the value type
*/
public final class MaybeSwitchIfEmpty<T> extends AbstractMaybeWithUpstream<T, T> {
public final class MaybeSwitchIfEmptyToMaybe<T> extends AbstractMaybeWithUpstream<T, T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd keep the original name, less change per PR, and name the new version MaybeSwitchIfEmptySingle.


final MaybeSource<? extends T> other;

public MaybeSwitchIfEmpty(MaybeSource<T> source, MaybeSource<? extends T> other) {
public MaybeSwitchIfEmptyToMaybe(MaybeSource<T> source, MaybeSource<? extends T> other) {
super(source);
this.other = other;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.MaybeObserver;
import io.reactivex.MaybeSource;
import io.reactivex.SingleObserver;
import io.reactivex.SingleSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;

import java.util.concurrent.atomic.AtomicReference;

/**
* Subscribes to the other source if the main source is empty.
*
* @param <T> the value type
*/
public final class MaybeSwitchIfEmptyToSingle<T> extends AbstractMaybeWithUpstream<T, T> {

final SingleSource<? extends T> other;

public MaybeSwitchIfEmptyToSingle(MaybeSource<T> source, SingleSource<? extends T> other) {
super(source);
this.other = other;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new SwitchIfEmptyMaybeObserver<T>(observer, other));
}

static final class SwitchIfEmptyMaybeObserver<T>
extends AtomicReference<Disposable>
implements MaybeObserver<T>, Disposable {

private static final long serialVersionUID = 4603919676453758899L;

final MaybeObserver<? super T> actual;

final SingleSource<? extends T> other;

SwitchIfEmptyMaybeObserver(MaybeObserver<? super T> actual, SingleSource<? extends T> other) {
this.actual = actual;
this.other = other;
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
actual.onSubscribe(this);
}
}

@Override
public void onSuccess(T value) {
actual.onSuccess(value);
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void onComplete() {
Disposable d = get();
if (d != DisposableHelper.DISPOSED) {
if (compareAndSet(d, null)) {
other.subscribe(new OtherSingleObserver<T>(actual, this));
}
}
}

static final class OtherSingleObserver<T> implements SingleObserver<T> {

final MaybeObserver<? super T> actual;

final AtomicReference<Disposable> parent;
OtherSingleObserver(MaybeObserver<? super T> actual, AtomicReference<Disposable> parent) {
this.actual = actual;
this.parent = parent;
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(parent, d);
}
@Override
public void onSuccess(T value) {
actual.onSuccess(value);
}
@Override
public void onError(Throwable e) {
actual.onError(e);
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;

public class MaybeSwitchIfEmptyTest {
public class MaybeSwitchIfEmptyToMaybeTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as before, please keep the original name to this test.


@Test
public void nonEmpty() {
Expand All @@ -36,16 +36,6 @@ public void empty() {
Maybe.<Integer>empty().switchIfEmpty(Maybe.just(2)).test().assertResult(2);
}

@Test
public void defaultIfEmptyNonEmpty() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why were these removed?

Maybe.just(1).defaultIfEmpty(2).test().assertResult(1);
}

@Test
public void defaultIfEmptyEmpty() {
Maybe.<Integer>empty().defaultIfEmpty(2).test().assertResult(2);
}

@Test
public void error() {
Maybe.<Integer>error(new TestException()).switchIfEmpty(Maybe.just(2))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.TestHelper;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.observers.TestObserver;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class MaybeSwitchIfEmptyToSingleTest {

@Test
public void nonEmpty() {
Maybe.just(1).switchIfEmpty(Single.just(2)).test().assertResult(1);
}

@Test
public void empty() {
Maybe.<Integer>empty().switchIfEmpty(Single.just(2)).test().assertResult(2);
}

@Test
public void defaultIfEmptyNonEmpty() {
Maybe.just(1).defaultIfEmpty(2).test().assertResult(1);
}

@Test
public void defaultIfEmptyEmpty() {
Maybe.<Integer>empty().defaultIfEmpty(2).test().assertResult(2);
}

@Test
public void error() {
Maybe.<Integer>error(new TestException()).switchIfEmpty(Single.just(2))
.test().assertFailure(TestException.class);
}

@Test
public void errorOther() {
Maybe.empty().switchIfEmpty(Single.<Integer>error(new TestException()))
.test().assertFailure(TestException.class);
}

@Test
public void dispose() {
PublishProcessor<Integer> pp = PublishProcessor.create();

TestObserver<Integer> ts = pp.singleElement().switchIfEmpty(Single.just(2)).test();

assertTrue(pp.hasSubscribers());

ts.cancel();

assertFalse(pp.hasSubscribers());
}


@Test
public void isDisposed() {
PublishProcessor<Integer> pp = PublishProcessor.create();

TestHelper.checkDisposed(pp.singleElement().switchIfEmpty(Single.just(2)));
}

@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeMaybe(new Function<Maybe<Integer>, Maybe<Integer>>() {
@Override
public Maybe<Integer> apply(Maybe<Integer> f) throws Exception {
return f.switchIfEmpty(Single.just(2));
}
});
}

@Test
public void emptyCancelRace() {
for (int i = 0; i < 500; i++) {
final PublishProcessor<Integer> pp = PublishProcessor.create();

final TestObserver<Integer> ts = pp.singleElement().switchIfEmpty(Single.just(2)).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
pp.onComplete();
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
ts.cancel();
}
};

TestHelper.race(r1, r2, Schedulers.single());
}
}
}