Skip to content

Commit

Permalink
ZOOKEEPER-2260: Paginated getChildren call
Browse files Browse the repository at this point in the history
Motivation
Being able to get children with pagination.
Two main benefits:
 - Allow listing children that normally would exceed the buffer limit
 - Get stat for children in the same call as the list

Modifications
- Implementation of DataTree#getPaginatedChildren
- Added the call to the zk client
- Handle the call in the FinalRequestProcessor
- Added PathWithStat, GetChildrenPaginatedRequest and GetChildrenPaginatedResponse to the protos
- Added tests for pagination and watch
  • Loading branch information
Marco Primi committed Dec 14, 2015
1 parent 735ea78 commit 32a7eb6
Show file tree
Hide file tree
Showing 9 changed files with 437 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/java/main/org/apache/zookeeper/ZooDefs.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public interface OpCode {

public final int deleteContainer = 20;

public final int getChildrenPaginated = 71;

public final int auth = 100;

public final int setWatches = 101;
Expand Down
43 changes: 43 additions & 0 deletions src/java/main/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.common.StringUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.PathWithStat;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CheckWatchesRequest;
import org.apache.zookeeper.proto.Create2Response;
Expand All @@ -62,6 +63,8 @@
import org.apache.zookeeper.proto.GetACLResponse;
import org.apache.zookeeper.proto.GetChildren2Request;
import org.apache.zookeeper.proto.GetChildren2Response;
import org.apache.zookeeper.proto.GetChildrenPaginatedRequest;
import org.apache.zookeeper.proto.GetChildrenPaginatedResponse;
import org.apache.zookeeper.proto.GetChildrenRequest;
import org.apache.zookeeper.proto.GetChildrenResponse;
import org.apache.zookeeper.proto.GetDataRequest;
Expand Down Expand Up @@ -2258,6 +2261,46 @@ public List<String> getChildren(final String path, Watcher watcher)
return response.getChildren();
}

/**
 * Return one page of the children of the node of the given path, together
 * with the stat of each returned child.
 * <p>
 * Only children whose mzxid is strictly greater than {@code minZkid} are
 * considered. To fetch the following page, call again with {@code minZkid}
 * set to the mzxid of the last child of the previous page.
 * <p>
 * NOTE(review): the server-side handling registers the watch only when all
 * remaining children fit in the returned page, while the watch registration
 * is handed to the connection here whenever {@code watcher} is non-null --
 * confirm the two sides agree for truncated pages.
 *
 * @param path the client (un-chrooted) path of the parent node
 * @param watcher explicit watcher, or {@code null} to set no watch
 * @param maxReturned The maximum number of children to return
 * @param minZkid The result will be filtered out to nodes having a mzxid > minZkid
 * @return an ordered list of child nodes, ordered by mzxid
 * @throws KeeperException if the server replies with a non-zero error code
 * @throws InterruptedException declared for the blocking request submission
 */
public List<PathWithStat> getChildrenPaginated(final String path, Watcher watcher, final int maxReturned, final long minZkid)
        throws KeeperException, InterruptedException
{
    PathUtils.validatePath(path);

    final boolean watchRequested = watcher != null;

    // the watch is keyed by the un-chrooted (client) path
    WatchRegistration registration = null;
    if (watchRequested) {
        registration = new ChildWatchRegistration(watcher, path);
    }

    final GetChildrenPaginatedRequest request = new GetChildrenPaginatedRequest();
    request.setPath(prependChroot(path));
    request.setWatch(watchRequested);
    request.setMaxReturned(maxReturned);
    request.setMinzkid(minZkid);

    final RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.getChildrenPaginated);

    final GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse();
    final ReplyHeader reply = cnxn.submitRequest(header, request, response, registration);
    final int errorCode = reply.getErr();
    if (errorCode != 0) {
        // errors are reported against the path the caller passed in
        throw KeeperException.create(KeeperException.Code.get(errorCode),
                path);
    }
    return response.getChildren();
}

