Skip to content

Commit

Permalink
CURATOR-690. CuratorCache fails to load the cache if there are more t…
Browse files Browse the repository at this point in the history
…han 64k child znodes (#480)

Signed-off-by: tison <[email protected]>
Co-authored-by: Ryan Ruel <[email protected]>
  • Loading branch information
tisonkun and rruel authored Sep 15, 2023
1 parent dc1cd9f commit 51483ad
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.apache.curator.framework.recipes.cache;

import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CHANGED;
import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CREATED;
import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_DELETED;
import static org.apache.zookeeper.KeeperException.Code.NONODE;
import static org.apache.zookeeper.KeeperException.Code.OK;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -28,7 +30,6 @@
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
Expand Down Expand Up @@ -59,13 +60,8 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
private final Consumer<Exception> exceptionHandler;

private final Phaser outstandingOps = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
callListeners(CuratorCacheListener::initialized);
return true;
}
};
private final OutstandingOps outstandingOps =
new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized));

private enum State {
LATENT,
Expand Down Expand Up @@ -191,10 +187,10 @@ private void checkChildrenChanged(String fromPath, Stat oldStat, Stat newStat) {
} else {
handleException(event);
}
outstandingOps.arriveAndDeregister();
outstandingOps.decrement();
};

outstandingOps.register();
outstandingOps.increment();
client.getChildren().inBackground(callback).forPath(fromPath);
} catch (Exception e) {
handleException(e);
Expand All @@ -218,10 +214,10 @@ private void nodeChanged(String fromPath) {
} else {
handleException(event);
}
outstandingOps.arriveAndDeregister();
outstandingOps.decrement();
};

outstandingOps.register();
outstandingOps.increment();
if (compressedData) {
client.getData().decompressed().inBackground(callback).forPath(fromPath);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.curator.framework.recipes.cache;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class OutstandingOps {
private final AtomicReference<Runnable> completionProc;
private final AtomicLong count = new AtomicLong(0);
private volatile boolean active = true;

OutstandingOps(Runnable completionProc) {
this.completionProc = new AtomicReference<>(completionProc);
}

void increment() {
if (active) {
count.incrementAndGet();
}
}

void decrement() {
if (active && (count.decrementAndGet() == 0)) {
Runnable proc = completionProc.getAndSet(null);
if (proc != null) {
active = false;
proc.run();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.junit.jupiter.api.Tag;
Expand Down Expand Up @@ -179,4 +182,48 @@ public void testClearOnClose() throws Exception {
assertEquals(storage.size(), 0);
}
}

// CURATOR-690 - CuratorCache fails to load the cache if there are more than 64K child ZNodes
@Test
public void testGreaterThan64kZNodes() throws Exception {
final CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);

// Phaser has a hard-limit of 64k registrants; we need to create more than that to trigger the initial problem.
final int zNodeCount = 0xFFFF + 5;

// Bulk creations in multiOp for (1) speed up creations (2) not exceed jute.maxbuffer size.
final int bulkSize = 10000;

try (CuratorFramework client = CuratorFrameworkFactory.newClient(
server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) {
client.start();
final CountDownLatch initializedLatch = new CountDownLatch(1);
client.create().creatingParentsIfNeeded().forPath("/test");

final List<CuratorOp> creations = new ArrayList<>();
for (int i = 0; i < zNodeCount; i++) {
creations.add(client.transactionOp().create().forPath("/test/node_" + i));
if (creations.size() > bulkSize) {
client.transaction().forOperations(creations);
creations.clear();
}
}
client.transaction().forOperations(creations);
creations.clear();

try (CuratorCache cache =
CuratorCache.builder(client, "/test").withStorage(storage).build()) {
final CuratorCacheListener listener =
builder().forInitialized(initializedLatch::countDown).build();
cache.listenable().addListener(listener);
cache.start();

assertTrue(timing.awaitLatch(initializedLatch));
assertEquals(
zNodeCount + 1,
cache.size(),
"Cache size should be equal to the number of zNodes created plus the root");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class BaseClassForTests {
protected volatile TestingServer server;

private final Logger log = LoggerFactory.getLogger(getClass());
protected final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicBoolean isRetrying = new AtomicBoolean(false);

private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;
Expand Down

0 comments on commit 51483ad

Please sign in to comment.