Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for backpressure to observables. #27

Merged
merged 1 commit into from
Aug 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (C) 2015 Square, Inc.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copy/pasted from square/sqlbrite#35

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.f2prateek.rx.preferences;

import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;

/** An operator which keeps the last emitted instance when backpressure has been applied. */
final class BackpressureBufferLastOperator<T> implements Operator<T, T> {
static final Operator<Object, Object> instance = new BackpressureBufferLastOperator<>();

static <T> Operator<T, T> instance() {
//noinspection unchecked
return (Operator<T, T>) instance;
}

private BackpressureBufferLastOperator() {
}

@Override public Subscriber<? super T> call(final Subscriber<? super T> child) {
BufferLastSubscriber<T> parent = new BufferLastSubscriber<>(child);
child.add(parent);
child.setProducer(parent.producer);
return parent;
}

static final class BufferLastSubscriber<T> extends Subscriber<T> {
private static final Object NONE = new Object();

private final Subscriber<? super T> child;

private volatile Object last = NONE; // Guarded by 'this'.
private volatile long requested; // Guarded by 'this'. Starts at zero.

final Producer producer = new Producer() {
@Override public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("requested " + n + " < 0");
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, could this ever happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you accidentally overflow a long (requests usually are Long.MAX_VALUE so you could blindly add 1 and get negative) or are just off-by-one (or more) on some subtraction code for request. But it's unlikely to ever happen in practice. Most of this file is copied from an RxJava operator.

}
if (n == 0) {
return;
}

Object candidate;
synchronized (BufferLastSubscriber.this) {
candidate = last;

long currentRequested = requested;
if (Long.MAX_VALUE - n <= currentRequested) {
requested = Long.MAX_VALUE;
} else {
if (candidate != NONE) {
n--; // Decrement since we will be emitting a value.
}
requested = currentRequested + n;
}
}

// Only emit if the value is not the explicit NONE marker.
if (candidate != NONE) {
//noinspection unchecked
child.onNext((T) candidate);
}
}
};

public BufferLastSubscriber(Subscriber<? super T> child) {
this.child = child;
}

@Override public void onNext(T t) {
boolean emit = false;
synchronized (this) {
long currentRequested = requested;
if (currentRequested == Long.MAX_VALUE) {
// No need to decrement when the firehose is open.
emit = true;
} else if (currentRequested > 0) {
requested = currentRequested - 1;
emit = true;
} else {
last = t; // Not emitting, store for later.
}
}

if (emit) {
child.onNext(t);
}
}

@Override public void onStart() {
request(Long.MAX_VALUE);
}

@Override public void onCompleted() {
child.onCompleted();
}

@Override public void onError(Throwable e) {
child.onError(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public interface Adapter<T> {
}
})
.startWith("<init>") // Dummy value to trigger initial load.
.lift(BackpressureBufferLastOperator.<String>instance())
.map(new Func1<String, T>() {
@Override public T call(String ignored) {
return get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,31 @@ public class PreferenceTest {
o.assertValues("bar", "baz", "bar");
}

@Test public void asObservableHonorsBackpressure() {
Preference<String> preference = rxPreferences.getString("foo", "bar");

TestSubscriber<String> o = new TestSubscriber<>(2); // Request only 2 values.
preference.asObservable().subscribe(o);
o.assertValues("bar");

preferences.edit().putString("foo", "baz").commit();
o.assertValues("bar", "baz");

preferences.edit().putString("foo", "foo").commit();
o.assertValues("bar", "baz"); // No new item due to backpressure.

o.requestMore(1);
o.assertValues("bar", "baz", "foo");

for (int i = 0; i < 1000; i++) {
preferences.edit().putString("foo", "foo" + i).commit();
}
o.assertValues("bar", "baz", "foo"); // No new items due to backpressure.

o.requestMore(Long.MAX_VALUE); // Request everything...
o.assertValues("bar", "baz", "foo", "foo999"); // ...but only get latest.
}

@Test public void asAction() {
Preference<String> preference = rxPreferences.getString("foo");
Action1<? super String> action = preference.asAction();
Expand Down