Skip to content

Commit

Permalink
Merge pull request #3487 from srvaroa/1.x
Browse files Browse the repository at this point in the history
1.x: OnBackpressureBuffer: DROP_LATEST and DROP_OLDEST
  • Loading branch information
akarnokd committed Mar 17, 2016
2 parents a92a077 + e4598a5 commit df963fa
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 48 deletions.
90 changes: 90 additions & 0 deletions src/main/java/rx/BackpressureOverflow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import rx.annotations.Experimental;
import rx.exceptions.MissingBackpressureException;

/**
* Generic strategy and default implementations to deal with backpressure buffer overflows.
*/
@Experimental
public final class BackpressureOverflow {

public interface Strategy {

/**
* Whether the Backpressure manager should attempt to drop the oldest item, or simply
* drop the item currently causing backpressure.
*
* @return true to request drop of the oldest item, false to drop the newest.
* @throws MissingBackpressureException
*/
boolean mayAttemptDrop() throws MissingBackpressureException;
}

public static final BackpressureOverflow.Strategy ON_OVERFLOW_DEFAULT = Error.INSTANCE;
@SuppressWarnings("unused")
public static final BackpressureOverflow.Strategy ON_OVERFLOW_ERROR = Error.INSTANCE;
@SuppressWarnings("unused")
public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_OLDEST = DropOldest.INSTANCE;
@SuppressWarnings("unused")
public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_LATEST = DropLatest.INSTANCE;

/**
* Drop oldest items from the buffer making room for newer ones.
*/
static class DropOldest implements BackpressureOverflow.Strategy {
static final DropOldest INSTANCE = new DropOldest();

private DropOldest() {}

@Override
public boolean mayAttemptDrop() {
return true;
}
}

/**
* Drop most recent items, but not {@code onError} nor unsubscribe from source
* (as {code OperatorOnBackpressureDrop}).
*/
static class DropLatest implements BackpressureOverflow.Strategy {
static final DropLatest INSTANCE = new DropLatest();

private DropLatest() {}

@Override
public boolean mayAttemptDrop() {
return false;
}
}

/**
* {@code onError} a MissingBackpressureException and unsubscribe from source.
*/
static class Error implements BackpressureOverflow.Strategy {

static final Error INSTANCE = new Error();

private Error() {}

@Override
public boolean mayAttemptDrop() throws MissingBackpressureException {
throw new MissingBackpressureException("Overflowed buffer");
}
}
}
42 changes: 40 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6399,7 +6399,8 @@ public final Observable<T> onBackpressureBuffer() {
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the source Observable modified to buffer items up to the given capacity
* @param capacity number of slots available in the buffer.
* @return the source {@code Observable} modified to buffer items up to the given capacity.
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since 1.1.0
*/
Expand All @@ -6419,14 +6420,51 @@ public final Observable<T> onBackpressureBuffer(long capacity) {
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the source Observable modified to buffer items up to the given capacity
* @param capacity number of slots available in the buffer.
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed.
* @return the source {@code Observable} modified to buffer items up to the given capacity
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since 1.1.0
*/
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow) {
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer up to
* a given amount of items until they can be emitted. The resulting Observable will behave as determined
* by {@code overflowStrategy} if the buffer capacity is exceeded.
*
* <ul>
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_ERROR} (default) will {@code onError} dropping all undelivered items,
* unsubscribing from the source, and notifying the producer with {@code onOverflow}. </li>
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_DROP_LATEST} will drop any new items emitted by the producer while
* the buffer is full, without generating any {@code onError}. Each drop will however invoke {@code onOverflow}
* to signal the overflow to the producer.</li>j
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_DROP_OLDEST} will drop the oldest items in the buffer in order to make
* room for newly emitted ones. Overflow will not generate an{@code onError}, but each drop will invoke
* {@code onOverflow} to signal the overflow to the producer.</li>
* </ul>
*
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param capacity number of slots available in the buffer.
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed.
* @param overflowStrategy how should the {@code Observable} react to buffer overflows. Null is not allowed.
* @return the source {@code Observable} modified to buffer items up to the given capacity
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow, BackpressureOverflow.Strategy overflowStrategy) {
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow, overflowStrategy));
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
* rather than emit, those items that its observer is not prepared to observe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import rx.BackpressureOverflow;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
Expand All @@ -27,15 +28,18 @@
import rx.functions.Action0;
import rx.internal.util.BackpressureDrainManager;

import static rx.BackpressureOverflow.*;