/**
* Return the list of the children of the node of the given path.
* <p>
Expand Down
62 changes: 62 additions & 0 deletions src/java/main/org/apache/zookeeper/server/DataTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.PathTrie;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.PathWithStat;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.txn.CheckVersionTxn;
Expand All @@ -60,11 +61,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -732,6 +736,64 @@ public List<String> getChildren(String path, Stat stat, Watcher watcher)
}
}

/**
 * This will return a paginated list of children for a given path, ordered by
 * ascending mzxid.
 * <p>
 * Up to {@code maxReturned + 1} children are returned: the extra element, when
 * present, tells the caller that more children remain (truncation). Only the
 * children with the smallest mzxid above {@code minZkId} are kept.
 * <p>
 * NOTE(review): pagination is keyed on mzxid alone; if several children can
 * share the same mzxid, a page boundary falling between them would skip the
 * rest on the next call -- confirm whether that can happen.
 *
 * @param path The path we want to list
 * @param stat If not null, receives a copy of the current stat of {@code path}
 * @param watcher The watcher to attach to this path. The watcher will be added only when we return <= maxReturned items
 * @param maxReturned How many to return max (must be >= 0). We will return one more than this number to indicate truncation
 * @param minZkId The last zkID to not return. Any returned item will have a mzxid > minZkId
 * @return A list of path with stats, sorted by ascending mzxid
 * @throws NoNodeException if there is no node at {@code path}
 * @throws IllegalArgumentException if {@code maxReturned} is negative
 */
public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, long minZkId) throws NoNodeException {
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    if (maxReturned < 0) {
        // Previously surfaced as an unexplained IllegalArgumentException from the PriorityQueue constructor
        throw new IllegalArgumentException("maxReturned must be >= 0, got " + maxReturned);
    }
    // Kept as a long so that maxReturned == Integer.MAX_VALUE does not wrap around to a negative value
    final long limit = (long) maxReturned + 1;
    // Appending "/" to the root path would produce "//child", which is not a valid node path
    final String childPrefix = "/".equals(path) ? "/" : path + "/";
    synchronized (n) {
        if (stat != null) {
            n.copyStat(stat);
        }
        PriorityQueue<PathWithStat> children;
        Set<String> childs = n.getChildren();
        if (childs == null) {
            children = new PriorityQueue<PathWithStat>(1);
        } else {
            // The queue transiently holds limit + 1 entries but never more than the number of children,
            // so a huge client-supplied maxReturned cannot force a huge up-front allocation.
            final int capacity = (int) Math.max(1L, Math.min(limit + 1, (long) childs.size()));
            // Highest mzxid at the head, so that poll() discards the newest entry when we are over the limit
            children = new PriorityQueue<PathWithStat>(capacity, new Comparator<PathWithStat>() {
                @Override
                public int compare(PathWithStat o1, PathWithStat o2) {
                    final long first = o1.getStat().getMzxid();
                    final long second = o2.getStat().getMzxid();
                    // Explicit comparison instead of a subtraction, which can overflow
                    return first > second ? -1 : (first == second ? 0 : 1);
                }
            });
            for (String child : childs) {
                DataNode childNode = nodes.get(childPrefix + child);
                if (childNode == null) {
                    continue;
                }
                // Filter out the nodes that are at or below our water mark
                if (childNode.stat.getMzxid() <= minZkId) {
                    continue;
                }
                Stat childStat = new Stat();
                childNode.copyStat(childStat);
                children.add(new PathWithStat(child, childStat));
                // Do we have more than we want (maxReturned + 1), if so discard right away the extra one
                if (children.size() > limit) {
                    children.poll();
                }
            }
        }

        // Only watch when the caller has seen everything; a truncated page means the caller must ask again
        if (watcher != null && children.size() <= maxReturned) {
            childWatches.addWatch(path, watcher);
        }

        // The queue yields the highest mzxid first; reverse to get ascending order
        List<PathWithStat> result = new ArrayList<PathWithStat>(children.size());
        while (!children.isEmpty()) {
            result.add(children.poll());
        }
        Collections.reverse(result);
        return result;
    }
}

