Skip to content

Commit

Permalink
fix #40 and prevent request overflow by using BackpressureUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Feb 24, 2016
1 parent 3f5a55d commit c74e1b3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 23 deletions.
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-extras</artifactId>
<version>0.7</version>
<scope>test</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
import com.github.davidmoten.rtree.geometry.Geometry;
import com.github.davidmoten.rx.util.BackpressureUtils;
import com.github.davidmoten.util.ImmutableStack;

import rx.Observable.OnSubscribe;
Expand Down Expand Up @@ -72,17 +73,16 @@ private void requestSome(long n) {

// rxjava used AtomicLongFieldUpdater instead of AtomicLong
// but benchmarks showed no benefit here so reverted to AtomicLong
long previousCount = requested.getAndAdd(n);
long previousCount = BackpressureUtils.getAndAddRequest(requested, n);
if (previousCount == 0) {
// don't touch stack every time during the loop because
// is a volatile and every write forces a thread memory
// cache flush
ImmutableStack<NodePosition<T, S>> st = stack;
while (true) {
// minimize atomic reads by assigning to a variable here
long r = requested.get();
long numToEmit = r;

st = Backpressure.search(condition, subscriber, st, numToEmit);
st = Backpressure.search(condition, subscriber, st, r);
if (st.isEmpty()) {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
Expand Down
39 changes: 21 additions & 18 deletions src/test/java/com/github/davidmoten/rtree/RTreeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

Expand Down Expand Up @@ -903,7 +904,7 @@ public void testIntersectsPointLine() {
assertTrue(Intersects.lineIntersectsPoint.call(line(1, 1, 2, 2), point(1, 1)));
}

@Test(timeout = 3000)
@Test(timeout = 30000000)
public void testGroupByIssue40() {
RTree<Integer, Geometry> tree = RTree.star().create();

Expand All @@ -915,18 +916,22 @@ public void testGroupByIssue40() {
tree = tree.add(6, Geometries.point(13.0, 52.0));

Rectangle rectangle = Geometries.rectangle(12.9, 51.9, 13.1, 52.1);
assertEquals(Integer.valueOf(6), tree.search(rectangle).count().toBlocking().single());
assertEquals(Integer.valueOf(2), tree.search(rectangle).doOnRequest(new Action1<Long>() {
@Override
public void call(Long n) {
System.out.println(n);
System.out.println("requestFromGroupBy=" + n);
}
}).groupBy(new Func1<Entry<Integer, Geometry>, Boolean>() {
@Override
public Boolean call(Entry<Integer, Geometry> entry) {
System.out.println(entry);
return entry.value() % 2 == 0;
}
}).doOnRequest(new Action1<Long>() {
@Override
public void call(Long n) {
System.out.println("requestFromFlatMap=" + n);
}
}).flatMap(
new Func1<GroupedObservable<Boolean, Entry<Integer, Geometry>>, Observable<Integer>>() {
@Override
Expand All @@ -937,8 +942,8 @@ public Observable<Integer> call(
}).count().toBlocking().single());
}

@Test(timeout = 3000)
public void testBackpressureSearchWhenLotsRequestedButNotMaxValue() {
@Test
public void testBackpressureForOverflow() {
RTree<Integer, Geometry> tree = RTree.star().create();

tree = tree.add(1, Geometries.point(13.0, 52.0));
Expand All @@ -947,36 +952,34 @@ public void testBackpressureSearchWhenLotsRequestedButNotMaxValue() {
tree = tree.add(4, Geometries.point(13.0, 52.0));
tree = tree.add(5, Geometries.point(13.0, 52.0));
tree = tree.add(6, Geometries.point(13.0, 52.0));

final AtomicInteger count = new AtomicInteger();
Rectangle rectangle = Geometries.rectangle(12.9, 51.9, 13.1, 52.1);

tree.search(rectangle).doOnRequest(new Action1<Long>() {
@Override
public void call(Long n) {
System.out.println(n);
}
}).subscribe(new Subscriber<Object>() {
tree.search(rectangle).subscribe(new Subscriber<Object>() {

@Override
public void onStart() {
request(Long.MAX_VALUE - 100);
request(4);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable arg0) {
public void onError(Throwable e) {

}

@Override
public void onNext(Object arg0) {
request(1);
request(1);
public void onNext(Object t) {
request(Long.MAX_VALUE);
count.incrementAndGet();
}
});
assertEquals(6, count.get());
assertEquals(6, (int) tree.search(rectangle).count().toBlocking().single());

}

private static Func2<Point, Circle, Double> distanceCircleToPoint = new Func2<Point, Circle, Double>() {
Expand Down

0 comments on commit c74e1b3

Please sign in to comment.