Skip to content

Commit

Permalink
break loop in OnSubscribeSearch when completed
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Feb 23, 2016
1 parent e678c0f commit 3f5a55d
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ private void requestSome(long n) {
if (st.isEmpty()) {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
break;
} else
break;
} else if (requested.addAndGet(-r) == 0)
Expand Down
82 changes: 82 additions & 0 deletions src/test/java/com/github/davidmoten/rtree/RTreeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.GroupedObservable;

public class RTreeTest {

Expand Down Expand Up @@ -901,6 +903,82 @@ public void testIntersectsPointLine() {
assertTrue(Intersects.lineIntersectsPoint.call(line(1, 1, 2, 2), point(1, 1)));
}

@Test(timeout = 3000)
public void testGroupByIssue40() {
RTree<Integer, Geometry> tree = RTree.star().create();

tree = tree.add(1, Geometries.point(13.0, 52.0));
tree = tree.add(2, Geometries.point(13.0, 52.0));
tree = tree.add(3, Geometries.point(13.0, 52.0));
tree = tree.add(4, Geometries.point(13.0, 52.0));
tree = tree.add(5, Geometries.point(13.0, 52.0));
tree = tree.add(6, Geometries.point(13.0, 52.0));

Rectangle rectangle = Geometries.rectangle(12.9, 51.9, 13.1, 52.1);
assertEquals(Integer.valueOf(6), tree.search(rectangle).count().toBlocking().single());
assertEquals(Integer.valueOf(2), tree.search(rectangle).doOnRequest(new Action1<Long>() {
@Override
public void call(Long n) {
System.out.println(n);
}
}).groupBy(new Func1<Entry<Integer, Geometry>, Boolean>() {
@Override
public Boolean call(Entry<Integer, Geometry> entry) {
System.out.println(entry);
return entry.value() % 2 == 0;
}
}).flatMap(
new Func1<GroupedObservable<Boolean, Entry<Integer, Geometry>>, Observable<Integer>>() {
@Override
public Observable<Integer> call(
GroupedObservable<Boolean, Entry<Integer, Geometry>> group) {
return group.count();
}
}).count().toBlocking().single());
}

@Test(timeout = 3000)
public void testBackpressureSearchWhenLotsRequestedButNotMaxValue() {
RTree<Integer, Geometry> tree = RTree.star().create();

tree = tree.add(1, Geometries.point(13.0, 52.0));
tree = tree.add(2, Geometries.point(13.0, 52.0));
tree = tree.add(3, Geometries.point(13.0, 52.0));
tree = tree.add(4, Geometries.point(13.0, 52.0));
tree = tree.add(5, Geometries.point(13.0, 52.0));
tree = tree.add(6, Geometries.point(13.0, 52.0));

Rectangle rectangle = Geometries.rectangle(12.9, 51.9, 13.1, 52.1);

tree.search(rectangle).doOnRequest(new Action1<Long>() {
@Override
public void call(Long n) {
System.out.println(n);
}
}).subscribe(new Subscriber<Object>() {

@Override
public void onStart() {
request(Long.MAX_VALUE - 100);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable arg0) {

}

@Override
public void onNext(Object arg0) {
request(1);
request(1);
}
});
}

private static Func2<Point, Circle, Double> distanceCircleToPoint = new Func2<Point, Circle, Double>() {
@Override
public Double call(Point point, Circle circle) {
Expand Down Expand Up @@ -947,4 +1025,8 @@ private static Rectangle r(double n, double m) {
static Rectangle random() {
return r(Math.random() * 1000, Math.random() * 1000);
}

public static void main(String[] args) {
System.out.println(Long.MAX_VALUE);
}
}

0 comments on commit 3f5a55d

Please sign in to comment.