public Stat setACL(String path, List<ACL> acl, int version)
throws KeeperException.NoNodeException {
Stat stat = new Stat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.PathWithStat;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CheckWatchesRequest;
import org.apache.zookeeper.proto.Create2Response;
Expand All @@ -44,6 +45,8 @@
import org.apache.zookeeper.proto.GetACLResponse;
import org.apache.zookeeper.proto.GetChildren2Request;
import org.apache.zookeeper.proto.GetChildren2Response;
import org.apache.zookeeper.proto.GetChildrenPaginatedResponse;
import org.apache.zookeeper.proto.GetChildrenPaginatedRequest;
import org.apache.zookeeper.proto.GetChildrenRequest;
import org.apache.zookeeper.proto.GetChildrenResponse;
import org.apache.zookeeper.proto.GetDataRequest;
Expand Down Expand Up @@ -430,6 +433,38 @@ public void processRequest(Request request) {
}
break;
}
// Paginated listing of the children of a node, each returned with its stat.
case OpCode.getChildrenPaginated: {
lastOp = "GETC";
GetChildrenPaginatedRequest getChildrenPaginatedRequest = new GetChildrenPaginatedRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getChildrenPaginatedRequest);
// Receives the stat of the listed (parent) node; returned in the response
Stat stat = new Stat();
DataNode n = zks.getZKDatabase().getNode(getChildrenPaginatedRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
// Read the ACL reference while holding the node lock, then check it outside the lock
Long aclG;
synchronized(n) {
aclG = n.acl;
}
// Listing children requires READ permission on the listed node
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
ZooDefs.Perms.READ,
request.authInfo);
final int maxReturned = getChildrenPaginatedRequest.getMaxReturned();
// The connection doubles as the watcher when the client asked for a watch
List<PathWithStat> list = zks.getZKDatabase().getPaginatedChildren(
getChildrenPaginatedRequest.getPath(), stat,
getChildrenPaginatedRequest.getWatch() ? cnxn : null,
maxReturned,
getChildrenPaginatedRequest.getMinzkid());
// More than maxReturned entries means there are further children: drop the
// extra entry and flag it in the response.
// NOTE(review): the flag is named "watching" but is set to true exactly when
// the page was truncated, which is when getPaginatedChildren does NOT register
// the watch -- confirm the meaning GetChildrenPaginatedResponse expects.
boolean watching = false;
if (list.size() > maxReturned) {
watching = true;
list = list.subList(0, maxReturned);
}
rsp = new GetChildrenPaginatedResponse(list, watching, stat);
break;
}
}
} catch (SessionMovedException e) {
// session moved is a connection level error, we need to tear
Expand Down
1 change: 1 addition & 0 deletions src/java/main/org/apache/zookeeper/server/Request.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ static boolean isValid(int type) {
case OpCode.sync:
case OpCode.checkWatches:
case OpCode.removeWatches:
case OpCode.getChildrenPaginated:
return true;
default:
return false;
Expand Down
15 changes: 15 additions & 0 deletions src/java/main/org/apache/zookeeper/server/ZKDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.PathWithStat;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
Expand Down Expand Up @@ -480,6 +481,20 @@ public List<String> getChildren(String path, Stat stat, Watcher watcher)
return dataTree.getChildren(path, stat, watcher);
}

/**
 * Fetch one page of the children of a node by delegating to the data tree.
 * @param path the path of the node to list
 * @param stat the stat object handed to the data tree for the node at {@code path}
 * @param watcher the watcher handed to the data tree for this path
 * @param maxReturned the page size; the data tree may return one element more to signal truncation
 * @param minZkId children whose zkID is <= minZkId are filtered out
 * @return the list of PathWithStat produced by the data tree, unmodified
 * @throws NoNodeException if the data tree has no node at {@code path}
 */
public List<PathWithStat> getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, long minZkId) throws NoNodeException {
    final List<PathWithStat> page =
            dataTree.getPaginatedChildren(path, stat, watcher, maxReturned, minZkId);
    return page;
}

