Skip to content

Commit

Permalink
Address review comment - round 3 (this commit will be squashed later)
Browse files Browse the repository at this point in the history
 - Adding to CHANGES file
 - s/zkid/czxid
 - Better javadoc and more comments
 - Minor refactoring and renaming
 - Add test for empty children list and iteration beyond last element
  • Loading branch information
Marco Primi committed Apr 11, 2017
1 parent b010a79 commit b76fb4e
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 86 deletions.
11 changes: 6 additions & 5 deletions src/java/main/org/apache/zookeeper/ChildrenBatchIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;

/**
* Iterator over children nodes of a given path.
Expand Down Expand Up @@ -38,14 +39,14 @@ public boolean hasNext() {
}

@Override
public PathWithStat next() throws KeeperException, InterruptedException {
public PathWithStat next() throws KeeperException, InterruptedException, NoSuchElementException {

if(!hasNext()) {
throw new RuntimeException("No more element");
if (!hasNext()) {
throw new NoSuchElementException("No more children");
}

// If we're down to the last element, backfill before returning it
if(childrenQueue.size() == 1) {
// If we're down tThe result will be filteredo the last element, backfill before returning it
if (childrenQueue.size() == 1) {

long highestCZxId = childrenQueue.get(0).getStat().getCzxid();

Expand Down
17 changes: 15 additions & 2 deletions src/java/main/org/apache/zookeeper/RemoteIterator.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
package org.apache.zookeeper;

import java.util.NoSuchElementException;

/**
* Iterator that involves server communication, and as such can throw
* An iterator over a collection whose elements need to be fetched remotely.
*/
public interface RemoteIterator<E> {

/**
* Returns true if the iterator has more elements.
* @return true if the iterator has more elements, false otherwise.
*/
boolean hasNext();

E next() throws InterruptedException, KeeperException;
/**
* Returns the next element in the iteration.
* @return the next element in the iteration.
* @throws InterruptedException if the thread is interrupted
* @throws KeeperException if an error is encountered server-side
* @throws NoSuchElementException if the iteration has no more elements
*/
E next() throws InterruptedException, KeeperException, NoSuchElementException;
}
83 changes: 63 additions & 20 deletions src/java/main/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2449,23 +2449,46 @@ public List<String> getChildren(final String path, Watcher watcher)

/**
* Return the a subset (a "page") of the children node of the given path.
*
* <p>
* The returned list contains up to {@code maxReturned} ordered by czxid.
* </p>
*
* <p>
* If {@code watcher} is non-null, a watch on children creation/deletion is set when reaching the end of pagination
* </p>
*
* @param path
* @param watcher explicit watcher
* @param maxReturned The maximum number of children to return
* @param minZxid The result will be filtered out to nodes having a czkid > minZxid
* @return an ordered list of child nodes, ordered by czkid
* @throws KeeperException If the server signals an error with a non-zero error code.
* @throws IllegalArgumentException if an invalid path is specified
* - the path of the node
* @param watcher
* - a concrete watcher or null
* @param maxReturned
* - the maximum number of children returned
* @param minCzxId
* - only return children whose creation zkid is greater than {@code minCzxId}
* @return
* an ordered list of children nodes, up to {@code maxReturned}, ordered by czxid
* @throws KeeperException
* if the server signals an error with a non-zero error code.
* @throws IllegalArgumentException
* if any of the following is true:
* <ul>
* <li> {@code path} is invalid
* <li> {@code maxReturned} is less than 1
* </ul>
*
* @throws InterruptedException
* if the server transaction is interrupted.
*
* @since 3.5.2
*/
public List<PathWithStat> getChildren(final String path, Watcher watcher, final int maxReturned, final long minZxid)
throws KeeperException, InterruptedException
{
public List<PathWithStat> getChildren(final String path, Watcher watcher, final int maxReturned, final long minCzxId)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);

if(maxReturned <= 0) {
throw new IllegalArgumentException("Cannot return less than 1 children at the time");
throw new IllegalArgumentException("Cannot return less than 1 children");
}

// the watch contains the un-chroot path
Expand All @@ -2482,7 +2505,7 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
request.setPath(serverPath);
request.setWatch(watcher != null);
request.setMaxReturned(maxReturned);
request.setMinzkid(minZxid);
request.setMinCzxId(minCzxId);
GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
Expand All @@ -2493,19 +2516,39 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
}

