From ed57dc5191bb351d2c9106027c7abe8a72a600da Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 27 Apr 2014 11:30:56 +0800 Subject: [PATCH] OperatorThrottleFirst --- rxjava-core/src/main/java/rx/Observable.java | 6 +- .../rx/operators/OperationThrottleFirst.java | 88 ------------------- .../rx/operators/OperatorThrottleFirst.java | 63 +++++++++++++ .../src/test/java/rx/ThrottleFirstTests.java | 62 ------------- ...st.java => OperatorThrottleFirstTest.java} | 55 +++++++++--- 5 files changed, 108 insertions(+), 166 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorThrottleFirst.java delete mode 100644 rxjava-core/src/test/java/rx/ThrottleFirstTests.java rename rxjava-core/src/test/java/rx/operators/{OperationThrottleFirstTest.java => OperatorThrottleFirstTest.java} (69%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 746362ca78..843e081631 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -71,7 +71,7 @@ import rx.operators.OperationTakeTimed; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; -import rx.operators.OperationThrottleFirst; +import rx.operators.OperatorThrottleFirst; import rx.operators.OperationTimeInterval; import rx.operators.OperationTimer; import rx.operators.OperationToMap; @@ -6733,7 +6733,7 @@ public final Observable takeWhileWithIndex(final Func2RxJava Wiki: throttleFirst() */ public final Observable throttleFirst(long windowDuration, TimeUnit unit) { - return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit)); + return lift(new OperatorThrottleFirst(windowDuration, unit, Schedulers.computation())); } /** @@ -6755,7 +6755,7 @@ public final Observable throttleFirst(long windowDuration, TimeUnit unit) { * @see RxJava Wiki: throttleFirst() */ public final Observable throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) { - return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler)); + return lift(new OperatorThrottleFirst(skipDuration, unit, scheduler)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java b/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java deleted file mode 100644 index d305fdf1d7..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Subscription; -import rx.functions.Func1; -import rx.observers.Subscribers; -import rx.schedulers.Schedulers; - -/** - * Throttle by windowing a stream and returning the first value in each window. - */ -public final class OperationThrottleFirst { - - /** - * Throttles to first value in each window. - * - * @param items - * The {@link Observable} which is publishing events. - * @param windowDuration - * Duration of windows within with the first value will be chosen. - * @param unit - * The unit of time for the specified timeout. - * @return A {@link Func1} which performs the throttle operation. - */ - public static OnSubscribeFunc throttleFirst(Observable items, long windowDuration, TimeUnit unit) { - return throttleFirst(items, windowDuration, unit, Schedulers.computation()); - } - - /** - * Throttles to first value in each window. - * - * @param items - * The {@link Observable} which is publishing events. - * @param windowDuration - * Duration of windows within with the first value will be chosen. - * @param unit - * The unit of time for the specified timeout. - * @param scheduler - * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. - * @return A {@link Func1} which performs the throttle operation. - */ - public static OnSubscribeFunc throttleFirst(final Observable items, final long windowDuration, final TimeUnit unit, final Scheduler scheduler) { - return new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - - final AtomicLong lastOnNext = new AtomicLong(0); - final long timeInMilliseconds = unit.toMillis(windowDuration); - - return items.filter(new Func1() { - - @Override - public Boolean call(T value) { - long now = scheduler.now(); - if (lastOnNext.get() == 0 || now - lastOnNext.get() >= timeInMilliseconds) { - lastOnNext.set(now); - return Boolean.TRUE; - } else { - return Boolean.FALSE; - } - } - - }).unsafeSubscribe(Subscribers.from(observer)); - } - }; - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorThrottleFirst.java b/rxjava-core/src/main/java/rx/operators/OperatorThrottleFirst.java new file mode 100644 index 0000000000..e693eed821 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorThrottleFirst.java @@ -0,0 +1,63 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.TimeUnit; + +import rx.*; +import rx.Observable.Operator; + +/** + * Throttle by windowing a stream and returning the first value in each window. + */ +public final class OperatorThrottleFirst implements Operator { + + private final long timeInMilliseconds; + private final Scheduler scheduler; + + public OperatorThrottleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler) { + this.timeInMilliseconds = unit.toMillis(windowDuration); + this.scheduler = scheduler; + } + + @Override + public Subscriber call(final Subscriber subscriber) { + return new Subscriber(subscriber) { + + private long lastOnNext = 0; + + @Override + public void onNext(T v) { + long now = scheduler.now(); + if (lastOnNext == 0 || now - lastOnNext >= timeInMilliseconds) { + lastOnNext = now; + subscriber.onNext(v); + } + } + + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + }; + } +} diff --git a/rxjava-core/src/test/java/rx/ThrottleFirstTests.java b/rxjava-core/src/test/java/rx/ThrottleFirstTests.java deleted file mode 100644 index ee0d1e7c93..0000000000 --- a/rxjava-core/src/test/java/rx/ThrottleFirstTests.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx; - -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; - -import java.util.concurrent.TimeUnit; - -import org.junit.Test; -import org.mockito.InOrder; - -import rx.schedulers.TestScheduler; -import rx.subjects.PublishSubject; - -public class ThrottleFirstTests { - - @Test - public void testThrottle() { - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - TestScheduler s = new TestScheduler(); - PublishSubject o = PublishSubject.create(); - o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer); - - // send events with simulated time increments - s.advanceTimeTo(0, TimeUnit.MILLISECONDS); - o.onNext(1); // deliver - o.onNext(2); // skip - s.advanceTimeTo(501, TimeUnit.MILLISECONDS); - o.onNext(3); // deliver - s.advanceTimeTo(600, TimeUnit.MILLISECONDS); - o.onNext(4); // skip - s.advanceTimeTo(700, TimeUnit.MILLISECONDS); - o.onNext(5); // skip - o.onNext(6); // skip - s.advanceTimeTo(1001, TimeUnit.MILLISECONDS); - o.onNext(7); // deliver - s.advanceTimeTo(1501, TimeUnit.MILLISECONDS); - o.onCompleted(); - - InOrder inOrder = inOrder(observer); - inOrder.verify(observer).onNext(1); - inOrder.verify(observer).onNext(3); - inOrder.verify(observer).onNext(7); - inOrder.verify(observer).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } -} diff --git a/rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java b/rxjava-core/src/test/java/rx/operators/OperatorThrottleFirstTest.java similarity index 69% rename from rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorThrottleFirstTest.java index 244e9a6878..b1c83e9325 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorThrottleFirstTest.java @@ -27,14 +27,15 @@ import org.mockito.InOrder; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Observer; import rx.Scheduler; -import rx.Subscription; +import rx.Subscriber; import rx.functions.Action0; import rx.schedulers.TestScheduler; -import rx.subscriptions.Subscriptions; +import rx.subjects.PublishSubject; -public class OperationThrottleFirstTest { +public class OperatorThrottleFirstTest { private TestScheduler scheduler; private Scheduler.Worker innerScheduler; @@ -50,20 +51,18 @@ public void before() { @Test public void testThrottlingWithCompleted() { - Observable source = Observable.create(new Observable.OnSubscribeFunc() { + Observable source = Observable.create(new OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber observer) { publishNext(observer, 100, "one"); // publish as it's first publishNext(observer, 300, "two"); // skip as it's last within the first 400 publishNext(observer, 900, "three"); // publish publishNext(observer, 905, "four"); // skip publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. - - return Subscriptions.empty(); } }); - Observable sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler)); + Observable sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler); sampled.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -79,19 +78,17 @@ public Subscription onSubscribe(Observer observer) { @Test public void testThrottlingWithError() { - Observable source = Observable.create(new Observable.OnSubscribeFunc() { + Observable source = Observable.create(new OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber observer) { Exception error = new TestException(); publishNext(observer, 100, "one"); // Should be published since it is first publishNext(observer, 200, "two"); // Should be skipped since onError will arrive before the timeout expires publishError(observer, 300, error); // Should be published as soon as the timeout expires. - - return Subscriptions.empty(); } }); - Observable sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler)); + Observable sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler); sampled.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -132,4 +129,36 @@ public void call() { @SuppressWarnings("serial") private class TestException extends Exception { } + + @Test + public void testThrottle() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer); + + // send events with simulated time increments + s.advanceTimeTo(0, TimeUnit.MILLISECONDS); + o.onNext(1); // deliver + o.onNext(2); // skip + s.advanceTimeTo(501, TimeUnit.MILLISECONDS); + o.onNext(3); // deliver + s.advanceTimeTo(600, TimeUnit.MILLISECONDS); + o.onNext(4); // skip + s.advanceTimeTo(700, TimeUnit.MILLISECONDS); + o.onNext(5); // skip + o.onNext(6); // skip + s.advanceTimeTo(1001, TimeUnit.MILLISECONDS); + o.onNext(7); // deliver + s.advanceTimeTo(1501, TimeUnit.MILLISECONDS); + o.onCompleted(); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer).onNext(1); + inOrder.verify(observer).onNext(3); + inOrder.verify(observer).onNext(7); + inOrder.verify(observer).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } }