diff --git a/src/java/main/org/apache/zookeeper/ChildrenBatchIterator.java b/src/java/main/org/apache/zookeeper/ChildrenBatchIterator.java index ed53b284b88..7664b3fb74b 100644 --- a/src/java/main/org/apache/zookeeper/ChildrenBatchIterator.java +++ b/src/java/main/org/apache/zookeeper/ChildrenBatchIterator.java @@ -2,7 +2,7 @@ import org.apache.zookeeper.data.PathWithStat; -import java.util.Iterator; +import java.util.LinkedList; import java.util.List; /** @@ -14,49 +14,27 @@ class ChildrenBatchIterator implements RemoteIterator { private final String path; private final Watcher watcher; private final int batchSize; - private Iterator currentBatchIterator; - private List currentBatch; - private long highestCZkId = -1; - private boolean noMoreChildren = false; + private final LinkedList childrenQueue; - ChildrenBatchIterator(ZooKeeper zooKeeper, String path, Watcher watcher, int batchSize, int minZkid) + ChildrenBatchIterator(ZooKeeper zooKeeper, String path, Watcher watcher, int batchSize, int minZxid) throws KeeperException, InterruptedException { this.zooKeeper = zooKeeper; this.path = path; this.watcher = watcher; this.batchSize = batchSize; - this.highestCZkId = minZkid; - this.currentBatch = zooKeeper.getChildren(path, watcher, batchSize, highestCZkId); - this.currentBatchIterator = currentBatch.iterator(); + this.childrenQueue = new LinkedList<>(); + + List firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, minZxid); + childrenQueue.addAll(firstChildrenBatch); } @Override - public boolean hasNext() throws InterruptedException, KeeperException { - - // More element in current batch - if (currentBatchIterator.hasNext()) { - return true; - } - - // Server already said no more elements (and possibly set a watch) - if (noMoreChildren) { - return false; - } - - // No more element in current batch, but server may have more - this.currentBatch = zooKeeper.getChildren(path, watcher, batchSize, highestCZkId); - this.currentBatchIterator = currentBatch.iterator(); + public boolean hasNext() { - // Definitely reached the end of pagination - if(currentBatch.isEmpty()) { - noMoreChildren = true; - return false; - } - - // We fetched a new non-empty batch - return true; + // next() never lets childrenQueue empty unless we iterated over all children + return ! childrenQueue.isEmpty(); } @Override @@ -66,9 +44,17 @@ public PathWithStat next() throws KeeperException, InterruptedException { throw new RuntimeException("No more element"); } - PathWithStat returnChildren = currentBatchIterator.next(); + // If we're down to the last element, backfill before returning it + if(childrenQueue.size() == 1) { + + long highestCZxId = childrenQueue.get(0).getStat().getCzxid(); + + List childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, highestCZxId); + childrenQueue.addAll(childrenBatch); + + } - highestCZkId = returnChildren.getStat().getCzxid(); + PathWithStat returnChildren = childrenQueue.pop(); return returnChildren; } diff --git a/src/java/main/org/apache/zookeeper/RemoteIterator.java b/src/java/main/org/apache/zookeeper/RemoteIterator.java index c64a959cad6..8ab7bd90080 100644 --- a/src/java/main/org/apache/zookeeper/RemoteIterator.java +++ b/src/java/main/org/apache/zookeeper/RemoteIterator.java @@ -5,7 +5,7 @@ */ public interface RemoteIterator { - boolean hasNext() throws InterruptedException, KeeperException; + boolean hasNext(); E next() throws InterruptedException, KeeperException; } diff --git a/src/java/test/org/apache/zookeeper/test/GetChildrenPaginatedTest.java b/src/java/test/org/apache/zookeeper/test/GetChildrenPaginatedTest.java index 2bfcfeaf18d..f5e168669f6 100644 --- a/src/java/test/org/apache/zookeeper/test/GetChildrenPaginatedTest.java +++ b/src/java/test/org/apache/zookeeper/test/GetChildrenPaginatedTest.java @@ -22,7 +22,6 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.RemoteIterator; import org.junit.Assert; import org.junit.Test; @@ -133,13 +132,13 @@ public void testPaginationIterator() throws Exception { } } - @Test(timeout = 30000) + @Test(timeout = 60000) public void testPaginationWithServerDown() throws Exception { final String testId = UUID.randomUUID().toString(); final String basePath = "/testPagination-" + testId; - Map createdChildrenMetadata = createChildren(basePath, random.nextInt(25)+1, 0); + Map createdChildrenMetadata = createChildren(basePath, random.nextInt(15)+10, 0); Map readChildrenMetadata = new HashMap(); @@ -149,13 +148,14 @@ public void testPaginationWithServerDown() throws Exception { boolean serverDown = false; - while(true) { + while(childrenIterator.hasNext()) { // Randomly change the up/down state of the server if(random.nextBoolean()) { if (serverDown) { LOG.info("Bringing server UP"); startServer(); + waitForServerUp(hostPort, 5000); serverDown = false; } else { LOG.info("Taking server DOWN"); @@ -164,33 +164,17 @@ public void testPaginationWithServerDown() throws Exception { } } - try { - if(!childrenIterator.hasNext()) { - // Reached end of children - break; - } - } catch (InterruptedException|KeeperException e) { - // Just try again until the server is up - LOG.info("Exception in #hasNext()"); - continue; - } - - PathWithStat child = null; boolean exception = false; try { child = childrenIterator.next(); } catch (InterruptedException|KeeperException e) { - LOG.info("Exception in #next()"); + LOG.info("Exception in #next(): " + e.getMessage()); exception = true; } - if(exception) { - // Only expect exception if server is not running - Assert.assertTrue(serverDown); - - } else { + if (! exception) { // next() returned (either more elements in current batch or server is up) Assert.assertNotNull(child);