diff --git a/src/java/main/org/apache/zookeeper/ZooDefs.java b/src/java/main/org/apache/zookeeper/ZooDefs.java index 021d421aae1..96a0913ca27 100644 --- a/src/java/main/org/apache/zookeeper/ZooDefs.java +++ b/src/java/main/org/apache/zookeeper/ZooDefs.java @@ -69,6 +69,8 @@ public interface OpCode { public final int deleteContainer = 20; + public final int getChildrenPaginated = 71; + public final int auth = 100; public final int setWatches = 101; diff --git a/src/java/main/org/apache/zookeeper/ZooKeeper.java b/src/java/main/org/apache/zookeeper/ZooKeeper.java index 2dca3858595..a3fa5692b0d 100644 --- a/src/java/main/org/apache/zookeeper/ZooKeeper.java +++ b/src/java/main/org/apache/zookeeper/ZooKeeper.java @@ -51,6 +51,7 @@ import org.apache.zookeeper.common.PathUtils; import org.apache.zookeeper.common.StringUtils; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.CheckWatchesRequest; import org.apache.zookeeper.proto.Create2Response; @@ -62,6 +63,8 @@ import org.apache.zookeeper.proto.GetACLResponse; import org.apache.zookeeper.proto.GetChildren2Request; import org.apache.zookeeper.proto.GetChildren2Response; +import org.apache.zookeeper.proto.GetChildrenPaginatedRequest; +import org.apache.zookeeper.proto.GetChildrenPaginatedResponse; import org.apache.zookeeper.proto.GetChildrenRequest; import org.apache.zookeeper.proto.GetChildrenResponse; import org.apache.zookeeper.proto.GetDataRequest; @@ -2258,6 +2261,46 @@ public List getChildren(final String path, Watcher watcher) return response.getChildren(); } + /** + * + * @param path + * @param watcher explicit watcher + * @param maxReturned The maximum number of children to return + * @param minZkid The result will be filtered out to nodes having a zkid > minZkid + * @return an ordered list of child nodes, ordered by mZkid + * @throws KeeperException + * @throws InterruptedException + */ + public List getChildrenPaginated(final String path, Watcher watcher, final int maxReturned, final long minZkid) + throws KeeperException, InterruptedException + { + final String clientPath = path; + PathUtils.validatePath(clientPath); + + // the watch contains the un-chroot path + WatchRegistration wcb = null; + if (watcher != null) { + wcb = new ChildWatchRegistration(watcher, clientPath); + } + + final String serverPath = prependChroot(clientPath); + + RequestHeader h = new RequestHeader(); + h.setType(ZooDefs.OpCode.getChildrenPaginated); + GetChildrenPaginatedRequest request = new GetChildrenPaginatedRequest(); + request.setPath(serverPath); + request.setWatch(watcher != null); + request.setMaxReturned(maxReturned); + request.setMinzkid(minZkid); + GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse(); + ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); + if (r.getErr() != 0) { + throw KeeperException.create(KeeperException.Code.get(r.getErr()), + clientPath); + } + return response.getChildren(); + } + /** * Return the list of the children of the node of the given path. *

diff --git a/src/java/main/org/apache/zookeeper/server/DataTree.java b/src/java/main/org/apache/zookeeper/server/DataTree.java index d82825815ff..e1f7ceff7b0 100644 --- a/src/java/main/org/apache/zookeeper/server/DataTree.java +++ b/src/java/main/org/apache/zookeeper/server/DataTree.java @@ -39,6 +39,7 @@ import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.common.PathTrie; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.StatPersisted; import org.apache.zookeeper.txn.CheckVersionTxn; @@ -60,11 +61,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -732,6 +736,64 @@ public List getChildren(String path, Stat stat, Watcher watcher) } } + /** + * This will return a paginated list of children for a given path + * @param path The path we want to list + * @param stat The current stat for this path + * @param watcher The watcher to attach to this path. The watcher will be added only when we return <= maxReturned items + * @param maxReturned How many to return max. We will return one more than this number to indicate truncation + * @param minZkId The last zkID to not return. Any returned item will have a zkid > minZkId + * @return A list of path with stats + * @throws NoNodeException + */ + public List getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, long minZkId) throws NoNodeException { + DataNode n = nodes.get(path); + if (n == null) { + throw new KeeperException.NoNodeException(); + } + synchronized (n) { + if (stat != null) { + n.copyStat(stat); + } + PriorityQueue children; + Set childs = n.getChildren(); + if (childs == null) { + children = new PriorityQueue(1); + } else { + children = new PriorityQueue(maxReturned + 1, new Comparator() { + @Override + public int compare(PathWithStat o1, PathWithStat o2) { + final long l = o2.getStat().getMzxid() - o1.getStat().getMzxid(); + return l < 0 ? -1 : (l == 0 ? 0 : 1); + } + }); + for (String child: childs) { + DataNode childNode = nodes.get(path + "/" + child); + if (null != childNode) { + final long mzxid = childNode.stat.getMzxid(); + if (mzxid <= minZkId) continue; // Filter out the nodes that are below our water mark + Stat childStat = new Stat(); + childNode.copyStat(childStat); + children.add(new PathWithStat(child, childStat)); + // Do we have more than we want (maxReturned + 1), if so discard right away the extra one + if (children.size() > maxReturned + 1) { + children.poll(); + } + } + } + } + + if (children.size() <= maxReturned) { + if (watcher != null) { + childWatches.addWatch(path, watcher); + } + } + LinkedList result = new LinkedList(); + while (!children.isEmpty()) result.addFirst(children.poll()); + return result; + } + } + public Stat setACL(String path, List acl, int version) throws KeeperException.NoNodeException { Stat stat = new Stat(); diff --git a/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java index 672810e2293..b23b4c6331b 100644 --- a/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -34,6 +34,7 @@ import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.CheckWatchesRequest; import org.apache.zookeeper.proto.Create2Response; @@ -44,6 +45,8 @@ import org.apache.zookeeper.proto.GetACLResponse; import org.apache.zookeeper.proto.GetChildren2Request; import org.apache.zookeeper.proto.GetChildren2Response; +import org.apache.zookeeper.proto.GetChildrenPaginatedResponse; +import org.apache.zookeeper.proto.GetChildrenPaginatedRequest; import org.apache.zookeeper.proto.GetChildrenRequest; import org.apache.zookeeper.proto.GetChildrenResponse; import org.apache.zookeeper.proto.GetDataRequest; @@ -430,6 +433,38 @@ public void processRequest(Request request) { } break; } + case OpCode.getChildrenPaginated: { + lastOp = "GETC"; + GetChildrenPaginatedRequest getChildrenPaginatedRequest = new GetChildrenPaginatedRequest(); + ByteBufferInputStream.byteBuffer2Record(request.request, + getChildrenPaginatedRequest); + Stat stat = new Stat(); + DataNode n = zks.getZKDatabase().getNode(getChildrenPaginatedRequest.getPath()); + if (n == null) { + throw new KeeperException.NoNodeException(); + } + Long aclG; + synchronized(n) { + aclG = n.acl; + } + PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG), + ZooDefs.Perms.READ, + request.authInfo); + final int maxReturned = getChildrenPaginatedRequest.getMaxReturned(); + List list = zks.getZKDatabase().getPaginatedChildren( + getChildrenPaginatedRequest.getPath(), stat, + getChildrenPaginatedRequest.getWatch() ? cnxn : null, + maxReturned, + getChildrenPaginatedRequest.getMinzkid()); + // Check if we truncated... + boolean watching = false; + if (list.size() > maxReturned) { + watching = true; + list = list.subList(0, maxReturned); + } + rsp = new GetChildrenPaginatedResponse(list, watching, stat); + break; + } } } catch (SessionMovedException e) { // session moved is a connection level error, we need to tear diff --git a/src/java/main/org/apache/zookeeper/server/Request.java b/src/java/main/org/apache/zookeeper/server/Request.java index d34efe61a1a..8eccc460a08 100644 --- a/src/java/main/org/apache/zookeeper/server/Request.java +++ b/src/java/main/org/apache/zookeeper/server/Request.java @@ -155,6 +155,7 @@ static boolean isValid(int type) { case OpCode.sync: case OpCode.checkWatches: case OpCode.removeWatches: + case OpCode.getChildrenPaginated: return true; default: return false; diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java index 3fa96a6ced8..7fd9a8d5b65 100644 --- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java +++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java @@ -43,6 +43,7 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree.ProcessTxnResult; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; @@ -480,6 +481,20 @@ public List getChildren(String path, Stat stat, Watcher watcher) return dataTree.getChildren(path, stat, watcher); } + /** + * get paginated children list for this path + * @param path the path of the node + * @param stat the stat of the node + * @param watcher the watcher function for this path + * @param maxReturned the maximum number of nodes to be returned. One more will be returned to indicate truncation + * @param minZkId We want to filter out any node whose zkID is <= to minZkId + * @return A list of PathWithStat not bounded by maxReturned + * @throws NoNodeException + */ + public List getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, long minZkId) throws NoNodeException { + return dataTree.getPaginatedChildren(path, stat, watcher, maxReturned, minZkId); + } + /** * check if the path is special or not * @param path the input path diff --git a/src/java/test/org/apache/zookeeper/server/DataTreeTest.java b/src/java/test/org/apache/zookeeper/server/DataTreeTest.java index d7266437f57..62222844ea4 100644 --- a/src/java/test/org/apache/zookeeper/server/DataTreeTest.java +++ b/src/java/test/org/apache/zookeeper/server/DataTreeTest.java @@ -25,6 +25,7 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; import org.junit.After; @@ -47,6 +48,7 @@ import java.lang.reflect.*; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; public class DataTreeTest extends ZKTestCase { @@ -231,5 +233,75 @@ public void run() { //Let's make sure that we hit the code that ran the real assertion above Assert.assertTrue("Didn't find the expected node", ranTestCase.get()); + + @Test(timeout = 60000) + public void getChildrenPaginated() throws NodeExistsException, NoNodeException { + final String rootPath = "/children"; + final int firstZkID = 1000; + final int countNodes = 10; + + // Create the parent node + dt.createNode(rootPath, new byte[0], null, 0, dt.getNode("/").stat.getCversion()+1, 1, 1); + + // Create 10 child nodes + for (int i = 0; i < countNodes; ++i) { + dt.createNode(rootPath + "/test-" + i, new byte[0], null, 0, dt.getNode(rootPath).stat.getCversion() + i + 1, firstZkID + i, 1); + } + + // Asking from a negative would give me all children, and set the watch + int curWatchCount = dt.getWatchCount(); + List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), countNodes, -1); + Assert.assertEquals(countNodes, result.size()); + Assert.assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); + // Verify that the list is sorted + String before = ""; + for (final PathWithStat s: result) { + final String path = s.getPath(); + Assert.assertTrue(String.format("The next path (%s) should be > previons (%s)", path, before), + path.compareTo(before) > 0); + before = path; + } + + // Asking from the next to last one should return only onde node + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + countNodes - 2); + Assert.assertEquals(1, result.size()); + Assert.assertEquals("test-" + (countNodes - 1), result.get(0).getPath()); + Assert.assertEquals(firstZkID + countNodes - 1, result.get(0).getStat().getMzxid()); + Assert.assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); + + // Asking from the last created node should return an empty list and set the watch + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + countNodes - 1); + Assert.assertTrue("The result should be an empty list", result.isEmpty()); + Assert.assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); + + // Asking from -1 for one node should return two, and NOT set the watch + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 1, -1); + Assert.assertEquals("No watch should be set", curWatchCount, dt.getWatchCount()); + Assert.assertEquals("We only return up to ", 2, result.size()); + // Check that we ordered correctly + Assert.assertEquals("test-0", result.get(0).getPath()); + Assert.assertEquals("test-1", result.get(1).getPath()); + } + + @Test(timeout = 60000) + public void getChildrenPaginatedEmpty() throws NodeExistsException, NoNodeException { + final String rootPath = "/children"; + final int firstZkID = 1000; + + // Create the parent node + dt.createNode(rootPath, new byte[0], null, 0, dt.getNode("/").stat.getCversion()+1, 1, 1); + + // Asking from a negative would give me all children, and set the watch + int curWatchCount = dt.getWatchCount(); + List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 100, -1); + Assert.assertTrue("The result should be empty", result.isEmpty()); + Assert.assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); + } + + private class DummyWatcher implements Watcher { + @Override public void process(WatchedEvent ignored) { } } } diff --git a/src/java/test/org/apache/zookeeper/test/GetChildrenPaginatedTest.java b/src/java/test/org/apache/zookeeper/test/GetChildrenPaginatedTest.java new file mode 100644 index 00000000000..93cba168691 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/test/GetChildrenPaginatedTest.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import org.apache.zookeeper.*; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.PathWithStat; +import org.apache.zookeeper.data.Stat; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class GetChildrenPaginatedTest extends ClientBase { + private ZooKeeper zk; + + @Override + public void setUp() throws Exception { + super.setUp(); + + zk = createClient(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + + zk.close(); + } + + @Test + public void testPagination() throws Exception { + + final String testId = UUID.randomUUID().toString(); + final String basePath = "/testPagination-" + testId; + + Map createdChildrenMetadata = createChildren(basePath, 10, 1); + + // Create child 0 out of order (to make sure paths are not ordered lexicographically). + { + String childPath = basePath + "/" + 0; + zk.create(childPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + final Stat stat = zk.exists(childPath, null); + + createdChildrenMetadata.put(String.valueOf(0), stat); + + LOG.info("Created: " + childPath + " zkId: " + stat.getCzxid()); + } + + long minZkId = -1; + Map readChildrenMetadata = new HashMap(); + final int pageSize = 3; + + while (true) { + final List page = zk.getChildrenPaginated(basePath, null, pageSize, minZkId); + + if(page.isEmpty()) { + break; + } + + for (PathWithStat pathWithStat : page) { + + final String nodePath = pathWithStat.getPath(); + final Stat nodeStat = pathWithStat.getStat(); + + LOG.info("Read: " + nodePath + " zkId: " + nodeStat.getCzxid()); + readChildrenMetadata.put(nodePath, nodeStat); + + Assert.assertTrue(nodeStat.getCzxid() > minZkId); + minZkId = nodeStat.getCzxid(); + } + } + + Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata.keySet()); + + for (String child : createdChildrenMetadata.keySet()) { + Assert.assertEquals(createdChildrenMetadata.get(child), readChildrenMetadata.get(child)); + } + } + + class MyWatcher implements Watcher { + boolean watchFired = false; + + @Override + public void process(WatchedEvent event) { + Assert.assertFalse("Watch fired multiple times " + event.toString(), watchFired); + watchFired = true; + } + }; + + @Test(timeout = 30000) + public void testPaginationWatch() throws Exception { + + final String testId = UUID.randomUUID().toString(); + final String basePath = "/testPaginationWatch-" + testId; + + createChildren(basePath, 10, 0); + + long minZkId = -1; + final int pageSize = 3; + int pageCount = 0; + + MyWatcher myWatcher = new MyWatcher(); + + while (true) { + final List page = zk.getChildrenPaginated(basePath, myWatcher, pageSize, minZkId); + + if(page.isEmpty()) { + break; + } + + // Create another children before pagination is completed -- should NOT trigger watch + if(pageCount < 3) { + String childPath = basePath + "/" + "before-pagination-" + pageCount; + zk.create(childPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + for (PathWithStat pathWithStat : page) { + + final String nodePath = pathWithStat.getPath(); + LOG.info("Read: " + nodePath); + + final Stat nodeStat = pathWithStat.getStat(); + + Assert.assertTrue(nodeStat.getCzxid() > minZkId); + minZkId = nodeStat.getCzxid(); + } + + pageCount += 1; + } + + // Create another children after pagination is completed -- should trigger watch + { + String childPath = basePath + "/" + "after-pagination"; + zk.create(childPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + while (true) { + // Test times-out if this doesn't become true in a reasonable amount of time + if(!myWatcher.watchFired) { + LOG.info("Watch did not fire yet"); + Thread.sleep(1000); + } else { + LOG.info("Watch fired"); + break; + } + } + + //Sleep a bit more, to allow any duplicated watch to fire again + Thread.sleep(1000); + Assert.assertTrue(myWatcher.watchFired); + } + + private Map createChildren(String basePath, int numChildren, int firstChildrenNameOffset) throws KeeperException, InterruptedException { + zk.create(basePath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + Map createdChildrenMetadata = new HashMap(); + + for (int i = firstChildrenNameOffset; i < (firstChildrenNameOffset+numChildren); i++) { + String childPath = basePath + "/" + i; + zk.create(childPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + final Stat stat = zk.exists(childPath, null); + + createdChildrenMetadata.put(String.valueOf(i), stat); + + LOG.info("Created: " + childPath + " zkId: " + stat.getCzxid()); + } + return createdChildrenMetadata; + } +} diff --git a/src/zookeeper.jute b/src/zookeeper.jute index 3858081dc7e..2975dd6991f 100644 --- a/src/zookeeper.jute +++ b/src/zookeeper.jute @@ -51,6 +51,10 @@ module org.apache.zookeeper.data { long ephemeralOwner; // owner id if ephemeral, 0 otw long pzxid; // last modified children } + class PathWithStat { + ustring path; + Stat stat; + } } module org.apache.zookeeper.proto { @@ -139,6 +143,12 @@ module org.apache.zookeeper.proto { ustring path; boolean watch; } + class GetChildrenPaginatedRequest { + ustring path; + int maxReturned; + long minzkid; + boolean watch; + } class CheckVersionRequest { ustring path; int version; @@ -203,6 +213,11 @@ module org.apache.zookeeper.proto { vector children; org.apache.zookeeper.data.Stat stat; } + class GetChildrenPaginatedResponse { + vector children; + boolean watching; + org.apache.zookeeper.data.Stat stat; + } class GetACLResponse { vector acl; org.apache.zookeeper.data.Stat stat;