diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index c05fab1448..b952c7b5a7 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -12509,6 +12509,35 @@ public final Flowable startWithArray(T... items) { return concatArray(fromArray, this); } + /** + * Ensures that the event flow between the upstream and downstream follow + * the Reactive-Streams 1.0 specification by honoring the 3 additional rules + * (which are omitted in standard operators due to performance reasons). + * + * In addition, if rule §2.12 (onSubscribe must be called at most once) is violated, + * the sequence is cancelled an onError(IllegalStateException) is emitted. + *
+ *
Backpressure:
+ *
The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure + * behavior.
+ *
Scheduler:
+ *
{@code strict} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new Flowable instance + * @since 2.0.5 - experimental + */ + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + @CheckReturnValue + public final Flowable strict() { + return RxJavaPlugins.onAssembly(new FlowableStrict(this)); + } + /** * Subscribes to a Publisher and ignores {@code onNext} and {@code onComplete} emissions. *

diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableStrict.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableStrict.java new file mode 100644 index 0000000000..039c13e0c5 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableStrict.java @@ -0,0 +1,122 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; + +/** + * Ensures that the event flow between the upstream and downstream follow + * the Reactive-Streams 1.0 specification by honoring the 3 additional rules + * (which are omitted in standard operators due to performance reasons). + *

    + *
  • §1.3: onNext should not be called concurrently until onSubscribe returns
  • + *
  • §2.3: onError or onComplete must not call cancel
  • + *
  • §3.9: negative requests should emit an onError(IllegalArgumentException)
  • + *
+ * In addition, if rule §2.12 (onSubscribe must be called at most once) is violated, + * the sequence is cancelled an onError(IllegalStateException) is emitted. + * @param the value type + */ +public final class FlowableStrict extends AbstractFlowableWithUpstream { + + public FlowableStrict(Publisher source) { + super(source); + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new StrictSubscriber(s)); + } + + static final class StrictSubscriber + extends AtomicInteger + implements Subscriber, Subscription { + + private static final long serialVersionUID = -4945028590049415624L; + + final Subscriber actual; + + final AtomicThrowable error; + + final AtomicLong requested; + + final AtomicReference s; + + final AtomicBoolean once; + + volatile boolean done; + + StrictSubscriber(Subscriber actual) { + this.actual = actual; + this.error = new AtomicThrowable(); + this.requested = new AtomicLong(); + this.s = new AtomicReference(); + this.once = new AtomicBoolean(); + } + + @Override + public void request(long n) { + if (n <= 0) { + cancel(); + onError(new IllegalArgumentException("§3.9 violated: positive request amount required but it was " + n)); + } else { + SubscriptionHelper.deferredRequest(s, requested, n); + } + } + + @Override + public void cancel() { + if (!done) { + SubscriptionHelper.cancel(s); + } + } + + @Override + public void onSubscribe(Subscription s) { + if (once.compareAndSet(false, true)) { + + actual.onSubscribe(this); + + SubscriptionHelper.deferredSetOnce(this.s, requested, s); + } else { + s.cancel(); + cancel(); + onError(new IllegalStateException("§2.12 violated: onSubscribe must be called at most once")); + } + } + + @Override + public void onNext(T t) { + HalfSerializer.onNext(actual, t, this, error); + } + + @Override + public void onError(Throwable t) { + done = true; + HalfSerializer.onError(actual, t, this, error); + } + + @Override + public void onComplete() { + done = true; + HalfSerializer.onComplete(actual, this, error); + } + } + +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableStrictTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableStrictTest.java new file mode 100644 index 0000000000..01825e4105 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableStrictTest.java @@ -0,0 +1,236 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.reactivestreams.*; + +import io.reactivex.Flowable; +import io.reactivex.exceptions.TestException; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; + +public class FlowableStrictTest { + + @Test + public void empty() { + Flowable.empty() + .strict() + .test() + .assertResult(); + } + + @Test + public void just() { + Flowable.just(1) + .strict() + .test() + .assertResult(1); + } + + @Test + public void range() { + Flowable.range(1, 5) + .strict() + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void take() { + Flowable.range(1, 5) + .take(2) + .strict() + .test() + .assertResult(1, 2); + } + + @Test + public void backpressure() { + Flowable.range(1, 5) + .strict() + .test(0) + .assertEmpty() + .requestMore(1) + .assertValue(1) + .requestMore(2) + .assertValues(1, 2, 3) + .requestMore(2) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .strict() + .test() + .assertFailure(TestException.class); + } + + @Test + public void observeOn() { + Flowable.range(1, 5) + .hide() + .observeOn(Schedulers.single()) + .strict() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void invalidRequest() { + for (int i = 0; i > -100; i--) { + final int j = i; + final List items = new ArrayList(); + + Flowable.range(1, 2) + .strict() + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(j); + } + + @Override + public void onNext(Integer t) { + items.add(t); + } + + @Override + public void onError(Throwable t) { + items.add(t); + } + + @Override + public void onComplete() { + items.add("Done"); + } + }); + + assertTrue(items.toString(), items.size() == 1); + assertTrue(items.toString(), items.get(0) instanceof IllegalArgumentException); + assertTrue(items.toString(), items.get(0).toString().contains("§3.9")); + } + } + + @Test + public void doubleOnSubscribe() { + final BooleanSubscription bs1 = new BooleanSubscription(); + final BooleanSubscription bs2 = new BooleanSubscription(); + + TestSubscriber ts = Flowable.fromPublisher(new Publisher() { + @Override + public void subscribe(Subscriber p) { + p.onSubscribe(bs1); + p.onSubscribe(bs2); + } + }) + .strict() + .test() + .assertFailure(IllegalStateException.class); + + assertTrue(bs1.isCancelled()); + assertTrue(bs2.isCancelled()); + + String es = ts.errors().get(0).toString(); + assertTrue(es, es.contains("§2.12")); + } + + @Test + public void noCancelOnComplete() { + final BooleanSubscription bs = new BooleanSubscription(); + + Flowable.fromPublisher(new Publisher() { + @Override + public void subscribe(Subscriber p) { + p.onSubscribe(bs); + p.onComplete(); + } + }) + .strict() + .subscribe(new Subscriber() { + + Subscription s; + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + } + + @Override + public void onNext(Object t) { + // not called + } + + @Override + public void onError(Throwable t) { + // not called + } + + @Override + public void onComplete() { + s.cancel(); + } + }); + + assertFalse(bs.isCancelled()); + } + + @Test + public void noCancelOnError() { + final BooleanSubscription bs = new BooleanSubscription(); + + Flowable.fromPublisher(new Publisher() { + @Override + public void subscribe(Subscriber p) { + p.onSubscribe(bs); + p.onError(new TestException()); + } + }) + .strict() + .subscribe(new Subscriber() { + + Subscription s; + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + } + + @Override + public void onNext(Object t) { + // not called + } + + @Override + public void onError(Throwable t) { + s.cancel(); + } + + @Override + public void onComplete() { + // not called + } + }); + + assertFalse(bs.isCancelled()); + } +} diff --git a/src/test/java/io/reactivex/tck/DelayTckTest.java b/src/test/java/io/reactivex/tck/DelayTckTest.java index a9615e0027..cb5b96438e 100644 --- a/src/test/java/io/reactivex/tck/DelayTckTest.java +++ b/src/test/java/io/reactivex/tck/DelayTckTest.java @@ -26,9 +26,7 @@ public class DelayTckTest extends BaseTck { @Override public Publisher createPublisher(long elements) { return FlowableTck.wrap( - FlowableAwaitOnSubscribeTck.wrap( - Flowable.range(0, (int)elements).delay(1, TimeUnit.MILLISECONDS) - ) + Flowable.range(0, (int)elements).delay(1, TimeUnit.MILLISECONDS) ); } } diff --git a/src/test/java/io/reactivex/tck/FlowableAwaitOnSubscribeTck.java b/src/test/java/io/reactivex/tck/FlowableAwaitOnSubscribeTck.java deleted file mode 100644 index 6e246f17f0..0000000000 --- a/src/test/java/io/reactivex/tck/FlowableAwaitOnSubscribeTck.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.tck; - -import java.util.concurrent.atomic.*; - -import org.reactivestreams.*; - -import io.reactivex.Flowable; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.BackpressureHelper; - -/** - * Intercepts the onSubscribe call and makes sure calls to Subscription methods - * only happen after the child Subscriber has returned from its onSubscribe method. - * - *

This helps with child Subscribers that don't expect a recursive call from - * onSubscribe into their onNext because, for example, they request immediately from - * their onSubscribe but don't finish their preparation before that and onNext - * runs into a half-prepared state. This can happen with non Rx mentality based Subscribers. - * - * @param the value type - */ -public final class FlowableAwaitOnSubscribeTck extends Flowable { - - final Publisher source; - - FlowableAwaitOnSubscribeTck(Publisher source) { - this.source = source; - } - - public static Flowable wrap(Publisher source) { - return new FlowableAwaitOnSubscribeTck(ObjectHelper.requireNonNull(source, "source is null")); - } - - @Override - protected void subscribeActual(Subscriber s) { - source.subscribe(new PublisherPostOnSubscribeSubscriber(s)); - } - - static final class PublisherPostOnSubscribeSubscriber - extends AtomicReference - implements Subscriber, Subscription { - - private static final long serialVersionUID = -4850665729904103852L; - - final Subscriber actual; - - final AtomicLong requested; - - PublisherPostOnSubscribeSubscriber(Subscriber actual) { - this.actual = actual; - this.requested = new AtomicLong(); - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.get(), s)) { - - actual.onSubscribe(this); - - if (SubscriptionHelper.setOnce(this, s)) { - long r = requested.getAndSet(0L); - if (r != 0L) { - s.request(r); - } - } - } - } - - @Override - public void onNext(T t) { - actual.onNext(t); - } - - @Override - public void onError(Throwable t) { - actual.onError(t); - } - - @Override - public void onComplete() { - actual.onComplete(); - } - - @Override - public void request(long n) { - Subscription a = get(); - if (a != null) { - a.request(n); - } else { - if (SubscriptionHelper.validate(n)) { - BackpressureHelper.add(requested, n); - a = get(); - if (a != null) { - long r = requested.getAndSet(0L); - if (r != 0L) { - a.request(n); - } - } - } - } - } - - @Override - public void cancel() { - SubscriptionHelper.cancel(this); - } - } -} diff --git a/src/test/java/io/reactivex/tck/FlowableTck.java b/src/test/java/io/reactivex/tck/FlowableTck.java index 7f45b73870..907c12b144 100644 --- a/src/test/java/io/reactivex/tck/FlowableTck.java +++ b/src/test/java/io/reactivex/tck/FlowableTck.java @@ -13,101 +13,16 @@ package io.reactivex.tck; -import java.util.concurrent.atomic.AtomicInteger; - -import org.reactivestreams.*; - import io.reactivex.Flowable; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.*; - -/** - * Helper Flowable that makes sure invalid requests from downstream are reported - * as onErrors (instead of RxJavaPlugins.onError). Since requests can be - * async to the onXXX method calls, we need half-serialization to ensure correct - * operations. - * - * @param the value type - */ -public final class FlowableTck extends Flowable { +public final class FlowableTck { /** - * Wraps a given Publisher and makes sure invalid requests trigger an onError(IllegalArgumentException). + * Enable strict mode. * @param the value type - * @param source the source to wrap, not null - * @return the new Flowable instance + * @param f the input Flowable + * @return the output Flowable */ - public static Flowable wrap(Publisher source) { - return new FlowableTck(ObjectHelper.requireNonNull(source, "source is null")); - } - - final Publisher source; - - FlowableTck(Publisher source) { - this.source = source; - } - - @Override - protected void subscribeActual(Subscriber s) { - source.subscribe(new TckSubscriber(s)); - } - - static final class TckSubscriber - extends AtomicInteger - implements Subscriber, Subscription { - - private static final long serialVersionUID = -4945028590049415624L; - - final Subscriber actual; - - final AtomicThrowable error; - - Subscription s; - - TckSubscriber(Subscriber actual) { - this.actual = actual; - this.error = new AtomicThrowable(); - } - - - @Override - public void request(long n) { - if (n <= 0) { - s.cancel(); - onError(new IllegalArgumentException("§3.9 violated: positive request amount required but it was " + n)); - } else { - s.request(n); - } - } - - @Override - public void cancel() { - s.cancel(); - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - - actual.onSubscribe(this); - } - } - - @Override - public void onNext(T t) { - HalfSerializer.onNext(actual, t, this, error); - } - - @Override - public void onError(Throwable t) { - HalfSerializer.onError(actual, t, this, error); - } - - @Override - public void onComplete() { - HalfSerializer.onComplete(actual, this, error); - } + public static Flowable wrap(Flowable f) { + return f.strict(); } } diff --git a/src/test/java/io/reactivex/tck/ObserveOnTckTest.java b/src/test/java/io/reactivex/tck/ObserveOnTckTest.java index 7e5f142100..a61fca2df4 100644 --- a/src/test/java/io/reactivex/tck/ObserveOnTckTest.java +++ b/src/test/java/io/reactivex/tck/ObserveOnTckTest.java @@ -29,9 +29,7 @@ public ObserveOnTckTest() { @Override public Publisher createPublisher(long elements) { return FlowableTck.wrap( - FlowableAwaitOnSubscribeTck.wrap( Flowable.range(0, (int)elements).observeOn(Schedulers.single()) - ) ); } }