Skip to content

Commit

Permalink
Change of logic and wire protocol for pagination
Browse files Browse the repository at this point in the history
Previous iteration would not work properly when children have non-unique creation id (i.e. as result of multi()).

Add an extra offset parameter in request, used by client to track how many children with a given creation zxid it has seen.
Server uses this to skip.

Also changes semantic of minzkid: now returns children with creation >= (before was strictly >)

Adding tests that cover these new behaviors.
  • Loading branch information
Marco Primi committed Apr 11, 2017
1 parent b76fb4e commit 2e98aab
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 96 deletions.
34 changes: 28 additions & 6 deletions src/java/main/org/apache/zookeeper/ChildrenBatchIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@ class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {
private final Watcher watcher;
private final int batchSize;
private final LinkedList<PathWithStat> childrenQueue;
private long nextBatchMinZxid;
private int nextBatchZxidOffset;


ChildrenBatchIterator(ZooKeeper zooKeeper, String path, Watcher watcher, int batchSize, int minZxid)
ChildrenBatchIterator(ZooKeeper zooKeeper, String path, Watcher watcher, int batchSize, long minZxid)
throws KeeperException, InterruptedException {
this.zooKeeper = zooKeeper;
this.path = path;
this.watcher = watcher;
this.batchSize = batchSize;
this.nextBatchZxidOffset = 0;
this.nextBatchMinZxid = minZxid;

this.childrenQueue = new LinkedList<>();

List<PathWithStat> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, minZxid);
List<PathWithStat> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(firstChildrenBatch);

updateOffsetsForNextBatch(firstChildrenBatch);
}

@Override
Expand All @@ -45,18 +51,34 @@ public PathWithStat next() throws KeeperException, InterruptedException, NoSuchE
throw new NoSuchElementException("No more children");
}

// If we're down tThe result will be filteredo the last element, backfill before returning it
// If we're down to the last element, backfill before returning it
if (childrenQueue.size() == 1) {

long highestCZxId = childrenQueue.get(0).getStat().getCzxid();

List<PathWithStat> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, highestCZxId);
List<PathWithStat> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(childrenBatch);

updateOffsetsForNextBatch(childrenBatch);
}

PathWithStat returnChildren = childrenQueue.pop();

return returnChildren;
}

/**
* Prepare minZxid and zkidOffset for the next batch request based on the children returned in the current
*/
private void updateOffsetsForNextBatch(List<PathWithStat> children) {

for (PathWithStat child : children) {
long childZxid = child.getStat().getCzxid();

if (nextBatchMinZxid == childZxid) {
++nextBatchZxidOffset;
} else {
nextBatchZxidOffset = 1;
nextBatchMinZxid = childZxid;
}
}
}
}
11 changes: 7 additions & 4 deletions src/java/main/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2465,7 +2465,9 @@ public List<String> getChildren(final String path, Watcher watcher)
* @param maxReturned
* - the maximum number of children returned
* @param minCzxId
* - only return children whose creation zkid is greater than {@code minCzxId}
* - only return children whose creation zkid is equal or greater than {@code minCzxId}
* @param czxIdOffset
* - how many children with zkid == minCzxId to skip server-side, as they were returned in previous pages
* @return
* an ordered list of children nodes, up to {@code maxReturned}, ordered by czxid
* @throws KeeperException
Expand All @@ -2482,7 +2484,7 @@ public List<String> getChildren(final String path, Watcher watcher)
*
* @since 3.5.2
*/
public List<PathWithStat> getChildren(final String path, Watcher watcher, final int maxReturned, final long minCzxId)
public List<PathWithStat> getChildren(final String path, Watcher watcher, final int maxReturned, final long minCzxId, final int czxIdOffset)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
Expand All @@ -2506,6 +2508,7 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
request.setWatch(watcher != null);
request.setMaxReturned(maxReturned);
request.setMinCzxId(minCzxId);
request.setCzxIdOffset(czxIdOffset);
GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
Expand All @@ -2529,7 +2532,7 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
* @param batchSize
* - the maximum number of children returned by each background call to the server
* @param minCzxId
* - only return children whose creation zkid is greater than {@code minCzxId}
* - only return children whose creation zkid is equal or greater than {@code minCzxId}
*
* @return
* an iterator on children node, ordered by czxid
Expand All @@ -2546,7 +2549,7 @@ public List<PathWithStat> getChildren(final String path, Watcher watcher, final
*
* @since 3.5.2
*/
public RemoteIterator<PathWithStat> getChildrenIterator(String path, Watcher watcher, int batchSize, int minCzxId)
public RemoteIterator<PathWithStat> getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId)
throws KeeperException, InterruptedException {
return new ChildrenBatchIterator(this, path, watcher, batchSize, minCzxId);
}
Expand Down
55 changes: 36 additions & 19 deletions src/java/main/org/apache/zookeeper/server/DataTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -691,20 +691,21 @@ public List<String> getChildren(String path, Stat stat, Watcher watcher)
}

