From 4945eb56fc3686b9b0294a798c22a392c44a1c82 Mon Sep 17 00:00:00 2001 From: Huizhi Lu Date: Tue, 10 Nov 2020 18:29:17 -0800 Subject: [PATCH] Cherry-pick and adapt ZOOKEEPER-2260 for getChildren pagination Apply PR of paginated getChildren call(https://github.com/apache/zookeeper/pull/50) --- .../src/main/resources/zookeeper.jute | 15 + .../zookeeper/ChildrenBatchIterator.java | 102 +++++ .../org/apache/zookeeper/RemoteIterator.java | 42 ++ .../java/org/apache/zookeeper/ZooDefs.java | 2 + .../java/org/apache/zookeeper/ZooKeeper.java | 110 +++++ .../org/apache/zookeeper/server/DataTree.java | 103 +++++ .../server/FinalRequestProcessor.java | 30 ++ .../org/apache/zookeeper/server/Request.java | 1 + .../apache/zookeeper/server/ZKDatabase.java | 17 + .../apache/zookeeper/server/DataTreeTest.java | 172 ++++++++ .../test/GetChildrenPaginatedTest.java | 378 ++++++++++++++++++ 11 files changed, 972 insertions(+) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/ChildrenBatchIterator.java create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/RemoteIterator.java create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/test/GetChildrenPaginatedTest.java diff --git a/zookeeper-jute/src/main/resources/zookeeper.jute b/zookeeper-jute/src/main/resources/zookeeper.jute index 898838f62f6..502f2288d05 100644 --- a/zookeeper-jute/src/main/resources/zookeeper.jute +++ b/zookeeper-jute/src/main/resources/zookeeper.jute @@ -51,6 +51,10 @@ module org.apache.zookeeper.data { long ephemeralOwner; // owner id if ephemeral, 0 otw long pzxid; // last modified children } + class PathWithStat { + ustring path; + org.apache.zookeeper.data.Stat stat; + } } module org.apache.zookeeper.proto { @@ -157,6 +161,13 @@ module org.apache.zookeeper.proto { ustring path; boolean watch; } + class GetChildrenPaginatedRequest { + ustring path; + int maxReturned; + long minCzxId; + long czxIdOffset; + boolean watch; + } class CheckVersionRequest { ustring path; int version; @@ -228,6 +239,10 @@ module org.apache.zookeeper.proto { vector children; org.apache.zookeeper.data.Stat stat; } + class GetChildrenPaginatedResponse { + vector children; + org.apache.zookeeper.data.Stat stat; + } class GetACLResponse { vector acl; org.apache.zookeeper.data.Stat stat; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ChildrenBatchIterator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ChildrenBatchIterator.java new file mode 100644 index 00000000000..b8141e868e7 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ChildrenBatchIterator.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import org.apache.zookeeper.data.PathWithStat; + +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Iterator over children nodes of a given path. + */ +class ChildrenBatchIterator implements RemoteIterator { + + private final ZooKeeper zooKeeper; + private final String path; + private final Watcher watcher; + private final int batchSize; + private final LinkedList childrenQueue; + private long nextBatchMinZxid; + private int nextBatchZxidOffset; + + + ChildrenBatchIterator(ZooKeeper zooKeeper, String path, Watcher watcher, int batchSize, long minZxid) + throws KeeperException, InterruptedException { + this.zooKeeper = zooKeeper; + this.path = path; + this.watcher = watcher; + this.batchSize = batchSize; + this.nextBatchZxidOffset = 0; + this.nextBatchMinZxid = minZxid; + + this.childrenQueue = new LinkedList<>(); + + List firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset); + childrenQueue.addAll(firstChildrenBatch); + + updateOffsetsForNextBatch(firstChildrenBatch); + } + + @Override + public boolean hasNext() { + + // next() never lets childrenQueue empty unless we iterated over all children + return ! childrenQueue.isEmpty(); + } + + @Override + public PathWithStat next() throws KeeperException, InterruptedException, NoSuchElementException { + + if (!hasNext()) { + throw new NoSuchElementException("No more children"); + } + + // If we're down to the last element, backfill before returning it + if (childrenQueue.size() == 1) { + + List childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset); + childrenQueue.addAll(childrenBatch); + + updateOffsetsForNextBatch(childrenBatch); + } + + PathWithStat returnChildren = childrenQueue.pop(); + + return returnChildren; + } + + /** + * Prepare minZxid and zkidOffset for the next batch request based on the children returned in the current + */ + private void updateOffsetsForNextBatch(List children) { + + for (PathWithStat child : children) { + long childZxid = child.getStat().getCzxid(); + + if (nextBatchMinZxid == childZxid) { + ++nextBatchZxidOffset; + } else { + nextBatchZxidOffset = 1; + nextBatchMinZxid = childZxid; + } + } + } +} \ No newline at end of file diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/RemoteIterator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/RemoteIterator.java new file mode 100644 index 00000000000..2517786f92f --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/RemoteIterator.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.util.NoSuchElementException; + +/** + * An iterator over a collection whose elements need to be fetched remotely. + */ +public interface RemoteIterator { + + /** + * Returns true if the iterator has more elements. + * @return true if the iterator has more elements, false otherwise. + */ + boolean hasNext(); + + /** + * Returns the next element in the iteration. + * @return the next element in the iteration. + * @throws InterruptedException if the thread is interrupted + * @throws KeeperException if an error is encountered server-side + * @throws NoSuchElementException if the iteration has no more elements + */ + E next() throws InterruptedException, KeeperException, NoSuchElementException; +} \ No newline at end of file diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java index a12e5803c27..89009704d10 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java @@ -79,6 +79,8 @@ public interface OpCode { int multiRead = 22; + int getChildrenPaginated = 71; + int auth = 100; int setWatches = 101; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java index 7b53f9a4375..1126e852459 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java @@ -53,6 +53,7 @@ import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.common.PathUtils; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.AddWatchRequest; import org.apache.zookeeper.proto.CheckWatchesRequest; @@ -69,6 +70,8 @@ import org.apache.zookeeper.proto.GetAllChildrenNumberResponse; import org.apache.zookeeper.proto.GetChildren2Request; import org.apache.zookeeper.proto.GetChildren2Response; +import org.apache.zookeeper.proto.GetChildrenPaginatedRequest; +import org.apache.zookeeper.proto.GetChildrenPaginatedResponse; import org.apache.zookeeper.proto.GetChildrenRequest; import org.apache.zookeeper.proto.GetChildrenResponse; import org.apache.zookeeper.proto.GetDataRequest; @@ -2748,6 +2751,113 @@ public List getChildren(final String path, Watcher watcher) throws Keepe return response.getChildren(); } + /** + * Returns the a subset (a "page") of the children node of the given path. + * + *

+ * The returned list contains up to {@code maxReturned} ordered by czxid. + *

+ * + *

+ * If {@code watcher} is non-null, a watch on children creation/deletion is set when reaching the end of pagination + *

+ * + * @param path + * - the path of the node + * @param watcher + * - a concrete watcher or null + * @param maxReturned + * - the maximum number of children returned + * @param minCzxId + * - only return children whose creation zkid is equal or greater than {@code minCzxId} + * @param czxIdOffset + * - how many children with zkid == minCzxId to skip server-side, as they were returned in previous pages + * @return + * an ordered list of children nodes, up to {@code maxReturned}, ordered by czxid + * @throws KeeperException + * if the server signals an error with a non-zero error code. + * @throws IllegalArgumentException + * if any of the following is true: + *
    + *
  • {@code path} is invalid + *
  • {@code maxReturned} is less than 1 + *
+ * + * @throws InterruptedException + * if the server transaction is interrupted. + * + * @since 3.6.2 + */ + public List getChildren(final String path, Watcher watcher, final int maxReturned, final long minCzxId, final int czxIdOffset) + throws KeeperException, InterruptedException { + final String clientPath = path; + PathUtils.validatePath(clientPath); + + if(maxReturned <= 0) { + throw new IllegalArgumentException("Cannot return less than 1 children"); + } + + // the watch contains the un-chroot path + WatchRegistration wcb = null; + if (watcher != null) { + wcb = new ChildWatchRegistration(watcher, clientPath); + } + + final String serverPath = prependChroot(clientPath); + + RequestHeader h = new RequestHeader(); + h.setType(ZooDefs.OpCode.getChildrenPaginated); + GetChildrenPaginatedRequest request = new GetChildrenPaginatedRequest(); + request.setPath(serverPath); + request.setWatch(watcher != null); + request.setMaxReturned(maxReturned); + request.setMinCzxId(minCzxId); + request.setCzxIdOffset(czxIdOffset); + GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse(); + ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); + if (r.getErr() != 0) { + throw KeeperException.create(KeeperException.Code.get(r.getErr()), + clientPath); + } + return response.getChildren(); + } + + /** + * Returns the a RemoteIterator over the children nodes of the given path. + * + *

+ * If {@code watcher} is non-null, a watch on children creation/deletion is set when reaching the end the iterator + *

+ * + * @param path + * - the path of the node + * @param watcher + * - a concrete watcher or null + * @param batchSize + * - the maximum number of children returned by each background call to the server + * @param minCzxId + * - only return children whose creation zkid is equal or greater than {@code minCzxId} + * + * @return + * an iterator on children node, ordered by czxid + * @throws KeeperException + * if the server signals an error with a non-zero error code. + * @throws IllegalArgumentException + * if any of the following is true: + *
    + *
  • {@code path} is invalid + *
  • {@code maxReturned} is less than 1 + *
+ * @throws InterruptedException + * if the server transaction is interrupted. + * + * @since 3.6.2 + */ + public RemoteIterator getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId) + throws KeeperException, InterruptedException { + return new ChildrenBatchIterator(this, path, watcher, batchSize, minCzxId); + } + /** * Return the list of the children of the node of the given path. *

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index d3529cf94e6..2dd159c3bfd 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -21,9 +21,11 @@ import java.io.EOFException; import java.io.IOException; import java.io.PrintWriter; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -31,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -57,6 +60,7 @@ import org.apache.zookeeper.audit.ZKAuditProvider; import org.apache.zookeeper.common.PathTrie; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.StatPersisted; import org.apache.zookeeper.server.watch.IWatchManager; @@ -775,6 +779,105 @@ public int getAllChildrenNumber(String path) { return (int) nodes.entrySet().parallelStream().filter(entry -> entry.getKey().startsWith(path + "/")).count(); } + /** + * Comparator used to sort children by creation time (older to newer) compare lexicographically to break ties + */ + private static class NodeCreationComparator implements Comparator, Serializable { + @Override + public int compare(PathWithStat left, PathWithStat right) { + final long leftCzxId = left.getStat().getCzxid(); + final long rightCzxId = right.getStat().getCzxid(); + + if (leftCzxId < rightCzxId) { + return -1; + } else if (leftCzxId > rightCzxId) { + return 1; + } else { + // For nodes with the same creation time (created with multi()), use name to order + return left.getPath().compareTo(right.getPath()); + } + } + } + + /** + * Static comparator instance, avoids creating and destroying a new one at each invocation of the method below + */ + private static NodeCreationComparator staticNodeCreationComparator = new NodeCreationComparator(); + + /** + * Produces a paginated list of the children of a given path + * @param path path of node node to list + * @param stat stat of the node to list + * @param watcher an optional watcher to attach to the node. The watcher is added only once when reaching the end of pagination + * @param maxReturned maximum number of children to return. Return one more than this number to indicate truncation + * @param minCzxId only return children whose creation zxid equal or greater than minCzxId + * @param czxIdOffset how many children with zxid == minCzxId to skip (as returned in previous pages) + * @return A list of path with stats + * @throws NoNodeException if the path does not exist + */ + public List getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, + long minCzxId, long czxIdOffset) + throws NoNodeException { + DataNode n = nodes.get(path); + if (n == null) { + throw new KeeperException.NoNodeException(); + } + synchronized (n) { + if (stat != null) { + n.copyStat(stat); + } + PriorityQueue childrenQueue; + Set actualChildren = n.getChildren(); + if (actualChildren == null) { + childrenQueue = new PriorityQueue(1); + } else { + childrenQueue = new PriorityQueue(maxReturned + 1, staticNodeCreationComparator); + for (String child : actualChildren) { + DataNode childNode = nodes.get(path + "/" + child); + if (null != childNode) { + final long czxId = childNode.stat.getCzxid(); + + if (czxId < minCzxId) { + // Filter out nodes that are below minCzxId + continue; + } + + Stat childStat = new Stat(); + childNode.copyStat(childStat); + + // Cannot discard before having sorted and removed offset + childrenQueue.add(new PathWithStat(child, childStat)); + } + } + } + + // Go over the ordered list of children and skip the first czxIdOffset that have czxid equal to minCzxId, if any + int skipped = 0; + while (!childrenQueue.isEmpty() && skipped < czxIdOffset) { + PathWithStat head = childrenQueue.peek(); + if (head.getStat().getCzxid() > minCzxId) { + // We moved past the minCzxId, no point in looking further + break; + } else { + childrenQueue.poll(); + ++skipped; + } + } + + // Return as list preserving newer-to-older order + LinkedList result = new LinkedList(); + while (!childrenQueue.isEmpty() && result.size() < maxReturned) { + result.addLast(childrenQueue.poll()); + } + + // This is the last page, set the watch + if (childrenQueue.isEmpty()) { + childWatches.addWatch(path, watcher); + } + return result; + } + } + public Stat setACL(String path, List acl, int version) throws KeeperException.NoNodeException { Stat stat = new Stat(); DataNode n = nodes.get(path); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java index 9ffde55c10c..a833aedbcb6 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -48,6 +48,7 @@ import org.apache.zookeeper.audit.AuditHelper; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.AddWatchRequest; @@ -63,6 +64,8 @@ import org.apache.zookeeper.proto.GetAllChildrenNumberResponse; import org.apache.zookeeper.proto.GetChildren2Request; import org.apache.zookeeper.proto.GetChildren2Response; +import org.apache.zookeeper.proto.GetChildrenPaginatedResponse; +import org.apache.zookeeper.proto.GetChildrenPaginatedRequest; import org.apache.zookeeper.proto.GetChildrenRequest; import org.apache.zookeeper.proto.GetChildrenResponse; import org.apache.zookeeper.proto.GetDataRequest; @@ -553,6 +556,33 @@ public void processRequest(Request request) { rsp = new GetEphemeralsResponse(ephemerals); break; } + case OpCode.getChildrenPaginated: { + lastOp = "GETC"; + GetChildrenPaginatedRequest getChildrenPaginatedRequest = new GetChildrenPaginatedRequest(); + ByteBufferInputStream.byteBuffer2Record(request.request, + getChildrenPaginatedRequest); + Stat stat = new Stat(); + path = getChildrenPaginatedRequest.getPath(); + DataNode n = zks.getZKDatabase().getNode(path); + if (n == null) { + throw new KeeperException.NoNodeException(); + } + zks.checkACL( + request.cnxn, + zks.getZKDatabase().aclForNode(n), + ZooDefs.Perms.READ, + request.authInfo, path, + null); + final int maxReturned = getChildrenPaginatedRequest.getMaxReturned(); + List list = zks.getZKDatabase().getPaginatedChildren( + getChildrenPaginatedRequest.getPath(), stat, + getChildrenPaginatedRequest.getWatch() ? cnxn : null, + maxReturned, + getChildrenPaginatedRequest.getMinCzxId(), + getChildrenPaginatedRequest.getCzxIdOffset()); + rsp = new GetChildrenPaginatedResponse(list, stat); + break; + } } } catch (SessionMovedException e) { // session moved is a connection level error, we need to tear diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index d0fb7da4d2e..e632b842e22 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -252,6 +252,7 @@ static boolean isValid(int type) { case OpCode.checkWatches: case OpCode.removeWatches: case OpCode.addWatch: + case OpCode.getChildrenPaginated: return true; default: return false; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index f758f5de5ae..30b7b7b03b5 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -45,6 +45,7 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree.ProcessTxnResult; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; @@ -573,6 +574,22 @@ public int getAllChildrenNumber(String path) throws KeeperException.NoNodeExcept return dataTree.getAllChildrenNumber(path); } + /** + * Get a subset (a page) of the children of the given node + * @param path the path of the node + * @param stat the stat of the node + * @param watcher an optional watcher for this node children + * @param maxReturned the maximum number of nodes to be returned + * @param minCzxId only return children whose creation zxid greater than minCzxId + * @param czxIdOffset how many children with zxid == minCzxId to skip (as returned in previous pages) + * @return A list of PathWithStat for the children. Size is bound to maxReturned (maxReturned+1 indicates truncation) + * @throws NoNodeException if the given path does not exist + */ + public List getPaginatedChildren(String path, Stat stat, Watcher watcher, int maxReturned, + long minCzxId, long czxIdOffset) throws NoNodeException { + return dataTree.getPaginatedChildren(path, stat, watcher, maxReturned, minCzxId, czxIdOffset); + } + /** * check if the path is special or not * @param path the input path diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java index da9a12a3c52..588d7035cf3 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; @@ -44,9 +45,12 @@ import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.Quotas; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.common.PathTrie; +import org.apache.zookeeper.data.PathWithStat; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.txn.CreateTxn; @@ -306,6 +310,174 @@ public void run() { assertTrue("Didn't find the expected node", ranTestCase.get()); } + @Test(timeout = 60000) + public void getChildrenPaginated() throws NodeExistsException, NoNodeException { + final String rootPath = "/children"; + final int firstCzxId = 1000; + final int countNodes = 10; + + // Create the parent node + DataTree dt = new DataTree(); + dt.createNode(rootPath, new byte[0], null, 0, dt.getNode("/").stat.getCversion()+1, 1, 1); + + // Create 10 child nodes + for (int i = 0; i < countNodes; ++i) { + dt.createNode(rootPath + "/test-" + i, new byte[0], null, 0, dt.getNode(rootPath).stat.getCversion() + i + 1, firstCzxId + i, 1); + } + + // Asking from a negative for 5 nodes should return the 5, and not set the watch + int curWatchCount = dt.getWatchCount(); + List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 5, -1, 0); + assertEquals(5, result.size()); + assertEquals("The watch not should have been set", curWatchCount, dt.getWatchCount()); + // Verify that the list is sorted + String before = ""; + for (final PathWithStat s: result) { + final String path = s.getPath(); + assertTrue(String.format("The next path (%s) should be > previons (%s)", path, before), + path.compareTo(before) > 0); + before = path; + } + + // Asking from a negative would give me all children, and set the watch + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), countNodes, -1, 0); + assertEquals(countNodes, result.size()); + assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); + // Verify that the list is sorted + before = ""; + for (final PathWithStat s: result) { + final String path = s.getPath(); + assertTrue(String.format("The next path (%s) should be > previons (%s)", path, before), + path.compareTo(before) > 0); + before = path; + } + + // Asking from the last one should return only onde node + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + countNodes - 1, 0); + assertEquals(1, result.size()); + assertEquals("test-" + (countNodes - 1), result.get(0).getPath()); + assertEquals(firstCzxId + countNodes - 1, result.get(0).getStat().getMzxid()); + assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); + + // Asking from the last created node+1 should return an empty list and set the watch + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + countNodes, 0); + assertTrue("The result should be an empty list", result.isEmpty()); + assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); + + // Asking from -1 for one node should return two, and NOT set the watch + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 1, -1, 0); + assertEquals("No watch should be set", curWatchCount, dt.getWatchCount()); + assertEquals("We only return up to ", 1, result.size()); + // Check that we ordered correctly + assertEquals("test-0", result.get(0).getPath()); + } + + @Test(timeout = 60000) + public void getChildrenPaginatedWithOffset() throws NodeExistsException, NoNodeException { + final String rootPath = "/children"; + final int childrenCzxId = 1000; + final int countNodes = 9; + final int allNodes = countNodes+2; + + // Create the parent node + DataTree dt = new DataTree(); + dt.createNode(rootPath, new byte[0], null, 0, dt.getNode("/").stat.getCversion()+1, 1, 1); + + int parentVersion = dt.getNode(rootPath).stat.getCversion(); + + // Create a children sometimes "before" + dt.createNode(rootPath + "/test-0", new byte[0], null, 0, parentVersion + 1, childrenCzxId-100, 1); + + // Create 10 child nodes, all with the same + for (int i = 1; i <= countNodes; ++i) { + dt.createNode(rootPath + "/test-" + i, new byte[0], null, 0, parentVersion + 2, childrenCzxId, 1); + } + + // Create a children sometimes "after" + dt.createNode(rootPath + "/test-999", new byte[0], null, 0, parentVersion + 3, childrenCzxId+100, 1); + + // Asking from a negative would give me all children, and set the watch + int curWatchCount = dt.getWatchCount(); + List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 1000, -1, 0); + assertEquals(allNodes, result.size()); + assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); + // Verify that the list is sorted + String before = ""; + for (final PathWithStat s: result) { + final String path = s.getPath(); + assertTrue(String.format("The next path (%s) should be > previons (%s)", path, before), + path.compareTo(before) > 0); + before = path; + } + + // Asking with offset minCzxId below childrenCzxId should not skip anything, regardless of offset + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, childrenCzxId-1, 3); + assertEquals(2, result.size()); + assertEquals("test-1", result.get(0).getPath()); + assertEquals("test-2", result.get(1).getPath()); + assertEquals("The watch should not have been set", curWatchCount, dt.getWatchCount()); + + // Asking with offset 5 should skip nodes 1, 2, 3, 4, 5 + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, childrenCzxId, 5); + assertEquals(2, result.size()); + assertEquals("test-6", result.get(0).getPath()); + assertEquals("test-7", result.get(1).getPath()); + assertEquals("The watch should not have been set", curWatchCount, dt.getWatchCount()); + + // Asking with offset 5 for more nodes than are there should skip nodes 1, 2, 3, 4, 5 (plus 0 due to zxid) + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 10, childrenCzxId, 5); + + assertEquals(5, result.size()); + assertEquals("test-6", result.get(0).getPath()); + assertEquals("test-7", result.get(1).getPath()); + assertEquals("test-8", result.get(2).getPath()); + assertEquals("test-9", result.get(3).getPath()); + assertEquals("The watch should have been set", curWatchCount+1, dt.getWatchCount()); + + // Asking with offset 5 for fewer nodes than are there should skip nodes 1, 2, 3, 4, 5 (plus 0 due to zxid) + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 4, childrenCzxId, 5); + + assertEquals(4, result.size()); + assertEquals("test-6", result.get(0).getPath()); + assertEquals("test-7", result.get(1).getPath()); + assertEquals("test-8", result.get(2).getPath()); + assertEquals("test-9", result.get(3).getPath()); + assertEquals("The watch should not have been set", curWatchCount, dt.getWatchCount()); + + // Asking from the last created node+1 should return an empty list and set the watch + curWatchCount = dt.getWatchCount(); + result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 2, 1000 + childrenCzxId, 0); + assertTrue("The result should be an empty list", result.isEmpty()); + assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); + } + + @Test(timeout = 60000) + public void getChildrenPaginatedEmpty() throws NodeExistsException, NoNodeException { + final String rootPath = "/children"; + + // Create the parent node + DataTree dt = new DataTree(); + dt.createNode(rootPath, new byte[0], null, 0, dt.getNode("/").stat.getCversion()+1, 1, 1); + + // Asking from a negative would give me all children, and set the watch + int curWatchCount = dt.getWatchCount(); + List result = dt.getPaginatedChildren(rootPath, null, new DummyWatcher(), 100, -1, 0); + assertTrue("The result should be empty", result.isEmpty()); + assertEquals("The watch should have been set", curWatchCount + 1, dt.getWatchCount()); + } + + private class DummyWatcher implements Watcher { + @Override public void process(WatchedEvent ignored) { } + } + /* ZOOKEEPER-3531 - similarly for aclCache.deserialize, we should not hold lock either */ @Test(timeout = 60000) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetChildrenPaginatedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetChildrenPaginatedTest.java new file mode 100644 index 00000000000..8dca43ed372 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetChildrenPaginatedTest.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import org.apache.zookeeper.*; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.PathWithStat; +import org.apache.zookeeper.data.Stat; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Random; +import java.util.UUID; + +import static org.junit.Assert.fail; + +public class GetChildrenPaginatedTest extends ClientBase { + private ZooKeeper zk; + private final Random random = new Random(); + + + @Override + public void setUp() throws Exception { + super.setUp(); + + zk = createClient(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + + zk.close(); + } + + @Test(timeout = 30000) + public void testPagination() throws Exception { + + final String testId = UUID.randomUUID().toString(); + final String basePath = "/testPagination-" + testId; + + Map createdChildrenMetadata = createChildren(basePath, 10, 1); + + // Create child 0 out of order (to make sure paths are not ordered lexicographically). + { + String childPath = basePath + "/" + 0; + zk.create(childPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + final Stat stat = zk.exists(childPath, null); + + createdChildrenMetadata.put(String.valueOf(0), stat); + + LOG.info("Created: " + childPath + " czxId: " + stat.getCzxid()); + } + + long minCzxId = -1; + Map readChildrenMetadata = new HashMap(); + final int pageSize = 3; + + RemoteIterator it = zk.getChildrenIterator(basePath, null, pageSize, minCzxId); + + while (it.hasNext()) { + PathWithStat pathWithStat = it.next(); + + final String nodePath = pathWithStat.getPath(); + final Stat nodeStat = pathWithStat.getStat(); + + LOG.info("Read: " + nodePath + " czxid: " + nodeStat.getCzxid()); + readChildrenMetadata.put(nodePath, nodeStat); + + Assert.assertTrue(nodeStat.getCzxid() > minCzxId); + minCzxId = nodeStat.getCzxid(); + + } + + Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata.keySet()); + + for (String child : createdChildrenMetadata.keySet()) { + Assert.assertEquals(createdChildrenMetadata.get(child), readChildrenMetadata.get(child)); + } + } + + @Test(timeout = 30000) + public void testPaginationIterator() throws Exception { + + final String testId = UUID.randomUUID().toString(); + final String basePath = "/testPagination-" + testId; + + Map createdChildrenMetadata = createChildren(basePath, random.nextInt(50)+1, 0); + + Map readChildrenMetadata = new HashMap(); + + final int batchSize = random.nextInt(3)+1; + + RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); + + + while(childrenIterator.hasNext()) { + PathWithStat child = childrenIterator.next(); + + final String nodePath = child.getPath(); + final Stat nodeStat = child.getStat(); + + LOG.info("Read: " + nodePath + " czxid: " + nodeStat.getCzxid()); + readChildrenMetadata.put(nodePath, nodeStat); + } + + Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata.keySet()); + + for (String child : createdChildrenMetadata.keySet()) { + Assert.assertEquals(createdChildrenMetadata.get(child), readChildrenMetadata.get(child)); + } + } + + /* + * This test validates a known list of children is returned by the iterator despite server failures. + * After the iterator is created successfully, the following logic drives the rest of the test: + *

    + *
  • Randomly change the server state (down to up or up to down)
  • + *
  • Try to fetch the next element, swallowing exception produced by the server being down
  • + *
+ * Eventually, all children should be returned regardless of the number of times the server was unavailable. + */ + @Test(timeout = 60000) + public void testPaginationWithServerDown() throws Exception { + + final String testId = UUID.randomUUID().toString(); + final String basePath = "/testPagination-" + testId; + + Map createdChildrenMetadata = createChildren(basePath, random.nextInt(15)+10, 0); + + Map readChildrenMetadata = new HashMap(); + + final int batchSize = random.nextInt(3)+1; + + RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); + + boolean serverDown = false; + + while(childrenIterator.hasNext()) { + + // Randomly change the up/down state of the server + if(random.nextBoolean()) { + if (serverDown) { + LOG.info("Bringing server UP"); + startServer(); + waitForServerUp(hostPort, 5000); + serverDown = false; + } else { + LOG.info("Taking server DOWN"); + stopServer(); + serverDown = true; + } + } + + PathWithStat child = null; + + boolean exception = false; + try { + child = childrenIterator.next(); + } catch (InterruptedException|KeeperException e) { + LOG.info("Exception in #next(): " + e.getMessage()); + exception = true; + } + + if (! exception) { + // next() returned (either more elements in current batch or server is up) + Assert.assertNotNull(child); + + final String nodePath = child.getPath(); + final Stat nodeStat = child.getStat(); + + LOG.info("Read: " + nodePath + " czxid: " + nodeStat.getCzxid()); + readChildrenMetadata.put(nodePath, nodeStat); + } + } + + Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata.keySet()); + + for (String child : createdChildrenMetadata.keySet()) { + Assert.assertEquals(createdChildrenMetadata.get(child), readChildrenMetadata.get(child)); + } + } + + + class FireOnlyOnceWatcher implements Watcher { + int watchFiredCount = 0; + + @Override + public void process(WatchedEvent event) { + + synchronized (this) { + watchFiredCount += 1; + this.notify(); + } + } + } + + @Test(timeout = 30000) + public void testPaginationWatch() throws Exception { + + final String testId = UUID.randomUUID().toString(); + final String basePath = "/testPaginationWatch-" + testId; + + createChildren(basePath, 10, 0); + + long minCzxId = -1; + final int pageSize = 3; + + FireOnlyOnceWatcher fireOnlyOnceWatcher = new FireOnlyOnceWatcher(); + + RemoteIterator it = zk.getChildrenIterator(basePath, fireOnlyOnceWatcher, pageSize, minCzxId); + + int childrenIndex = 0; + + while (it.hasNext()) { + + ++childrenIndex; + + PathWithStat pathWithStat = it.next(); + + final String nodePath = pathWithStat.getPath(); + LOG.info("Read: " + nodePath); + + final Stat nodeStat = pathWithStat.getStat(); + + Assert.assertTrue(nodeStat.getCzxid() > minCzxId); + minCzxId = nodeStat.getCzxid(); + + // Create more children before pagination is completed -- should NOT trigger watch + if(childrenIndex < 6) { + String childPath = basePath + "/" + "before-pagination-" + childrenIndex; + zk.create(childPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + // Modify the first child of each page. + // This should not trigger additional watches or create duplicates in the set of children returned + if(childrenIndex % pageSize == 0) { + zk.setData(basePath + "/" + pathWithStat.getPath(), new byte[3], -1); + } + + synchronized (fireOnlyOnceWatcher) { + Assert.assertEquals("Watch should not have fired yet", 0, fireOnlyOnceWatcher.watchFiredCount); + } + } + + // Create another children after pagination is completed -- should trigger watch + { + String childPath = basePath + "/" + "after-pagination"; + zk.create(childPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + + // Test eventually times out and fails if watches does not fire + while (true) { + synchronized (fireOnlyOnceWatcher) { + if (fireOnlyOnceWatcher.watchFiredCount > 0) { + Assert.assertEquals("Watch should have fired once", 1, fireOnlyOnceWatcher.watchFiredCount); + break; + } + fireOnlyOnceWatcher.wait(1000); + } + } + + // Watch fired once. + + // Give it another chance to fire (i.e. a duplicate) which would make the test fail + synchronized (fireOnlyOnceWatcher) { + fireOnlyOnceWatcher.wait(1000); + Assert.assertEquals("Watch should have fired once", 1, fireOnlyOnceWatcher.watchFiredCount); + } + } + + @Test(timeout = 60000, expected = NoSuchElementException.class) + public void testPaginationWithNoChildren() throws Exception { + + final String testId = UUID.randomUUID().toString(); + final String basePath = "/testPagination-" + testId; + + Map createdChildrenMetadata = createChildren(basePath, 0, 0); + + final int batchSize = 10; + + RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); + + Assert.assertFalse(childrenIterator.hasNext()); + + childrenIterator.next(); + fail("NoSuchElementException is expected"); + } + + private Map createChildren(String basePath, int numChildren, int firstChildrenNameOffset) throws KeeperException, InterruptedException { + zk.create(basePath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + Map createdChildrenMetadata = new HashMap(); + + for (int i = firstChildrenNameOffset; i < (firstChildrenNameOffset+numChildren); i++) { + String childPath = basePath + "/" + i; + zk.create(childPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + final Stat stat = zk.exists(childPath, null); + + createdChildrenMetadata.put(String.valueOf(i), stat); + + LOG.info("Created: " + childPath + " czxid: " + stat.getCzxid()); + } + return createdChildrenMetadata; + } + + @Test(timeout = 60000) + public void testPaginationWithMulti() throws Exception { + + final String testId = UUID.randomUUID().toString(); + final String basePath = "/testPagination-" + testId; + + final int numChildren = 10; + final int batchSize = 3; + + zk.create(basePath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + Transaction transaction = zk.transaction(); + for (int i = 0; i < numChildren; i++) { + String childPath = basePath + "/" + i; + transaction.create(childPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + List transactionOpResults = transaction.commit(); + + Assert.assertEquals(numChildren, transactionOpResults.size()); + + Map createdChildrenMetadata = new HashMap<>(); + + for (int i = 0; i < numChildren; i++) { + String childPath = basePath + "/" + i; + final Stat stat = zk.exists(childPath, null); + createdChildrenMetadata.put(String.valueOf(i), stat); + LOG.info("Created: " + childPath + " zkId: " + stat.getCzxid()); + } + + Map readChildrenMetadata = new HashMap(); + + RemoteIterator childrenIterator = zk.getChildrenIterator(basePath, null, batchSize, -1); + + while (childrenIterator.hasNext()) { + + PathWithStat children = childrenIterator.next(); + + LOG.info("Read: " + children.getPath() + " zkId: " + children.getStat().getCzxid()); + readChildrenMetadata.put(children.getPath(), children.getStat()); + } + + Assert.assertEquals(numChildren, readChildrenMetadata.size()); + + Assert.assertEquals(createdChildrenMetadata.keySet(), readChildrenMetadata.keySet()); + + for (String child : createdChildrenMetadata.keySet()) { + Assert.assertEquals(createdChildrenMetadata.get(child), readChildrenMetadata.get(child)); + } + } +} \ No newline at end of file