/**
* Return the a RemoteIterator over the children node of the given path.
* Return the a RemoteIterator over the children nodes of the given path.
*
* <p>
* If {@code watcher} is non-null, a watch on children creation/deletion is set when reaching the end the iterator
* </p>
*
* @param path
* @param watcher explicit watcher, set when the end of the list is reached
* @param batchSize number of children in each batch fetched by the iterator in the background
* @param minZxid The result will be filtered out to nodes having a czkid > minZxid
* @return an iterator, ordered by czkid
* @throws KeeperException If the server signals an error with a non-zero error code.
* @throws IllegalArgumentException if an invalid path is specified
* - the path of the node
* @param watcher
* - a concrete watcher or null
* @param batchSize
* - the maximum number of children returned by each background call to the server
* @param minCzxId
* - only return children whose creation zkid is greater than {@code minCzxId}
*
* @return
* an iterator on children node, ordered by czxid
* @throws KeeperException
* if the server signals an error with a non-zero error code.
* @throws IllegalArgumentException
* if any of the following is true:
* <ul>
* <li> {@code path} is invalid
* <li> {@code maxReturned} is less than 1
* </ul>
* @throws InterruptedException
* if the server transaction is interrupted.
*
* @since 3.5.2
*/
public RemoteIterator<PathWithStat> getChildrenIterator(String path, Watcher watcher, int batchSize, int minZxid)
public RemoteIterator<PathWithStat> getChildrenIterator(String path, Watcher watcher, int batchSize, int minCzxId)
throws KeeperException, InterruptedException {
return new ChildrenBatchIterator(this, path, watcher, batchSize, minZxid);
return new ChildrenBatchIterator(this, path, watcher, batchSize, minCzxId);
}