/**
* check if the path is special or not
* @param path the input path
Expand Down
77 changes: 75 additions & 2 deletions src/java/test/org/apache/zookeeper/server/DataTreeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.data.PathWithStat;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree;
import org.junit.After;
Expand All @@ -47,6 +48,7 @@
import java.lang.reflect.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class DataTreeTest extends ZKTestCase {
Expand Down Expand Up @@ -115,7 +117,7 @@ private void createEphemeralNode(long session, final DataTree dataTree,
dataTree.getNode("/").stat.getCversion() + 1, 1, 1);
}
}

@Test(timeout = 60000)
public void testRootWatchTriggered() throws Exception {
class MyWatcher implements Watcher{
Expand Down Expand Up @@ -179,7 +181,7 @@ public void testPathTrieClearOnDeserialize() throws Exception {
PathTrie pTrie = (PathTrie)pfield.get(dserTree);

//Check that the node path is removed from pTrie
Assert.assertEquals("/bug is still in pTrie", "", pTrie.findMaxPrefix("/bug"));
Assert.assertEquals("/bug is still in pTrie", "", pTrie.findMaxPrefix("/bug"));
}

/*
Expand Down Expand Up @@ -232,4 +234,75 @@ public void run() {
//Let's make sure that we hit the code that ran the real assertion above
Assert.assertTrue("Didn't find the expected node", ranTestCase.get());
}

/**
 * Exercises DataTree#getPaginatedChildren on a parent with ten children created
 * with strictly increasing zxids: full listing, last page, empty page and
 * truncated page, checking in each case whether the watch was registered.
 */
@Test(timeout = 60000)
public void getChildrenPaginated() throws NodeExistsException, NoNodeException {
    final String rootPath = "/children";
    final int firstZkID = 1000;
    final int countNodes = 10;

    // Create the parent node
    dt.createNode(rootPath, new byte[0], null, 0, dt.getNode("/").stat.getCversion()+1, 1, 1);

    // Create the child nodes; child i gets zxid firstZkID + i
    for (int i = 0; i < countNodes; ++i) {
        dt.createNode(rootPath + "/test-" + i, new byte[0], null, 0, dt.getNode(rootPath).stat.getCversion() + i + 1, firstZkID + i, 1);
    }

    // Asking from a negative would give me all children, and set the watch
    int curWatchCount = dt.getWatchCount();
    List<PathWithStat> result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), countNodes, -1);
    Assert.assertEquals(countNodes, result.size());
    Assert.assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount());
    // Verify that the list is sorted by mzxid (not by path: "test-10" would sort before "test-2")
    long previousMzxid = -1;
    for (int i = 0; i < result.size(); ++i) {
        final PathWithStat child = result.get(i);
        final long mzxid = child.getStat().getMzxid();
        Assert.assertTrue(String.format("The next mzxid (%d) should be > previous (%d)", mzxid, previousMzxid),
                mzxid > previousMzxid);
        Assert.assertEquals("test-" + i, child.getPath());
        previousMzxid = mzxid;
    }

    // Asking from the next to last one should return only one node
    curWatchCount = dt.getWatchCount();
    result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, firstZkID + countNodes - 2);
    Assert.assertEquals(1, result.size());
    Assert.assertEquals("test-" + (countNodes - 1), result.get(0).getPath());
    Assert.assertEquals(firstZkID + countNodes - 1, result.get(0).getStat().getMzxid());
    Assert.assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount());

    // Asking from the last created node should return an empty list and set the watch
    curWatchCount = dt.getWatchCount();
    result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, firstZkID + countNodes - 1);
    Assert.assertTrue("The result should be an empty list", result.isEmpty());
    Assert.assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount());

    // Asking from -1 for one node should return two, and NOT set the watch
    curWatchCount = dt.getWatchCount();
    result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 1, -1);
    Assert.assertEquals("No watch should be set", curWatchCount, dt.getWatchCount());
    Assert.assertEquals("One extra child should be returned to indicate truncation", 2, result.size());
    // Check that we ordered correctly
    Assert.assertEquals("test-0", result.get(0).getPath());
    Assert.assertEquals("test-1", result.get(1).getPath());
}

/**
 * Listing a parent that has no children must return an empty list and still
 * register the watch.
 */
@Test(timeout = 60000)
public void getChildrenPaginatedEmpty() throws NodeExistsException, NoNodeException {
    final String rootPath = "/children";

    // Create the parent node
    dt.createNode(rootPath, new byte[0], null, 0, dt.getNode("/").stat.getCversion()+1, 1, 1);

    // There are no children, so asking from a negative zkID gives an empty list, and sets the watch
    int curWatchCount = dt.getWatchCount();
    List<PathWithStat> result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 100, -1);
    Assert.assertTrue("The result should be empty", result.isEmpty());
    Assert.assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount());
}

private class DummyWatcher implements Watcher {
@Override public void process(WatchedEvent ignored) { }
}
}
Loading

0 comments on commit 32a7eb6

Please sign in to comment.