Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TakeLastTimed with backpressure support #1562

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 1 addition & 105 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;

/**
Expand All @@ -43,7 +41,7 @@ public OperatorTakeLast(int count) {
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final Deque<Object> deque = new ArrayDeque<Object>();
final NotificationLite<T> notification = NotificationLite.instance();
final QueueProducer<T> producer = new QueueProducer<T>(notification, deque, subscriber);
final TakeLastQueueProducer<T> producer = new TakeLastQueueProducer<T>(notification, deque, subscriber);
subscriber.setProducer(producer);

return new Subscriber<T>(subscriber) {
Expand Down Expand Up @@ -81,106 +79,4 @@ public void onNext(T value) {
}
};
}

private static final class QueueProducer<T> implements Producer {

private final NotificationLite<T> notification;
private final Deque<Object> deque;
private final Subscriber<? super T> subscriber;
private volatile boolean emittingStarted = false;

public QueueProducer(NotificationLite<T> n, Deque<Object> q, Subscriber<? super T> subscriber) {
this.notification = n;
this.deque = q;
this.subscriber = subscriber;
}

private volatile long requested = 0;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<QueueProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(QueueProducer.class, "requested");

void startEmitting() {
if (!emittingStarted) {
emittingStarted = true;
emit(0); // start emitting
}
}

@Override
public void request(long n) {
if (requested == Long.MAX_VALUE) {
return;
}
long _c;
if (n == Long.MAX_VALUE) {
_c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE);
} else {
_c = REQUESTED_UPDATER.getAndAdd(this, n);
}
if (!emittingStarted) {
// we haven't started yet, so record what was requested and return
return;
}
emit(_c);
}

void emit(long previousRequested) {
if (requested == Long.MAX_VALUE) {
// fast-path without backpressure
if (previousRequested == 0) {
try {
for (Object value : deque) {
notification.accept(subscriber, value);
}
} catch (Throwable e) {
subscriber.onError(e);
} finally {
deque.clear();
}
} else {
// backpressure path will handle Long.MAX_VALUE and emit the rest events.
}
} else {
// backpressure is requested
if (previousRequested == 0) {
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `requested` value
* during the loop itself. If it is touched during the loop the performance is impacted significantly.
*/
long numToEmit = requested;
int emitted = 0;
Object o;
while (--numToEmit >= 0 && (o = deque.poll()) != null) {
if (subscriber.isUnsubscribed()) {
return;
}
if (notification.accept(subscriber, o)) {
// terminal event
return;
} else {
emitted++;
}
}
for (;;) {
long oldRequested = requested;
long newRequested = oldRequested - emitted;
if (oldRequested == Long.MAX_VALUE) {
// became unbounded during the loop
// continue the outer loop to emit the rest events.
break;
}
if (REQUESTED_UPDATER.compareAndSet(this, oldRequested, newRequested)) {
if (newRequested == 0) {
// we're done emitting the number requested so return
return;
}
break;
}
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Timestamped;

import java.util.ArrayDeque;
import java.util.Deque;
Expand Down Expand Up @@ -52,19 +51,24 @@ public OperatorTakeLastTimed(int count, long time, TimeUnit unit, Scheduler sche

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final Deque<Object> buffer = new ArrayDeque<Object>();
final Deque<Long> timestampBuffer = new ArrayDeque<Long>();
final NotificationLite<T> notification = NotificationLite.instance();
final TakeLastQueueProducer<T> producer = new TakeLastQueueProducer<T>(notification, buffer, subscriber);
subscriber.setProducer(producer);
return new Subscriber<T>(subscriber) {

private final Deque<Timestamped<T>> buffer = new ArrayDeque<Timestamped<T>>();

protected void runEvictionPolicy(long now) {
// trim size
while (count >= 0 && buffer.size() > count) {
timestampBuffer.pollFirst();
buffer.pollFirst();
}
// remove old entries
while (!buffer.isEmpty()) {
Timestamped<T> v = buffer.peekFirst();
if (v.getTimestampMillis() < now - ageMillis) {
long v = timestampBuffer.peekFirst();
if (v < now - ageMillis) {
timestampBuffer.pollFirst();
buffer.pollFirst();
} else {
break;
Expand All @@ -82,33 +86,25 @@ public void onStart() {
@Override
public void onNext(T args) {
long t = scheduler.now();
buffer.add(new Timestamped<T>(t, args));
timestampBuffer.add(t);
buffer.add(notification.next(args));
runEvictionPolicy(t);
}

@Override
public void onError(Throwable e) {
timestampBuffer.clear();
buffer.clear();
subscriber.onError(e);
}

@Override
public void onCompleted() {
runEvictionPolicy(scheduler.now());
try {
// TODO this can be made to support backpressure
for (Timestamped<T> v : buffer) {
subscriber.onNext(v.getValue());

}
} catch (Throwable e) {
onError(e);
return;
}
buffer.clear();
subscriber.onCompleted();
timestampBuffer.clear();
buffer.offer(notification.completed());
producer.startEmitting();
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package rx.internal.operators;


import rx.Producer;
import rx.Subscriber;

import java.util.Deque;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class TakeLastQueueProducer<T> implements Producer {

private final NotificationLite<T> notification;
private final Deque<Object> deque;
private final Subscriber<? super T> subscriber;
private volatile boolean emittingStarted = false;

public TakeLastQueueProducer(NotificationLite<T> n, Deque<Object> q, Subscriber<? super T> subscriber) {
this.notification = n;
this.deque = q;
this.subscriber = subscriber;
}

private volatile long requested = 0;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TakeLastQueueProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(TakeLastQueueProducer.class, "requested");

void startEmitting() {
if (!emittingStarted) {
emittingStarted = true;
emit(0); // start emitting
}
}

@Override
public void request(long n) {
if (requested == Long.MAX_VALUE) {
return;
}
long _c;
if (n == Long.MAX_VALUE) {
_c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE);
} else {
_c = REQUESTED_UPDATER.getAndAdd(this, n);
}
if (!emittingStarted) {
// we haven't started yet, so record what was requested and return
return;
}
emit(_c);
}

void emit(long previousRequested) {
if (requested == Long.MAX_VALUE) {
// fast-path without backpressure
if (previousRequested == 0) {
try {
for (Object value : deque) {
notification.accept(subscriber, value);
}
} catch (Throwable e) {
subscriber.onError(e);
} finally {
deque.clear();
}
} else {
// backpressure path will handle Long.MAX_VALUE and emit the rest events.
}
} else {
// backpressure is requested
if (previousRequested == 0) {
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `requested` value
* during the loop itself. If it is touched during the loop the performance is impacted significantly.
*/
long numToEmit = requested;
int emitted = 0;
Object o;
while (--numToEmit >= 0 && (o = deque.poll()) != null) {
if (subscriber.isUnsubscribed()) {
return;
}
if (notification.accept(subscriber, o)) {
// terminal event
return;
} else {
emitted++;
}
}
for (; ; ) {
long oldRequested = requested;
long newRequested = oldRequested - emitted;
if (oldRequested == Long.MAX_VALUE) {
// became unbounded during the loop
// continue the outer loop to emit the rest events.
break;
}
if (REQUESTED_UPDATER.compareAndSet(this, oldRequested, newRequested)) {
if (newRequested == 0) {
// we're done emitting the number requested so return
return;
}
break;
}
}
}
}
}
}
}