/**
* Comparator used to sort children by creation time
* Comparator used to sort children by creation time (older to newer) compare lexicographically to break ties
*/
private static class NodeCreationComparator implements Comparator<PathWithStat> {
@Override
public int compare(PathWithStat left, PathWithStat right) {
final long leftCzxId = left.getStat().getCzxid();
final long rightCzxId = right.getStat().getCzxid();

if (leftCzxId > rightCzxId) {
if (leftCzxId < rightCzxId) {
return -1;
} else if (rightCzxId > leftCzxId) {
} else if (leftCzxId > rightCzxId) {
return 1;
} else {
return 0;
// For nodes with the same creation time (created with multi()), use name to order
return left.getPath().compareTo(right.getPath());
}
}
}
Expand All @@ -720,12 +721,13 @@ public int compare(PathWithStat left, PathWithStat right) {
* @param stat stat of the node to list
* @param watcher an optional watcher to attach to the node. The watcher is added only once when reaching the end of pagination
* @param maxReturned maximum number of children to return. Return one more than this number to indicate truncation
* @param minCzxId only return children whose creation zxid greater than minCzxId
* @param minCzxId only return children whose creation zxid equal or greater than minCzxId
* @param czxIdOffset how many children with zxid == minCzxId to skip (as returned in previous pages)
* @return A list of path with stats
* @throws NoNodeException if the path does not exist
*/
public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned,
long minCzxId)
long minCzxId, long czxIdOffset)
throws NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
Expand All @@ -741,32 +743,47 @@ public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher w
childrenQueue = new PriorityQueue<PathWithStat>(1);
} else {
childrenQueue = new PriorityQueue<PathWithStat>(maxReturned + 1, staticNodeCreationComparator);
for (String child: actualChildren) {
for (String child : actualChildren) {
DataNode childNode = nodes.get(path + "/" + child);
if (null != childNode) {
final long czxId = childNode.stat.getCzxid();
if (czxId <= minCzxId) continue; // Filter out the nodes that are below minCzxId

if (czxId < minCzxId) {
// Filter out nodes that are below minCzxId
continue;
}

Stat childStat = new Stat();
childNode.copyStat(childStat);

// Cannot discard before having sorted and removed offset
childrenQueue.add(new PathWithStat(child, childStat));
// Do we have more than we want (maxReturned + 1), if so discard right away the extra one
if (childrenQueue.size() > maxReturned + 1) {
// Drop the node with highest CzxId in the queue.
childrenQueue.poll();
}
}
}
}
// This is the last page, set the watch
if (childrenQueue.size() <= maxReturned) {
if (watcher != null) {
childWatches.addWatch(path, watcher);

// Go over the ordered list of children and skip the first czxIdOffset that have czxid equal to minCzxId, if any
int skipped = 0;
while (!childrenQueue.isEmpty() && skipped < czxIdOffset) {
PathWithStat head = childrenQueue.peek();
if (head.getStat().getCzxid() > minCzxId) {
// We moved past the minCzxId, no point in looking further
break;
} else {
childrenQueue.poll();
++skipped;
}
}

// Return as list preserving newer-to-older order
LinkedList<PathWithStat> result = new LinkedList<PathWithStat>();
while (!childrenQueue.isEmpty()) {
result.addFirst(childrenQueue.poll());
while (!childrenQueue.isEmpty() && result.size() < maxReturned) {
result.addLast(childrenQueue.poll());
}

// This is the last page, set the watch
if (childrenQueue.isEmpty()) {
childWatches.addWatch(path, watcher);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,27 +432,17 @@ public void processRequest(Request request) {
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclG;
synchronized(n) {
aclG = n.acl;
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
request.authInfo, getChildrenPaginatedRequest.getPath(), null);
final int maxReturned = getChildrenPaginatedRequest.getMaxReturned();
List<PathWithStat> list = zks.getZKDatabase().getPaginatedChildren(
getChildrenPaginatedRequest.getPath(), stat,
getChildrenPaginatedRequest.getWatch() ? cnxn : null,
maxReturned,
getChildrenPaginatedRequest.getMinCzxId());
// If the returned size is maxReturned+1, more children are available
boolean watching = false;
if (list.size() > maxReturned) {
watching = true;
// Drop the extra children
list = list.subList(0, maxReturned);
}
rsp = new GetChildrenPaginatedResponse(list, watching, stat);
getChildrenPaginatedRequest.getMinCzxId(),
getChildrenPaginatedRequest.getCzxIdOffset());
rsp = new GetChildrenPaginatedResponse(list, stat);
break;
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/java/main/org/apache/zookeeper/server/ZKDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -482,11 +482,12 @@ public List<String> getChildren(String path, Stat stat, Watcher watcher)
* @param watcher an optional watcher for this node children
* @param maxReturned the maximum number of nodes to be returned
* @param minCzxId only return children whose creation zxid greater than minCzxId
* @param czxIdOffset how many children with zxid == minCzxId to skip (as returned in previous pages)
* @return A list of PathWithStat for the children. Size is bound to maxReturned (maxReturned+1 indicates truncation)
* @throws NoNodeException if the given path does not exist
*/
public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, long minCzxId) throws NoNodeException {
return dataTree.getPaginatedChildren(path, stat, watcher, maxReturned, minCzxId);
public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, long minCzxId, long czxIdOffset) throws NoNodeException {
return dataTree.getPaginatedChildren(path, stat, watcher, maxReturned, minCzxId, czxIdOffset);
}

/**
Expand Down
Loading

0 comments on commit 2e98aab

Please sign in to comment.