Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operators Skip, SkipLast, Take with time #667

Merged
merged 1 commit into from
Dec 27, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5246,6 +5246,29 @@ public Observable<T> skip(int num) {
return create(OperationSkip.skip(this, num));
}

/**
* Create an Observable that skips values before the given time ellapses.
* @param time the length of the time window
* @param unit the time unit
* @return an Observable that skips values before the given time ellapses
*/
public Observable<T> skip(long time, TimeUnit unit) {
return skip(time, unit, Schedulers.threadPoolForComputation());
}

/**
* Create an Observable that skips values before the given time
* elapses while waiting on the given scheduler.
* @param time the length of the time window
* @param unit the time unit
* @param scheduler the scheduler where the timed wait happens
* @return an Observable that skips values before the given time
* elapses while waiting on the given scheduler
*/
public Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
return create(new OperationSkip.SkipTimed<T>(this, time, unit, scheduler));
}

/**
* If the Observable completes after emitting a single item, return an
* Observable containing that item. If it emits more than one item or no
Expand Down Expand Up @@ -5426,6 +5449,31 @@ public Observable<T> take(final int num) {
return create(OperationTake.take(this, num));
}

/**
* Create an Observable that takes the emitted values of the source
* Observable before the time runs out.
* @param time the length of the time window
* @param unit the time unit
* @return an Observable that takes the emitted values of the source
* Observable before the time runs out.
*/
public Observable<T> take(long time, TimeUnit unit) {
return take(time, unit, Schedulers.threadPoolForComputation());
}

/**
* Create an Observable that takes the emitted values of the source
* Observable before the time runs out, waiting on the given scheduler.
* @param time the length of the time window
* @param unit the time unit
* @param scheduler the scheduler used for time source
* @return an Observable that takes the emitted values of the source
* Observable before the time runs out, waiting on the given scheduler.
*/
public Observable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
return create(new OperationTake.TakeTimed<T>(this, time, unit, scheduler));
}

/**
* Returns an Observable that emits items emitted by the source Observable
* so long as a specified condition is true.
Expand Down Expand Up @@ -5734,6 +5782,31 @@ public Observable<T> skipLast(int count) {
return create(OperationSkipLast.skipLast(this, count));
}

/**
* Create an observable which skips values emitted in a time window
* before the source completes.
* @param time the length of the time window
* @param unit the time unit
* @return an observable which skips values emitted in a time window
* before the source completes
*/
public Observable<T> skipLast(long time, TimeUnit unit) {
return skipLast(time, unit, Schedulers.threadPoolForComputation());
}

/**
* Create an observable which skips values emitted in a time window
* before the source completes by using the given scheduler as time source.
* @param time the length of the time window
* @param unit the time unit
* @param scheduler the scheduler used for time source
* @return an observable which skips values emitted in a time window
* before the source completes by using the given scheduler as time source
*/
public Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler) {
return create(new OperationSkipLast.SkipLastTimed<T>(this, time, unit, scheduler));
}

/**
* Returns an Observable that emits a single item, a list composed of all
* the items emitted by the source Observable.
Expand Down
87 changes: 87 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationSkip.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
*/
package rx.operators;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.util.functions.Action0;

/**
* Returns an Observable that skips the first <code>num</code> items emitted by the source
Expand Down Expand Up @@ -107,4 +112,86 @@ public void onNext(T args) {
}

}

/**
* Skip the items after subscription for the given duration.
* @param <T> the value type
*/
public static final class SkipTimed<T> implements OnSubscribeFunc<T> {
final Observable<? extends T> source;
final long time;
final TimeUnit unit;
final Scheduler scheduler;

public SkipTimed(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
this.source = source;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
}

@Override
public Subscription onSubscribe(Observer<? super T> t1) {

SafeObservableSubscription timer = new SafeObservableSubscription();
SafeObservableSubscription data = new SafeObservableSubscription();

CompositeSubscription csub = new CompositeSubscription(timer, data);

SourceObserver<T> so = new SourceObserver<T>(t1, csub);
data.wrap(source.subscribe(so));
if (!data.isUnsubscribed()) {
timer.wrap(scheduler.schedule(so, time, unit));
}

return csub;
}
/**
* Observes the source and relays its values once gate turns into true.
* @param <T> the observed value type
*/
private static final class SourceObserver<T> implements Observer<T>, Action0 {
final AtomicBoolean gate;
final Observer<? super T> observer;
final Subscription cancel;

public SourceObserver(Observer<? super T> observer,
Subscription cancel) {
this.gate = new AtomicBoolean();
this.observer = observer;
this.cancel = cancel;
}

@Override
public void onNext(T args) {
if (gate.get()) {
observer.onNext(args);
}
}

@Override
public void onError(Throwable e) {
try {
observer.onError(e);
} finally {
cancel.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
observer.onCompleted();
} finally {
cancel.unsubscribe();
}
}

@Override
public void call() {
gate.set(true);
}

}
}
}
75 changes: 75 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationSkipLast.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
*/
package rx.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.util.Timestamped;

/**
* Bypasses a specified number of elements at the end of an observable sequence.
Expand Down Expand Up @@ -123,4 +129,73 @@ public void onNext(T value) {
}));
}
}

/**
* Skip delivering values in the time window before the values.
* @param <T> the result value type
*/
public static final class SkipLastTimed<T> implements OnSubscribeFunc<T> {
final Observable<? extends T> source;
final long timeInMillis;
final Scheduler scheduler;

public SkipLastTimed(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
this.source = source;
this.timeInMillis = unit.toMillis(time);
this.scheduler = scheduler;
}

@Override
public Subscription onSubscribe(Observer<? super T> t1) {
return source.subscribe(new SourceObserver<T>(t1, timeInMillis, scheduler));
}
/** Observes the source. */
private static final class SourceObserver<T> implements Observer<T> {
final Observer<? super T> observer;
final long timeInMillis;
final Scheduler scheduler;
List<Timestamped<T>> buffer = new ArrayList<Timestamped<T>>();

public SourceObserver(Observer<? super T> observer,
long timeInMillis, Scheduler scheduler) {
this.observer = observer;
this.timeInMillis = timeInMillis;
this.scheduler = scheduler;
}

@Override
public void onNext(T args) {
buffer.add(new Timestamped<T>(scheduler.now(), args));
}

@Override
public void onError(Throwable e) {
buffer = Collections.emptyList();
observer.onError(e);
}

@Override
public void onCompleted() {
long limit = scheduler.now() - timeInMillis;
try {
for (Timestamped<T> v : buffer) {
if (v.getTimestampMillis() < limit) {
try {
observer.onNext(v.getValue());
} catch (Throwable t) {
observer.onError(t);
return;
}
} else {
observer.onCompleted();
break;
}
}
} finally {
buffer = Collections.emptyList();
}
}

}
}
}
Loading