public class OperatorOnBackpressureBuffer<T> implements Operator<T, T> {

private final Long capacity;
private final Action0 onOverflow;
private final BackpressureOverflow.Strategy overflowStrategy;

private static class Holder {
static final OperatorOnBackpressureBuffer<?> INSTANCE = new OperatorOnBackpressureBuffer<Object>();
}

@SuppressWarnings("unchecked")
public static <T> OperatorOnBackpressureBuffer<T> instance() {
return (OperatorOnBackpressureBuffer<T>) Holder.INSTANCE;
Expand All @@ -44,33 +48,65 @@ public static <T> OperatorOnBackpressureBuffer<T> instance() {
OperatorOnBackpressureBuffer() {
this.capacity = null;
this.onOverflow = null;
this.overflowStrategy = ON_OVERFLOW_DEFAULT;
}

/**
* Construct a new instance that will handle overflows with {@code ON_OVERFLOW_DEFAULT}, providing the
* following behavior config:
*
* @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
*/
public OperatorOnBackpressureBuffer(long capacity) {
this(capacity, null);
this(capacity, null, ON_OVERFLOW_DEFAULT);
}

/**
* Construct a new instance that will handle overflows with {@code ON_OVERFLOW_DEFAULT}, providing the
* following behavior config:
*
* @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
* @param onOverflow the {@code Action0} to execute when the buffer overflows, may be null.
*/
public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) {
this(capacity, onOverflow, ON_OVERFLOW_DEFAULT);
}

/**
* Construct a new instance feeding the following behavior config:
*
* @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
* @param onOverflow the {@code Action0} to execute when the buffer overflows, may be null.
* @param overflowStrategy the {@code BackpressureOverflow.Strategy} to handle overflows, it must not be null.
*/
public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow,
BackpressureOverflow.Strategy overflowStrategy) {
if (capacity <= 0) {
throw new IllegalArgumentException("Buffer capacity must be > 0");
}
if (overflowStrategy == null) {
throw new NullPointerException("The BackpressureOverflow strategy must not be null");
}
this.capacity = capacity;
this.onOverflow = onOverflow;
this.overflowStrategy = overflowStrategy;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {

// don't pass through subscriber as we are async and doing queue draining
// a parent being unsubscribed should not affect the children
BufferSubscriber<T> parent = new BufferSubscriber<T>(child, capacity, onOverflow);
BufferSubscriber<T> parent = new BufferSubscriber<T>(child, capacity, onOverflow,
overflowStrategy);

// if child unsubscribes it should unsubscribe the parent, but not the other way around
child.add(parent);
child.setProducer(parent.manager());

return parent;
}

private static final class BufferSubscriber<T> extends Subscriber<T> implements BackpressureDrainManager.BackpressureQueueCallback {
// TODO get a different queue implementation
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
Expand All @@ -81,14 +117,18 @@ private static final class BufferSubscriber<T> extends Subscriber<T> implements
private final BackpressureDrainManager manager;
private final NotificationLite<T> on = NotificationLite.instance();
private final Action0 onOverflow;
private final BackpressureOverflow.Strategy overflowStrategy;

public BufferSubscriber(final Subscriber<? super T> child, Long capacity, Action0 onOverflow) {
public BufferSubscriber(final Subscriber<? super T> child, Long capacity, Action0 onOverflow,
BackpressureOverflow.Strategy overflowStrategy) {
this.child = child;
this.baseCapacity = capacity;
this.capacity = capacity != null ? new AtomicLong(capacity) : null;
this.onOverflow = onOverflow;
this.manager = new BackpressureDrainManager(this);
this.overflowStrategy = overflowStrategy;
}

@Override
public void onStart() {
request(Long.MAX_VALUE);
Expand Down Expand Up @@ -141,7 +181,7 @@ public Object poll() {
}
return value;
}

private boolean assertCapacity() {
if (capacity == null) {
return true;
Expand All @@ -151,24 +191,30 @@ private boolean assertCapacity() {
do {
currCapacity = capacity.get();
if (currCapacity <= 0) {
if (saturated.compareAndSet(false, true)) {
unsubscribe();
child.onError(new MissingBackpressureException(
"Overflowed buffer of "
+ baseCapacity));
if (onOverflow != null) {
try {
onOverflow.call();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
manager.terminateAndDrain(e);
// this line not strictly necessary but nice for clarity
// and in case of future changes to code after this catch block
return false;
}
boolean hasCapacity = false;
try {
// ok if we're allowed to drop, and there is indeed an item to discard
hasCapacity = overflowStrategy.mayAttemptDrop() && poll() != null;
} catch (MissingBackpressureException e) {
if (saturated.compareAndSet(false, true)) {
unsubscribe();
child.onError(e);
}
}
return false;
if (onOverflow != null) {
try {
onOverflow.call();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
manager.terminateAndDrain(e);
// this line not strictly necessary but nice for clarity
// and in case of future changes to code after this catch block
return false;
}
}
if (!hasCapacity) {
return false;
}
}
// ensure no other thread stole our slot, or retry
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));
Expand Down
Loading

0 comments on commit df963fa

Please sign in to comment.