diff --git a/src/main/java/com/github/davidmoten/rtree/OnSubscribeSearch.java b/src/main/java/com/github/davidmoten/rtree/OnSubscribeSearch.java index 950e45d7..1f23865d 100644 --- a/src/main/java/com/github/davidmoten/rtree/OnSubscribeSearch.java +++ b/src/main/java/com/github/davidmoten/rtree/OnSubscribeSearch.java @@ -86,6 +86,7 @@ private void requestSome(long n) { if (st.isEmpty()) { if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); + break; } else break; } else if (requested.addAndGet(-r) == 0) diff --git a/src/test/java/com/github/davidmoten/rtree/RTreeTest.java b/src/test/java/com/github/davidmoten/rtree/RTreeTest.java index 78ee783e..438b8efa 100644 --- a/src/test/java/com/github/davidmoten/rtree/RTreeTest.java +++ b/src/test/java/com/github/davidmoten/rtree/RTreeTest.java @@ -41,8 +41,10 @@ import rx.Observable; import rx.Subscriber; +import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Func2; +import rx.observables.GroupedObservable; public class RTreeTest { @@ -901,6 +903,82 @@ public void testIntersectsPointLine() { assertTrue(Intersects.lineIntersectsPoint.call(line(1, 1, 2, 2), point(1, 1))); } + @Test(timeout = 3000) + public void testGroupByIssue40() { + RTree tree = RTree.star().create(); + + tree = tree.add(1, Geometries.point(13.0, 52.0)); + tree = tree.add(2, Geometries.point(13.0, 52.0)); + tree = tree.add(3, Geometries.point(13.0, 52.0)); + tree = tree.add(4, Geometries.point(13.0, 52.0)); + tree = tree.add(5, Geometries.point(13.0, 52.0)); + tree = tree.add(6, Geometries.point(13.0, 52.0)); + + Rectangle rectangle = Geometries.rectangle(12.9, 51.9, 13.1, 52.1); + assertEquals(Integer.valueOf(6), tree.search(rectangle).count().toBlocking().single()); + assertEquals(Integer.valueOf(2), tree.search(rectangle).doOnRequest(new Action1() { + @Override + public void call(Long n) { + System.out.println(n); + } + }).groupBy(new Func1, Boolean>() { + @Override + public Boolean call(Entry entry) { + System.out.println(entry); + return entry.value() % 2 == 0; + } + }).flatMap( + new Func1>, Observable>() { + @Override + public Observable call( + GroupedObservable> group) { + return group.count(); + } + }).count().toBlocking().single()); + } + + @Test(timeout = 3000) + public void testBackpressureSearchWhenLotsRequestedButNotMaxValue() { + RTree tree = RTree.star().create(); + + tree = tree.add(1, Geometries.point(13.0, 52.0)); + tree = tree.add(2, Geometries.point(13.0, 52.0)); + tree = tree.add(3, Geometries.point(13.0, 52.0)); + tree = tree.add(4, Geometries.point(13.0, 52.0)); + tree = tree.add(5, Geometries.point(13.0, 52.0)); + tree = tree.add(6, Geometries.point(13.0, 52.0)); + + Rectangle rectangle = Geometries.rectangle(12.9, 51.9, 13.1, 52.1); + + tree.search(rectangle).doOnRequest(new Action1() { + @Override + public void call(Long n) { + System.out.println(n); + } + }).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(Long.MAX_VALUE - 100); + } + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable arg0) { + + } + + @Override + public void onNext(Object arg0) { + request(1); + request(1); + } + }); + } + private static Func2 distanceCircleToPoint = new Func2() { @Override public Double call(Point point, Circle circle) { @@ -947,4 +1025,8 @@ private static Rectangle r(double n, double m) { static Rectangle random() { return r(Math.random() * 1000, Math.random() * 1000); } + + public static void main(String[] args) { + System.out.println(Long.MAX_VALUE); + } }