/**
Expand Down
84 changes: 54 additions & 30 deletions src/java/main/org/apache/zookeeper/server/DataTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -691,16 +691,42 @@ public List<String> getChildren(String path, Stat stat, Watcher watcher)
}

/**
* This will return a paginated list of children for a given path
* @param path The path we want to list
* @param stat The current stat for this path
* @param watcher The watcher to attach to this path. The watcher will be added only when we return <= maxReturned items
* @param maxReturned How many to return max. We will return one more than this number to indicate truncation
* @param minZkId The last zkID to not return. Any returned item will have a zkid > minZkId
* @return A list of path with stats
* @throws NoNodeException
* Comparator used to sort children by creation time
*/
public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, long minZkId) throws NoNodeException {
private static class NodeCreationComparator implements Comparator<PathWithStat> {
@Override
public int compare(PathWithStat left, PathWithStat right) {
final long leftCzxId = left.getStat().getCzxid();
final long rightCzxId = right.getStat().getCzxid();

if (leftCzxId > rightCzxId) {
return -1;
} else if (rightCzxId > leftCzxId) {
return 1;
} else {
return 0;
}
}
}

/**
* Static comparator instance, avoids creating and destroying a new one at each invocation of the method below
*/
private static NodeCreationComparator staticNodeCreationComparator = new NodeCreationComparator();

/**
* Produces a paginated list of the children of a given path
* @param path path of node node to list
* @param stat stat of the node to list
* @param watcher an optional watcher to attach to the node. The watcher is added only once when reaching the end of pagination
* @param maxReturned maximum number of children to return. Return one more than this number to indicate truncation
* @param minCzxId only return children whose creation zxid greater than minCzxId
* @return A list of path with stats
* @throws NoNodeException if the path does not exist
*/
public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned,
long minCzxId)
throws NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
Expand All @@ -709,41 +735,39 @@ public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher w
if (stat != null) {
n.copyStat(stat);
}
PriorityQueue<PathWithStat> children;
Set<String> childs = n.getChildren();
if (childs == null) {
children = new PriorityQueue<PathWithStat>(1);
PriorityQueue<PathWithStat> childrenQueue;
Set<String> actualChildren = n.getChildren();
if (actualChildren == null) {
childrenQueue = new PriorityQueue<PathWithStat>(1);
} else {
children = new PriorityQueue<PathWithStat>(maxReturned + 1, new Comparator<PathWithStat>() {
@Override
public int compare(PathWithStat o1, PathWithStat o2) {
final long l = o2.getStat().getCzxid() - o1.getStat().getCzxid();
return l < 0 ? -1 : (l == 0 ? 0 : 1);
}
});
for (String child: childs) {
childrenQueue = new PriorityQueue<PathWithStat>(maxReturned + 1, staticNodeCreationComparator);
for (String child: actualChildren) {
DataNode childNode = nodes.get(path + "/" + child);
if (null != childNode) {
final long czxid = childNode.stat.getCzxid();
if (czxid <= minZkId) continue; // Filter out the nodes that are below our water mark
final long czxId = childNode.stat.getCzxid();
if (czxId <= minCzxId) continue; // Filter out the nodes that are below minCzxId
Stat childStat = new Stat();
childNode.copyStat(childStat);
children.add(new PathWithStat(child, childStat));
// Do we have more than we want (maxReturned + 1), if so discard right away the extra one
if (children.size() > maxReturned + 1) {
children.poll();
childrenQueue.add(new PathWithStat(child, childStat));
// Do we have more than we want (maxReturned + 1), if so discard right away the extra one
if (childrenQueue.size() > maxReturned + 1) {
// Drop the node with highest CzxId in the queue.
childrenQueue.poll();
}
}
}
}

if (children.size() <= maxReturned) {
// This is the last page, set the watch
if (childrenQueue.size() <= maxReturned) {
if (watcher != null) {
childWatches.addWatch(path, watcher);
}
}
// Return as list preserving newer-to-older order
LinkedList<PathWithStat> result = new LinkedList<PathWithStat>();
while (!children.isEmpty()) result.addFirst(children.poll());
while (!childrenQueue.isEmpty()) {
result.addFirst(childrenQueue.poll());
}
return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,12 @@ public void processRequest(Request request) {
getChildrenPaginatedRequest.getPath(), stat,
getChildrenPaginatedRequest.getWatch() ? cnxn : null,
maxReturned,
getChildrenPaginatedRequest.getMinzkid());
// Check if we truncated...
getChildrenPaginatedRequest.getMinCzxId());
// If the returned size is maxReturned+1, more children are available
boolean watching = false;
if (list.size() > maxReturned) {
watching = true;
// Drop the extra children
list = list.subList(0, maxReturned);
}
rsp = new GetChildrenPaginatedResponse(list, watching, stat);
Expand Down
18 changes: 9 additions & 9 deletions src/java/main/org/apache/zookeeper/server/ZKDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -476,17 +476,17 @@ public List<String> getChildren(String path, Stat stat, Watcher watcher)
}

/**
* get paginated children list for this path
* Get a subset (a page) of the children of the given node
* @param path the path of the node
* @param stat the stat of the node
* @param watcher the watcher function for this path
* @param maxReturned the maximum number of nodes to be returned. One more will be returned to indicate truncation
* @param minZkId We want to filter out any node whose zkID is <= to minZkId
* @return A list of PathWithStat not bounded by maxReturned
* @throws NoNodeException
*/
public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, long minZkId) throws NoNodeException {
return dataTree.getPaginatedChildren(path, stat, watcher, maxReturned, minZkId);
* @param watcher an optional watcher for this node children
* @param maxReturned the maximum number of nodes to be returned
* @param minCzxId only return children whose creation zxid greater than minCzxId
* @return A list of PathWithStat for the children. Size is bound to maxReturned (maxReturned+1 indicates truncation)
* @throws NoNodeException if the given path does not exist
*/
public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, long minCzxId) throws NoNodeException {
return dataTree.getPaginatedChildren(path, stat, watcher, maxReturned, minCzxId);
}

/**
Expand Down
8 changes: 4 additions & 4 deletions src/java/test/org/apache/zookeeper/server/DataTreeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,15 @@ public void run() {
@Test(timeout = 60000)
public void getChildrenPaginated() throws NodeExistsException, NoNodeException {
final String rootPath = "/children";
final int firstZkID = 1000;
final int firstCzxId = 1000;
final int countNodes = 10;

// Create the parent node
dt.createNode(rootPath, new byte[0], null, 0, dt.getNode("/").stat.getCversion()+1, 1, 1);

// Create 10 child nodes
for (int i = 0; i < countNodes; ++i) {
dt.createNode(rootPath + "/test-" + i, new byte[0], null, 0, dt.getNode(rootPath).stat.getCversion() + i + 1, firstZkID + i, 1);
dt.createNode(rootPath + "/test-" + i, new byte[0], null, 0, dt.getNode(rootPath).stat.getCversion() + i + 1, firstCzxId + i, 1);
}

// Asking from a negative would give me all children, and set the watch
Expand All @@ -271,7 +271,7 @@ public void getChildrenPaginated() throws NodeExistsException, NoNodeException {
result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + countNodes - 2);
Assert.assertEquals(1, result.size());
Assert.assertEquals("test-" + (countNodes - 1), result.get(0).getPath());
Assert.assertEquals(firstZkID + countNodes - 1, result.get(0).getStat().getMzxid());
Assert.assertEquals(firstCzxId + countNodes - 1, result.get(0).getStat().getMzxid());
Assert.assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount());

// Asking from the last created node should return an empty list and set the watch
Expand All @@ -293,7 +293,7 @@ public void getChildrenPaginated() throws NodeExistsException, NoNodeException {
@Test(timeout = 60000)
public void getChildrenPaginatedEmpty() throws NodeExistsException, NoNodeException {
final String rootPath = "/children";
final int firstZkID = 1000;
final int firstCzxId = 1000;

// Create the parent node
dt.createNode(rootPath, new byte[0], null, 0, dt.getNode("/").stat.getCversion()+1, 1, 1);
Expand Down
Loading

0 comments on commit b76fb4e

Please sign in to comment.