Skip to content

Commit

Permalink
Cherry-pick and adapt ZOOKEEPER-2260 for getChildren pagination (#5)
Browse files Browse the repository at this point in the history
Apply the patch of ZOOKEEPER-2260: paginated getChildren call (apache#50)
And adjust code style to pass the check style verification
Credit goest to the original author of ZOOKEEPER-2260
  • Loading branch information
huizhilu authored and narendly committed Nov 23, 2021
1 parent 0f66f47 commit 2736f44
Show file tree
Hide file tree
Showing 11 changed files with 978 additions and 0 deletions.
15 changes: 15 additions & 0 deletions zookeeper-jute/src/main/resources/zookeeper.jute
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ module org.apache.zookeeper.data {
long ephemeralOwner; // owner id if ephemeral, 0 otw
long pzxid; // last modified children
}
class PathWithStat {
ustring path;
org.apache.zookeeper.data.Stat stat;
}
}

module org.apache.zookeeper.proto {
Expand Down Expand Up @@ -157,6 +161,13 @@ module org.apache.zookeeper.proto {
ustring path;
boolean watch;
}
class GetChildrenPaginatedRequest {
ustring path;
int maxReturned;
long minCzxId;
long czxIdOffset;
boolean watch;
}
class CheckVersionRequest {
ustring path;
int version;
Expand Down Expand Up @@ -228,6 +239,10 @@ module org.apache.zookeeper.proto {
vector<ustring> children;
org.apache.zookeeper.data.Stat stat;
}
class GetChildrenPaginatedResponse {
vector<org.apache.zookeeper.data.PathWithStat> children;
org.apache.zookeeper.data.Stat stat;
}
class GetACLResponse {
vector<org.apache.zookeeper.data.ACL> acl;
org.apache.zookeeper.data.Stat stat;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper;

import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.zookeeper.data.PathWithStat;

/**
* Iterator over children nodes of a given path.
*/
class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {

private final ZooKeeper zooKeeper;
private final String path;
private final Watcher watcher;
private final int batchSize;
private final LinkedList<PathWithStat> childrenQueue;
private long nextBatchMinZxid;
private int nextBatchZxidOffset;


ChildrenBatchIterator(ZooKeeper zooKeeper, String path, Watcher watcher, int batchSize, long minZxid)
throws KeeperException, InterruptedException {
this.zooKeeper = zooKeeper;
this.path = path;
this.watcher = watcher;
this.batchSize = batchSize;
this.nextBatchZxidOffset = 0;
this.nextBatchMinZxid = minZxid;

this.childrenQueue = new LinkedList<>();

List<PathWithStat> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(firstChildrenBatch);

updateOffsetsForNextBatch(firstChildrenBatch);
}

@Override
public boolean hasNext() {

// next() never lets childrenQueue empty unless we iterated over all children
return !childrenQueue.isEmpty();
}

@Override
public PathWithStat next() throws KeeperException, InterruptedException, NoSuchElementException {

if (!hasNext()) {
throw new NoSuchElementException("No more children");
}

// If we're down to the last element, backfill before returning it
if (childrenQueue.size() == 1) {

List<PathWithStat> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(childrenBatch);

updateOffsetsForNextBatch(childrenBatch);
}

PathWithStat returnChildren = childrenQueue.pop();

return returnChildren;
}

/**
* Prepare minZxid and zkidOffset for the next batch request based on the children returned in the current
*/
private void updateOffsetsForNextBatch(List<PathWithStat> children) {

for (PathWithStat child : children) {
long childZxid = child.getStat().getCzxid();

if (nextBatchMinZxid == childZxid) {
++nextBatchZxidOffset;
} else {
nextBatchZxidOffset = 1;
nextBatchMinZxid = childZxid;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper;

import java.util.NoSuchElementException;

/**
* An iterator over a collection whose elements need to be fetched remotely.
*/
public interface RemoteIterator<E> {

/**
* Returns true if the iterator has more elements.
* @return true if the iterator has more elements, false otherwise.
*/
boolean hasNext();

/**
* Returns the next element in the iteration.
* @return the next element in the iteration.
* @throws InterruptedException if the thread is interrupted
* @throws KeeperException if an error is encountered server-side
* @throws NoSuchElementException if the iteration has no more elements
*/
E next() throws InterruptedException, KeeperException, NoSuchElementException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public interface OpCode {

int multiRead = 22;

int getChildrenPaginated = 71;

int auth = 100;

int setWatches = 101;
Expand Down
110 changes: 110 additions & 0 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.PathWithStat;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.AddWatchRequest;
import org.apache.zookeeper.proto.CheckWatchesRequest;
Expand All @@ -69,6 +70,8 @@
import org.apache.zookeeper.proto.GetAllChildrenNumberResponse;
import org.apache.zookeeper.proto.GetChildren2Request;
import org.apache.zookeeper.proto.GetChildren2Response;
import org.apache.zookeeper.proto.GetChildrenPaginatedRequest;
import org.apache.zookeeper.proto.GetChildrenPaginatedResponse;
import org.apache.zookeeper.proto.GetChildrenRequest;
import org.apache.zookeeper.proto.GetChildrenResponse;
import org.apache.zookeeper.proto.GetDataRequest;
Expand Down Expand Up @@ -2748,6 +2751,113 @@ public List<String> getChildren(final String path, Watcher watcher) throws Keepe
return response.getChildren();
}

/**
* Returns the a subset (a "page") of the children node of the given path.
*
* <p>
* The returned list contains up to {@code maxReturned} ordered by czxid.
* </p>
*
* <p>
* If {@code watcher} is non-null, a watch on children creation/deletion is set when reaching the end of pagination
* </p>
*
* @param path
* - the path of the node
* @param watcher
* - a concrete watcher or null
* @param maxReturned
* - the maximum number of children returned
* @param minCzxId
* - only return children whose creation zkid is equal or greater than {@code minCzxId}
* @param czxIdOffset
* - how many children with zkid == minCzxId to skip server-side, as they were returned in previous pages
* @return
* an ordered list of children nodes, up to {@code maxReturned}, ordered by czxid
* @throws KeeperException
* if the server signals an error with a non-zero error code.
* @throws IllegalArgumentException
* if any of the following is true:
* <ul>
* <li> {@code path} is invalid
* <li> {@code maxReturned} is less than 1
* </ul>
*
* @throws InterruptedException
* if the server transaction is interrupted.
*
* @since 3.6.2
*/
public List<PathWithStat> getChildren(final String path, Watcher watcher, final int maxReturned, final long minCzxId, final int czxIdOffset)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);

if (maxReturned <= 0) {
throw new IllegalArgumentException("Cannot return less than 1 children");
}

// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getChildrenPaginated);
GetChildrenPaginatedRequest request = new GetChildrenPaginatedRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
request.setMaxReturned(maxReturned);
request.setMinCzxId(minCzxId);
request.setCzxIdOffset(czxIdOffset);
GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getChildren();
}

/**
* Returns the a RemoteIterator over the children nodes of the given path.
*
* <p>
* If {@code watcher} is non-null, a watch on children creation/deletion is set when reaching the end the iterator
* </p>
*
* @param path
* - the path of the node
* @param watcher
* - a concrete watcher or null
* @param batchSize
* - the maximum number of children returned by each background call to the server
* @param minCzxId
* - only return children whose creation zkid is equal or greater than {@code minCzxId}
*
* @return
* an iterator on children node, ordered by czxid
* @throws KeeperException
* if the server signals an error with a non-zero error code.
* @throws IllegalArgumentException
* if any of the following is true:
* <ul>
* <li> {@code path} is invalid
* <li> {@code maxReturned} is less than 1
* </ul>
* @throws InterruptedException
* if the server transaction is interrupted.
*
* @since 3.6.2
*/
public RemoteIterator<PathWithStat> getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId)
throws KeeperException, InterruptedException {
return new ChildrenBatchIterator(this, path, watcher, batchSize, minCzxId);
}

/**
* Return the list of the children of the node of the given path.
* <p>
Expand Down
Loading

0 comments on commit 2736f44

Please sign in to comment.