Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Public API changes for 1.1.0 release #3550

Merged
merged 1 commit into from
Dec 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 9 additions & 84 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
* @see <a href="http://reactivex.io/documentation/single.html">ReactiveX documentation: Single</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
@Beta
public Single<T> toSingle() {
return new Single<T>(OnSubscribeSingle.create(this));
}
Expand Down Expand Up @@ -1789,9 +1789,8 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
* @throws IllegalArgumentException
* if {@code maxConcurrent} is less than or equal to 0
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
* @since 1.1.0
*/
@Experimental
@SuppressWarnings({"unchecked", "rawtypes"})
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
if (source.getClass() == ScalarSynchronousObservable.class) {
Expand Down Expand Up @@ -2088,9 +2087,8 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits all of the items emitted by the Observables in the Array
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
* @since 1.1.0
*/
@Experimental
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent) {
return merge(from(sequences), maxConcurrent);
}
Expand Down Expand Up @@ -4014,9 +4012,8 @@ public void call(Subscriber<? super T> subscriber) {
* the alternate Observable to subscribe to if the source does not emit any items
* @return an Observable that emits the items emitted by the source Observable or the items of an
* alternate Observable if the source Observable is empty.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
* @since 1.1.0
*/
@Experimental
public final Observable<T> switchIfEmpty(Observable<? extends T> alternate) {
return lift(new OperatorSwitchIfEmpty<T>(alternate));
}
Expand Down Expand Up @@ -5896,9 +5893,8 @@ public final Observable<T> onBackpressureBuffer() {
*
* @return the source Observable modified to buffer items up to the given capacity
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
* @since 1.1.0
*/
@Beta
public final Observable<T> onBackpressureBuffer(long capacity) {
return lift(new OperatorOnBackpressureBuffer<T>(capacity));
}
Expand All @@ -5917,9 +5913,8 @@ public final Observable<T> onBackpressureBuffer(long capacity) {
*
* @return the source Observable modified to buffer items up to the given capacity
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
* @since 1.1.0
*/
@Beta
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow) {
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
}
Expand All @@ -5941,9 +5936,8 @@ public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflo
* @return the source Observable modified to drop {@code onNext} notifications on overflow
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
* @since 1.1.0
*/
@Experimental
public final Observable<T> onBackpressureDrop(Action1<? super T> onDrop) {
return lift(new OperatorOnBackpressureDrop<T>(onDrop));
}
Expand All @@ -5968,72 +5962,6 @@ public final Observable<T> onBackpressureDrop() {
return lift(OperatorOnBackpressureDrop.<T>instance());
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to
* block the producer thread.
* <p>
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.block.png" alt="">
* <p>
* The producer side can emit up to {@code maxQueueLength} onNext elements without blocking, but the
* consumer side considers the amount its downstream requested through {@code Producer.request(n)}
* and doesn't emit more than requested even if more is available. For example, using
* {@code onBackpressureBlock(384).observeOn(Schedulers.io())} will not throw a MissingBackpressureException.
* <p>
* Note that if the upstream Observable does support backpressure, this operator ignores that capability
* and doesn't propagate any backpressure requests from downstream.
* <p>
* Warning! Using a chain like {@code source.onBackpressureBlock().subscribeOn(scheduler)} is prone to
* deadlocks because the consumption of the internal queue is scheduled behind a blocked emission by
* the subscribeOn. In order to avoid this, the operators have to be swapped in the chain:
* {@code source.subscribeOn(scheduler).onBackpressureBlock()} and in general, no subscribeOn operator should follow
* this operator.
*
* @param maxQueueLength the maximum number of items the producer can emit without blocking
* @return the source Observable modified to block {@code onNext} notifications on overflow
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @Experimental The behavior of this can change at any time.
* @deprecated The operator doesn't work properly with {@link #subscribeOn(Scheduler)} and is prone to
* deadlocks. It will be removed/unavailable starting from 1.1.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
@Deprecated
public final Observable<T> onBackpressureBlock(int maxQueueLength) {
return lift(new OperatorOnBackpressureBlock<T>(maxQueueLength));
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to block the
* producer thread if the number of undelivered onNext events reaches the system-wide ring buffer size.
* <p>
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.block.png" alt="">
* <p>
* The producer side can emit up to the system-wide ring buffer size onNext elements without blocking, but
* the consumer side considers the amount its downstream requested through {@code Producer.request(n)}
* and doesn't emit more than requested even if available.
* <p>
* Note that if the upstream Observable does support backpressure, this operator ignores that capability
* and doesn't propagate any backpressure requests from downstream.
* <p>
* Warning! Using a chain like {@code source.onBackpressureBlock().subscribeOn(scheduler)} is prone to
* deadlocks because the consumption of the internal queue is scheduled behind a blocked emission by
* the subscribeOn. In order to avoid this, the operators have to be swapped in the chain:
* {@code source.subscribeOn(scheduler).onBackpressureBlock()} and in general, no subscribeOn operator should follow
* this operator.
*
* @return the source Observable modified to block {@code onNext} notifications on overflow
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @Experimental The behavior of this can change at any time.
* @deprecated The operator doesn't work properly with {@link #subscribeOn(Scheduler)} and is prone to
* deadlocks. It will be removed/unavailable starting from 1.1.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
@Deprecated
public final Observable<T> onBackpressureBlock() {
return onBackpressureBlock(rx.internal.util.RxRingBuffer.SIZE);
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to
* hold onto the latest value and emit that on request.
Expand All @@ -6050,10 +5978,8 @@ public final Observable<T> onBackpressureBlock() {
* requesting more than 1 from downstream doesn't guarantee a continuous delivery of onNext events.
*
* @return the source Observable modified so that it emits the most recently-received item upon request
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
* @since 1.1.0
*/
@Experimental
public final Observable<T> onBackpressureLatest() {
return lift(OperatorOnBackpressureLatest.<T>instance());
}
Expand Down Expand Up @@ -8728,9 +8654,8 @@ public final Observable<T> takeWhile(final Func1<? super T, Boolean> predicate)
* condition after each item, and then completes if the condition is satisfied.
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
* @see Observable#takeWhile(Func1)
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
* @since 1.1.0
*/
@Experimental
public final Observable<T> takeUntil(final Func1<? super T, Boolean> stopPredicate) {
return lift(new OperatorTakeUntilPredicate<T>(stopPredicate));
}
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@
import rx.internal.operators.OperatorSubscribeOn;
import rx.internal.operators.OperatorTimeout;
import rx.internal.operators.OperatorZip;

import rx.annotations.Beta;
import rx.internal.producers.SingleDelayedProducer;
import rx.singles.BlockingSingle;
import rx.observers.SafeSubscriber;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.plugins.*;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

Expand All @@ -69,7 +70,7 @@
* the type of the item emitted by the Single
* @since (If this class graduates from "Experimental" replace this parenthetical with the release number)
*/
@Experimental
@Beta
public class Single<T> {

final Observable.OnSubscribe<T> onSubscribe;
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/rx/SingleSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx;

import rx.annotations.Experimental;
import rx.annotations.Beta;
import rx.internal.util.SubscriptionList;

/**
Expand All @@ -29,8 +29,9 @@
* @see <a href="http://reactivex.io/documentation/observable.html">ReactiveX documentation: Observable</a>
* @param <T>
* the type of item the SingleSubscriber expects to observe
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
@Beta
public abstract class SingleSubscriber<T> implements Subscription {

private final SubscriptionList cs = new SubscriptionList();
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/rx/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,8 @@ public static Throwable getFinalCause(Throwable e) {
* @param exceptions the collection of exceptions. If null or empty, no exception is thrown.
* If the collection contains a single exception, that exception is either thrown as-is or wrapped into a
* CompositeException. Multiple exceptions are wrapped into a CompositeException.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
* @since 1.1.0
*/
@Experimental
public static void throwIfAny(List<? extends Throwable> exceptions) {
if (exceptions != null && !exceptions.isEmpty()) {
if (exceptions.size() == 1) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import rx.*;
import rx.Observable.Operator;
import rx.annotations.Experimental;
import rx.exceptions.Exceptions;
import rx.functions.Func1;

Expand All @@ -26,7 +25,6 @@
* the provided predicate returns false
* <p>
*/
@Experimental
public final class OperatorTakeUntilPredicate<T> implements Operator<T, T> {
/** Subscriber returned to the upstream. */
private final class ParentSubscriber extends Subscriber<T> {
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/rx/internal/util/BackpressureDrainManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
/**
* Manages the producer-backpressure-consumer interplay by
* matching up available elements with requested elements and/or
* terminal events.
* terminal events.
*
* @since 1.1.0
*/
@Experimental
public final class BackpressureDrainManager extends AtomicLong implements Producer {
Expand Down
Loading