Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator distinct, timeInterval, common Timed container. #3224

Merged
merged 1 commit into from
Aug 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 54 additions & 7 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.reactivex.observables.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.*;
import io.reactivex.subscribers.SafeSubscriber;
import io.reactivex.subscribers.*;

public class Observable<T> implements Publisher<T> {
final Publisher<T> onSubscribe;
Expand Down Expand Up @@ -364,7 +364,7 @@ public final Observable<T> skipWhile(Predicate<? super T> predicate) {
return lift(new OperatorSkipWhile<>(predicate));
}

public final Observable<T> skipUntil(Publisher<? extends T> other) {
public final <U> Observable<T> skipUntil(Publisher<? extends U> other) {
Objects.requireNonNull(other);
return lift(new OperatorSkipUntil<>(other));
}
Expand Down Expand Up @@ -1111,20 +1111,20 @@ public final <U> Observable<U> ofType(Class<U> clazz) {
return filter(clazz::isInstance).cast(clazz);
}

public final Observable<Timestamped<T>> timestamp() {
public final Observable<Timed<T>> timestamp() {
return timestamp(TimeUnit.MILLISECONDS, Schedulers.trampoline());
}

public final Observable<Timestamped<T>> timestamp(Scheduler scheduler) {
public final Observable<Timed<T>> timestamp(Scheduler scheduler) {
return timestamp(TimeUnit.MILLISECONDS, scheduler);
}

public final Observable<Timestamped<T>> timestamp(TimeUnit unit) {
public final Observable<Timed<T>> timestamp(TimeUnit unit) {
return timestamp(unit, Schedulers.trampoline());
}

public final Observable<Timestamped<T>> timestamp(TimeUnit unit, Scheduler scheduler) {
return map(v -> new Timestamped<>(v, scheduler.now(unit), unit));
public final Observable<Timed<T>> timestamp(TimeUnit unit, Scheduler scheduler) {
return map(v -> new Timed<>(v, scheduler.now(unit), unit));
}

public final Observable<Try<Optional<T>>> materialize() {
Expand All @@ -1146,4 +1146,51 @@ public final Observable<T> dematerialize() {
public final Observable<T> limit(long n) {
return take(n);
}

public final Observable<T> distinct() {
return distinct(HashSet::new);
}

public final Observable<T> distinct(Supplier<? extends Collection<? super T>> collectionSupplier) {
return lift(OperatorDistinct.withCollection(collectionSupplier));
}

public final Observable<T> distinctUntilChanged() {
return lift(OperatorDistinct.untilChanged());
}

@Deprecated
public final Observable<Observable<T>> nest() {
return just(this);
}

public final Observable<T> serialize() {
return lift(s -> new SerializedSubscriber<>(s));
}

public final Observable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
// TODO consider inlining this behavior
return takeUntil(timer(time, unit, scheduler));
}

public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
// TODO consider inlining this behavior
return skipUntil(timer(time, unit, scheduler));
}

public final Observable<Timed<T>> timeInterval() {
return timeInterval(TimeUnit.MILLISECONDS, Schedulers.trampoline());
}

public final Observable<Timed<T>> timeInterval(Scheduler scheduler) {
return timeInterval(TimeUnit.MILLISECONDS, scheduler);
}

public final Observable<Timed<T>> timeInterval(TimeUnit unit) {
return timeInterval(unit, Schedulers.trampoline());
}

public final Observable<Timed<T>> timeInterval(TimeUnit unit, Scheduler scheduler) {
return lift(new OperatorTimeInterval<>(unit, scheduler));
}
}
153 changes: 153 additions & 0 deletions src/main/java/io/reactivex/internal/operators/OperatorDistinct.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators;

import java.util.*;
import java.util.function.*;

import org.reactivestreams.*;

import io.reactivex.Observable.Operator;
import io.reactivex.internal.subscribers.CancellingSubscriber;
import io.reactivex.internal.subscriptions.*;

public final class OperatorDistinct<T> implements Operator<T, T> {

final Supplier<? extends Predicate<? super T>> predicateSupplier;

public OperatorDistinct(Supplier<? extends Predicate<? super T>> predicateSupplier) {
this.predicateSupplier = predicateSupplier;
}

public static <T> OperatorDistinct<T> withCollection(Supplier<? extends Collection<? super T>> collectionSupplier) {
Supplier<? extends Predicate<? super T>> p = () -> {
Collection<? super T> coll = collectionSupplier.get();

return t -> {
if (t == null) {
coll.clear();
return true;
}
return coll.add(t);
};
};

return new OperatorDistinct<>(p);
}

static final OperatorDistinct<Object> UNTIL_CHANGED;
static {
Supplier<? extends Predicate<? super Object>> p = () -> {
Object[] last = { null };

return t -> {
if (t == null) {
last[0] = null;
return true;
}
Object o = last[0];
last[0] = t;
return !Objects.equals(o, t);
};
};
UNTIL_CHANGED = new OperatorDistinct<>(p);
}

@SuppressWarnings("unchecked")
public static <T> OperatorDistinct<T> untilChanged() {
return (OperatorDistinct<T>)UNTIL_CHANGED;
}

@Override
public Subscriber<? super T> apply(Subscriber<? super T> t) {
Predicate<? super T> coll;
try {
coll = predicateSupplier.get();
} catch (Throwable e) {
t.onSubscribe(EmptySubscription.INSTANCE);
t.onError(e);
return CancellingSubscriber.INSTANCE;
}

if (coll == null) {
t.onSubscribe(EmptySubscription.INSTANCE);
t.onError(new NullPointerException("predicateSupplier returned null"));
return CancellingSubscriber.INSTANCE;
}

return null;
}

static final class DistinctSubscriber<T> implements Subscriber<T> {
final Subscriber<? super T> actual;
final Predicate<? super T> predicate;

Subscription s;

public DistinctSubscriber(Subscriber<? super T> actual, Predicate<? super T> predicate) {
this.actual = actual;
this.predicate = predicate;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validateSubscription(this.s, s)) {
return;
}
this.s = s;
actual.onSubscribe(s);
}

@Override
public void onNext(T t) {
boolean b;
try {
b = predicate.test(t);
} catch (Throwable e) {
s.cancel();
actual.onError(e);
return;
}

if (b) {
actual.onNext(t);
} else {
s.request(1);
}
}

@Override
public void onError(Throwable t) {
try {
predicate.test(null); // special case: poison pill
} catch (Throwable e) {
t.addSuppressed(e);
actual.onError(t);
return;
}
actual.onError(t);
}

@Override
public void onComplete() {
try {
predicate.test(null); // special case: poison pill
} catch (Throwable e) {
actual.onError(e);
return;
}
actual.onComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.reactivex.internal.util.NotificationLite;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Timestamped;
import io.reactivex.schedulers.Timed;

public final class OperatorReplay<T> extends ConnectableObservable<T> {
/** The source observable. */
Expand Down Expand Up @@ -1078,12 +1078,12 @@ public SizeAndTimeBoundReplayBuffer(int limit, long maxAge, TimeUnit unit, Sched

@Override
Object enterTransform(Object value) {
return new Timestamped<>(value, scheduler.now(unit), unit);
return new Timed<>(value, scheduler.now(unit), unit);
}

@Override
Object leaveTransform(Object value) {
return ((Timestamped<?>)value).value();
return ((Timed<?>)value).value();
}

@Override
Expand All @@ -1102,8 +1102,8 @@ void truncate() {
prev = next;
next = next.get();
} else {
Timestamped<?> v = (Timestamped<?>)next.value;
if (v.timestamp() <= timeLimit) {
Timed<?> v = (Timed<?>)next.value;
if (v.time() <= timeLimit) {
e++;
size--;
prev = next;
Expand All @@ -1130,8 +1130,8 @@ void truncateFinal() {
int e = 0;
for (;;) {
if (next != null && size > 1) {
Timestamped<?> v = (Timestamped<?>)next.value;
if (v.timestamp() <= timeLimit) {
Timed<?> v = (Timed<?>)next.value;
if (v.time() <= timeLimit) {
e++;
size--;
prev = next;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators;

import java.util.concurrent.TimeUnit;

import org.reactivestreams.*;

import io.reactivex.Observable.Operator;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Timed;

public final class OperatorTimeInterval<T> implements Operator<Timed<T>, T> {
final Scheduler scheduler;
final TimeUnit unit;

public OperatorTimeInterval(TimeUnit unit, Scheduler scheduler) {
this.scheduler = scheduler;
this.unit = unit;
}

@Override
public Subscriber<? super T> apply(Subscriber<? super Timed<T>> t) {
return new TimeIntervalSubscriber<>(t, unit, scheduler);
}

static final class TimeIntervalSubscriber<T> implements Subscriber<T> {
final Subscriber<? super Timed<T>> actual;
final TimeUnit unit;
final Scheduler scheduler;

long lastTime;

public TimeIntervalSubscriber(Subscriber<? super Timed<T>> actual, TimeUnit unit, Scheduler scheduler) {
this.actual = actual;
this.scheduler = scheduler;
this.unit = unit;
}

@Override
public void onSubscribe(Subscription s) {
lastTime = scheduler.now(unit);
actual.onSubscribe(s);
}

@Override
public void onNext(T t) {
long now = scheduler.now(unit);
long last = lastTime;
lastTime = now;
long delta = now - last;
actual.onNext(new Timed<>(t, delta, unit));
}

@Override
public void onError(Throwable t) {
actual.onError(t);
}

@Override
public void onComplete() {
actual.onComplete();
}
}
}
Loading