Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding eager concats to Single #5976

Merged
merged 7 commits into from
Apr 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public abstract class Maybe<T> implements MaybeSource<T> {

/**
* Runs multiple Maybe sources and signals the events of the first one that signals (cancelling
* Runs multiple MaybeSources and signals the events of the first one that signals (cancelling
* the rest).
* <dl>
* <dt><b>Scheduler:</b></dt>
Expand All @@ -68,7 +68,7 @@ public static <T> Maybe<T> amb(final Iterable<? extends MaybeSource<? extends T>
}

/**
* Runs multiple Maybe sources and signals the events of the first one that signals (cancelling
* Runs multiple MaybeSources and signals the events of the first one that signals (cancelling
* the rest).
* <dl>
* <dt><b>Scheduler:</b></dt>
Expand Down Expand Up @@ -412,7 +412,7 @@ public static <T> Flowable<T> concatEager(Iterable<? extends MaybeSource<? exten
}

/**
* Concatenates a Publisher sequence of Publishers eagerly into a single stream of values.
* Concatenates a Publisher sequence of MaybeSources eagerly into a single stream of values.
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source Publishers as they are observed. The operator buffers the values emitted by these
Expand Down
88 changes: 81 additions & 7 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
public abstract class Single<T> implements SingleSource<T> {

/**
* Runs multiple Single sources and signals the events of the first one that signals (cancelling
* Runs multiple SingleSources and signals the events of the first one that signals (cancelling
* the rest).
* <dl>
* <dt><b>Scheduler:</b></dt>
Expand All @@ -80,7 +80,7 @@ public static <T> Single<T> amb(final Iterable<? extends SingleSource<? extends
}

/**
* Runs multiple Single sources and signals the events of the first one that signals (cancelling
* Runs multiple SingleSources and signals the events of the first one that signals (cancelling
* the rest).
* <dl>
* <dt><b>Scheduler:</b></dt>
Expand All @@ -106,7 +106,7 @@ public static <T> Single<T> ambArray(final SingleSource<? extends T>... sources)
}

/**
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
* an Iterable sequence.
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand All @@ -127,7 +127,7 @@ public static <T> Flowable<T> concat(Iterable<? extends SingleSource<? extends T
}

/**
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
* an Observable sequence.
* <dl>
* <dt><b>Scheduler:</b></dt>
Expand All @@ -147,7 +147,7 @@ public static <T> Observable<T> concat(ObservableSource<? extends SingleSource<?
}

/**
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
* a Publisher sequence.
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand All @@ -169,7 +169,7 @@ public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends
}

/**
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
* a Publisher sequence and prefetched by the specified amount.
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand Down Expand Up @@ -299,7 +299,7 @@ public static <T> Flowable<T> concat(
}

/**
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided in
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided in
* an array.
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand All @@ -320,6 +320,80 @@ public static <T> Flowable<T> concatArray(SingleSource<? extends T>... sources)
return RxJavaPlugins.onAssembly(new FlowableConcatMap(Flowable.fromArray(sources), SingleInternalHelper.toFlowable(), 2, ErrorMode.BOUNDARY));
}

/**
* Concatenates a sequence of SingleSource eagerly into a single stream of values.
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source SingleSources. The operator buffers the value emitted by these SingleSources and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of Single that need to be eagerly concatenated
* @return the new Flowable instance with the specified concatenation behavior
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatArrayEager(SingleSource<? extends T>... sources) {
return Flowable.fromArray(sources).concatMapEager(SingleInternalHelper.<T>toFlowable());
}

/**
* Concatenates a Publisher sequence of SingleSources eagerly into a single stream of values.
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source Publishers as they are observed. The operator buffers the values emitted by these
* Publishers and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream and the outer Publisher is
* expected to support backpressure. Violating this assumption, the operator will
* signal {@link io.reactivex.exceptions.MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of Publishers that need to be eagerly concatenated
* @return the new Publisher instance with the specified concatenation behavior
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatEager(Publisher<? extends SingleSource<? extends T>> sources) {
return Flowable.fromPublisher(sources).concatMapEager(SingleInternalHelper.<T>toFlowable());
}

/**
* Concatenates a sequence of SingleSources eagerly into a single stream of values.
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source SingleSources. The operator buffers the values emitted by these SingleSources and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of SingleSource that need to be eagerly concatenated
* @return the new Flowable instance with the specified concatenation behavior
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatEager(Iterable<? extends SingleSource<? extends T>> sources) {
return Flowable.fromIterable(sources).concatMapEager(SingleInternalHelper.<T>toFlowable());
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is another overload you missed:

public static <T> Flowable<T> concatEager(Publisher<? extends SingleSource<? extends T>> sources)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops - thanks for the review, will update shortly

/**
* Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package io.reactivex.internal.operators.single;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;

import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;
import org.junit.Test;

import io.reactivex.*;
Expand Down Expand Up @@ -67,6 +70,63 @@ public void concatArray() {
}
}

@SuppressWarnings("unchecked")
@Test
public void concatArrayEagerTest() {
PublishProcessor<String> pp1 = PublishProcessor.create();
PublishProcessor<String> pp2 = PublishProcessor.create();

TestSubscriber<String> ts = Single.concatArrayEager(pp1.single("1"), pp2.single("2")).test();

assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());

pp2.onComplete();
ts.assertEmpty();
pp1.onComplete();

ts.assertResult("1", "2");
ts.assertComplete();
}

@SuppressWarnings("unchecked")
@Test
public void concatEagerIterableTest() {
PublishProcessor<String> pp1 = PublishProcessor.create();
PublishProcessor<String> pp2 = PublishProcessor.create();

TestSubscriber<String> ts = Single.concatEager(Arrays.asList(pp1.single("2"), pp2.single("1"))).test();

assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());

pp2.onComplete();
ts.assertEmpty();
pp1.onComplete();

ts.assertResult("2", "1");
ts.assertComplete();
}

@SuppressWarnings("unchecked")
@Test
public void concatEagerPublisherTest() {
PublishProcessor<String> pp1 = PublishProcessor.create();
PublishProcessor<String> pp2 = PublishProcessor.create();

TestSubscriber<String> ts = Single.concatEager(Flowable.just(pp1.single("1"), pp2.single("2"))).test();

assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());

pp2.onComplete();
ts.assertEmpty();
pp1.onComplete();

ts.assertResult("1", "2");
ts.assertComplete();
}

@SuppressWarnings("unchecked")
@Test
public void concatObservable() {
Expand Down