Skip to content

Commit

Permalink
Reimplement the "SkipLast" operator with time
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Apr 19, 2014
1 parent 632ca76 commit 0fc6e2c
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 176 deletions.
3 changes: 2 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import rx.operators.OperatorSerialize;
import rx.operators.OperatorSkip;
import rx.operators.OperatorSkipLast;
import rx.operators.OperatorSkipLastTimed;
import rx.operators.OperatorSkipWhile;
import rx.operators.OperatorSubscribeOn;
import rx.operators.OperatorSynchronize;
Expand Down Expand Up @@ -6083,7 +6084,7 @@ public final Observable<T> skipLast(long time, TimeUnit unit) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211750.aspx">MSDN: Observable.SkipLast</a>
*/
public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler) {
return create(new OperatorSkipLast.SkipLastTimed<T>(this, time, unit, scheduler));
return lift(new OperatorSkipLastTimed<T>(time, unit, scheduler));
}

/**
Expand Down
81 changes: 0 additions & 81 deletions rxjava-core/src/main/java/rx/operators/OperatorSkipLast.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,11 @@
*/
package rx.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observable.Operator;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.schedulers.Timestamped;

/**
* Bypasses a specified number of elements at the end of an observable sequence.
Expand Down Expand Up @@ -85,75 +75,4 @@ public void onNext(T value) {
};
}

/**
* Skip delivering values in the time window before the values.
*
* @param <T>
* the result value type
*/
public static final class SkipLastTimed<T> implements OnSubscribeFunc<T> {
final Observable<? extends T> source;
final long timeInMillis;
final Scheduler scheduler;

public SkipLastTimed(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
this.source = source;
this.timeInMillis = unit.toMillis(time);
this.scheduler = scheduler;
}

@Override
public Subscription onSubscribe(Observer<? super T> t1) {
return source.unsafeSubscribe(new SourceObserver<T>(t1, timeInMillis, scheduler));
}

/** Observes the source. */
private static final class SourceObserver<T> extends Subscriber<T> {
final Observer<? super T> observer;
final long timeInMillis;
final Scheduler scheduler;
List<Timestamped<T>> buffer = new ArrayList<Timestamped<T>>();

public SourceObserver(Observer<? super T> observer,
long timeInMillis, Scheduler scheduler) {
this.observer = observer;
this.timeInMillis = timeInMillis;
this.scheduler = scheduler;
}

@Override
public void onNext(T args) {
buffer.add(new Timestamped<T>(scheduler.now(), args));
}

@Override
public void onError(Throwable e) {
buffer = Collections.emptyList();
observer.onError(e);
}

@Override
public void onCompleted() {
long limit = scheduler.now() - timeInMillis;
try {
for (Timestamped<T> v : buffer) {
if (v.getTimestampMillis() < limit) {
try {
observer.onNext(v.getValue());
} catch (Throwable t) {
observer.onError(t);
return;
}
} else {
observer.onCompleted();
break;
}
}
} finally {
buffer = Collections.emptyList();
}
}

}
}
}
82 changes: 82 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorSkipLastTimed.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Timestamped;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Skip delivering values in the time window before the values.
*/
public class OperatorSkipLastTimed<T> implements Operator<T, T> {

private final long timeInMillis;
private final Scheduler scheduler;

public OperatorSkipLastTimed(long time, TimeUnit unit, Scheduler scheduler) {
this.timeInMillis = unit.toMillis(time);
this.scheduler = scheduler;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {

private List<Timestamped<T>> buffer = new ArrayList<Timestamped<T>>();

@Override
public void onNext(T value) {
buffer.add(new Timestamped<T>(scheduler.now(), value));
}

@Override
public void onError(Throwable e) {
buffer = Collections.emptyList();
subscriber.onError(e);
}

@Override
public void onCompleted() {
long limit = scheduler.now() - timeInMillis;
try {
for (Timestamped<T> v : buffer) {
if (v.getTimestampMillis() < limit) {
try {
subscriber.onNext(v.getValue());
} catch (Throwable t) {
subscriber.onError(t);
return;
}
} else {
break;
}
}
subscriber.onCompleted();
} finally {
buffer = Collections.emptyList();
}
}

};
}
}
94 changes: 0 additions & 94 deletions rxjava-core/src/test/java/rx/operators/OperatorSkipLastTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observer;
import rx.operators.OperationSkipTest.CustomException;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;

public class OperatorSkipLastTest {

Expand Down Expand Up @@ -108,94 +104,4 @@ public void testSkipLastWithNegativeCount() {
Observable.from("one").skipLast(-1);
}

@Test
public void testSkipLastTimed() {
TestScheduler scheduler = new TestScheduler();

PublishSubject<Integer> source = PublishSubject.create();

Observable<Integer> result = source.skipLast(1, TimeUnit.SECONDS, scheduler);

Observer<Object> o = mock(Observer.class);

result.subscribe(o);

source.onNext(1);
source.onNext(2);
source.onNext(3);

scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

source.onNext(4);
source.onNext(5);
source.onNext(6);

scheduler.advanceTimeBy(950, TimeUnit.MILLISECONDS);
source.onCompleted();

InOrder inOrder = inOrder(o);
inOrder.verify(o).onNext(1);
inOrder.verify(o).onNext(2);
inOrder.verify(o).onNext(3);
inOrder.verify(o, never()).onNext(4);
inOrder.verify(o, never()).onNext(5);
inOrder.verify(o, never()).onNext(6);
inOrder.verify(o).onCompleted();
inOrder.verifyNoMoreInteractions();

verify(o, never()).onError(any(Throwable.class));
}

@Test
public void testSkipLastTimedErrorBeforeTime() {
TestScheduler scheduler = new TestScheduler();

PublishSubject<Integer> source = PublishSubject.create();

Observable<Integer> result = source.skipLast(1, TimeUnit.SECONDS, scheduler);

Observer<Object> o = mock(Observer.class);

result.subscribe(o);

source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onError(new OperationSkipTest.CustomException());

scheduler.advanceTimeBy(1050, TimeUnit.MILLISECONDS);

verify(o).onError(any(CustomException.class));

verify(o, never()).onCompleted();
verify(o, never()).onNext(any());
}

@Test
public void testSkipLastTimedCompleteBeforeTime() {
TestScheduler scheduler = new TestScheduler();

PublishSubject<Integer> source = PublishSubject.create();

Observable<Integer> result = source.skipLast(1, TimeUnit.SECONDS, scheduler);

Observer<Object> o = mock(Observer.class);

result.subscribe(o);

source.onNext(1);
source.onNext(2);
source.onNext(3);

scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

source.onCompleted();

InOrder inOrder = inOrder(o);
inOrder.verify(o).onCompleted();
inOrder.verifyNoMoreInteractions();

verify(o, never()).onNext(any());
verify(o, never()).onError(any(Throwable.class));
}
}
Loading

0 comments on commit 0fc6e2c

Please sign in to comment.