Skip to content

Commit

Permalink
Make RemoteIterator#hasNext not throw (this commit will be squashed l…
Browse files Browse the repository at this point in the history
…ater)

 - Updating RemoteIterator interface to avoid throwing from hasNext()
 - Updating ChildrenIterator for new interface, and simplifying implementation using deque
  • Loading branch information
Marco Primi committed Jan 20, 2016
1 parent 7e30292 commit ae9977a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 57 deletions.
54 changes: 20 additions & 34 deletions src/java/main/org/apache/zookeeper/ChildrenBatchIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import org.apache.zookeeper.data.PathWithStat;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
Expand All @@ -14,49 +14,27 @@ class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {
private final String path;
private final Watcher watcher;
private final int batchSize;
private Iterator<PathWithStat> currentBatchIterator;
private List<PathWithStat> currentBatch;
private long highestCZkId = -1;
private boolean noMoreChildren = false;
private final LinkedList<PathWithStat> childrenQueue;


ChildrenBatchIterator(ZooKeeper zooKeeper, String path, Watcher watcher, int batchSize, int minZkid)
ChildrenBatchIterator(ZooKeeper zooKeeper, String path, Watcher watcher, int batchSize, int minZxid)
throws KeeperException, InterruptedException {
this.zooKeeper = zooKeeper;
this.path = path;
this.watcher = watcher;
this.batchSize = batchSize;
this.highestCZkId = minZkid;

this.currentBatch = zooKeeper.getChildren(path, watcher, batchSize, highestCZkId);
this.currentBatchIterator = currentBatch.iterator();
this.childrenQueue = new LinkedList<>();

List<PathWithStat> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, minZxid);
childrenQueue.addAll(firstChildrenBatch);
}

@Override
public boolean hasNext() throws InterruptedException, KeeperException {

// More element in current batch
if (currentBatchIterator.hasNext()) {
return true;
}

// Server already said no more elements (and possibly set a watch)
if (noMoreChildren) {
return false;
}

// No more element in current batch, but server may have more
this.currentBatch = zooKeeper.getChildren(path, watcher, batchSize, highestCZkId);
this.currentBatchIterator = currentBatch.iterator();
public boolean hasNext() {

// Definitely reached the end of pagination
if(currentBatch.isEmpty()) {
noMoreChildren = true;
return false;
}

// We fetched a new non-empty batch
return true;
// next() never lets childrenQueue empty unless we iterated over all children
return ! childrenQueue.isEmpty();
}

@Override
Expand All @@ -66,9 +44,17 @@ public PathWithStat next() throws KeeperException, InterruptedException {
throw new RuntimeException("No more element");
}

PathWithStat returnChildren = currentBatchIterator.next();
// If we're down to the last element, backfill before returning it
if(childrenQueue.size() == 1) {

long highestCZxId = childrenQueue.get(0).getStat().getCzxid();

List<PathWithStat> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, highestCZxId);
childrenQueue.addAll(childrenBatch);

}

highestCZkId = returnChildren.getStat().getCzxid();
PathWithStat returnChildren = childrenQueue.pop();

return returnChildren;
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/main/org/apache/zookeeper/RemoteIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
public interface RemoteIterator<E> {

boolean hasNext() throws InterruptedException, KeeperException;
boolean hasNext();

E next() throws InterruptedException, KeeperException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.PathWithStat;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.RemoteIterator;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -133,13 +132,13 @@ public void testPaginationIterator() throws Exception {
}
}

@Test(timeout = 30000)
@Test(timeout = 60000)
public void testPaginationWithServerDown() throws Exception {

final String testId = UUID.randomUUID().toString();
final String basePath = "/testPagination-" + testId;

Map<String, Stat> createdChildrenMetadata = createChildren(basePath, random.nextInt(25)+1, 0);
Map<String, Stat> createdChildrenMetadata = createChildren(basePath, random.nextInt(15)+10, 0);

Map<String, Stat> readChildrenMetadata = new HashMap<String, Stat>();

Expand All @@ -149,13 +148,14 @@ public void testPaginationWithServerDown() throws Exception {

boolean serverDown = false;

while(true) {
while(childrenIterator.hasNext()) {

// Randomly change the up/down state of the server
if(random.nextBoolean()) {
if (serverDown) {
LOG.info("Bringing server UP");
startServer();
waitForServerUp(hostPort, 5000);
serverDown = false;
} else {
LOG.info("Taking server DOWN");
Expand All @@ -164,33 +164,17 @@ public void testPaginationWithServerDown() throws Exception {
}
}

try {
if(!childrenIterator.hasNext()) {
// Reached end of children
break;
}
} catch (InterruptedException|KeeperException e) {
// Just try again until the server is up
LOG.info("Exception in #hasNext()");
continue;
}


PathWithStat child = null;

boolean exception = false;
try {
child = childrenIterator.next();
} catch (InterruptedException|KeeperException e) {
LOG.info("Exception in #next()");
LOG.info("Exception in #next(): " + e.getMessage());
exception = true;
}

if(exception) {
// Only expect exception if server is not running
Assert.assertTrue(serverDown);

} else {
if (! exception) {
// next() returned (either more elements in current batch or server is up)
Assert.assertNotNull(child);

Expand Down

0 comments on commit ae9977a

Please sign in to comment.