Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-2260: Paginated getChildren call #50

Closed
wants to merge 8 commits into from
102 changes: 102 additions & 0 deletions src/java/main/org/apache/zookeeper/ChildrenBatchIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper;

import org.apache.zookeeper.data.PathWithStat;

import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;

/**
* Iterator over children nodes of a given path.
*/
class ChildrenBatchIterator implements RemoteIterator<PathWithStat> {

private final ZooKeeper zooKeeper;
private final String path;
private final Watcher watcher;
private final int batchSize;
private final LinkedList<PathWithStat> childrenQueue;
private long nextBatchMinZxid;
private int nextBatchZxidOffset;


ChildrenBatchIterator(ZooKeeper zooKeeper, String path, Watcher watcher, int batchSize, long minZxid)
throws KeeperException, InterruptedException {
this.zooKeeper = zooKeeper;
this.path = path;
this.watcher = watcher;
this.batchSize = batchSize;
this.nextBatchZxidOffset = 0;
this.nextBatchMinZxid = minZxid;

this.childrenQueue = new LinkedList<>();

List<PathWithStat> firstChildrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(firstChildrenBatch);

updateOffsetsForNextBatch(firstChildrenBatch);
}

@Override
public boolean hasNext() {

// next() never lets childrenQueue empty unless we iterated over all children
return ! childrenQueue.isEmpty();
}

@Override
public PathWithStat next() throws KeeperException, InterruptedException, NoSuchElementException {

if (!hasNext()) {
throw new NoSuchElementException("No more children");
}

// If we're down to the last element, backfill before returning it
if (childrenQueue.size() == 1) {

List<PathWithStat> childrenBatch = zooKeeper.getChildren(path, watcher, batchSize, nextBatchMinZxid, nextBatchZxidOffset);
childrenQueue.addAll(childrenBatch);

updateOffsetsForNextBatch(childrenBatch);
}

PathWithStat returnChildren = childrenQueue.pop();

return returnChildren;
}

/**
* Prepare minZxid and zkidOffset for the next batch request based on the children returned in the current
*/
private void updateOffsetsForNextBatch(List<PathWithStat> children) {

for (PathWithStat child : children) {
long childZxid = child.getStat().getCzxid();

if (nextBatchMinZxid == childZxid) {
++nextBatchZxidOffset;
} else {
nextBatchZxidOffset = 1;
nextBatchMinZxid = childZxid;
}
}
}
}
42 changes: 42 additions & 0 deletions src/java/main/org/apache/zookeeper/RemoteIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper;

import java.util.NoSuchElementException;

/**
* An iterator over a collection whose elements need to be fetched remotely.
*/
public interface RemoteIterator<E> {

/**
* Returns true if the iterator has more elements.
* @return true if the iterator has more elements, false otherwise.
*/
boolean hasNext();

/**
* Returns the next element in the iteration.
* @return the next element in the iteration.
* @throws InterruptedException if the thread is interrupted
* @throws KeeperException if an error is encountered server-side
* @throws NoSuchElementException if the iteration has no more elements
*/
E next() throws InterruptedException, KeeperException, NoSuchElementException;
}
2 changes: 2 additions & 0 deletions src/java/main/org/apache/zookeeper/ZooDefs.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public interface OpCode {

public final int createTTL = 21;

public final int getChildrenPaginated = 22;

public final int auth = 100;

public final int setWatches = 101;
Expand Down
111 changes: 111 additions & 0 deletions src/java/main/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.PathWithStat;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CheckWatchesRequest;
import org.apache.zookeeper.proto.Create2Response;
Expand All @@ -52,6 +53,8 @@
import org.apache.zookeeper.proto.GetACLResponse;
import org.apache.zookeeper.proto.GetChildren2Request;
import org.apache.zookeeper.proto.GetChildren2Response;
import org.apache.zookeeper.proto.GetChildrenPaginatedRequest;
import org.apache.zookeeper.proto.GetChildrenPaginatedResponse;
import org.apache.zookeeper.proto.GetChildrenRequest;
import org.apache.zookeeper.proto.GetChildrenResponse;
import org.apache.zookeeper.proto.GetDataRequest;
Expand All @@ -78,6 +81,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -2443,6 +2447,113 @@ public List<String> getChildren(final String path, Watcher watcher)
return response.getChildren();
}

/**
* Return the a subset (a "page") of the children node of the given path.
*
* <p>
* The returned list contains up to {@code maxReturned} ordered by czxid.
* </p>
*
* <p>
* If {@code watcher} is non-null, a watch on children creation/deletion is set when reaching the end of pagination
* </p>
*
* @param path
* - the path of the node
* @param watcher
* - a concrete watcher or null
* @param maxReturned
* - the maximum number of children returned
* @param minCzxId
* - only return children whose creation zkid is equal or greater than {@code minCzxId}
* @param czxIdOffset
* - how many children with zkid == minCzxId to skip server-side, as they were returned in previous pages
* @return
* an ordered list of children nodes, up to {@code maxReturned}, ordered by czxid
* @throws KeeperException
* if the server signals an error with a non-zero error code.
* @throws IllegalArgumentException
* if any of the following is true:
* <ul>
* <li> {@code path} is invalid
* <li> {@code maxReturned} is less than 1
* </ul>
*
* @throws InterruptedException
* if the server transaction is interrupted.
*
* @since 3.5.2
*/
public List<PathWithStat> getChildren(final String path, Watcher watcher, final int maxReturned, final long minCzxId, final int czxIdOffset)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);

if(maxReturned <= 0) {
throw new IllegalArgumentException("Cannot return less than 1 children");
}

// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getChildrenPaginated);
GetChildrenPaginatedRequest request = new GetChildrenPaginatedRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
request.setMaxReturned(maxReturned);
request.setMinCzxId(minCzxId);
request.setCzxIdOffset(czxIdOffset);
GetChildrenPaginatedResponse response = new GetChildrenPaginatedResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getChildren();
}

/**
* Return the a RemoteIterator over the children nodes of the given path.
*
* <p>
* If {@code watcher} is non-null, a watch on children creation/deletion is set when reaching the end the iterator
* </p>
*
* @param path
* - the path of the node
* @param watcher
* - a concrete watcher or null
* @param batchSize
* - the maximum number of children returned by each background call to the server
* @param minCzxId
* - only return children whose creation zkid is equal or greater than {@code minCzxId}
*
* @return
* an iterator on children node, ordered by czxid
* @throws KeeperException
* if the server signals an error with a non-zero error code.
* @throws IllegalArgumentException
* if any of the following is true:
* <ul>
* <li> {@code path} is invalid
* <li> {@code maxReturned} is less than 1
* </ul>
* @throws InterruptedException
* if the server transaction is interrupted.
*
* @since 3.5.2
*/
public RemoteIterator<PathWithStat> getChildrenIterator(String path, Watcher watcher, int batchSize, long minCzxId)
throws KeeperException, InterruptedException {
return new ChildrenBatchIterator(this, path, watcher, batchSize, minCzxId);
}

/**
* Return the list of the children of the node of the given path.
* <p>
Expand Down
Loading