Skip to content

Commit

Permalink
[pulsar-broker] Refresh ZooKeeper-data cache in background to avoid d…
Browse files Browse the repository at this point in the history
…eadlock and blocking IO on ZK thread (#8304)

### Motivation
We have been seeing broker restarts due to zk-session timeout and that's because of #4635 and 
```
"pulsar-ordered-OrderedExecutor-4-0-EventThread" #33 daemon prio=5 os_prio=0 cpu=36314.97ms elapsed=698.44s tid=0x00007f8114029790 nid=0x2a31 waiting on condition  [0x00007f8170575000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
        - parking to wait for  <0x00001001a1c1aa50> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.parkNanos([email protected]/LockSupport.java:234)
        at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1798)
        at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3128)
        at java.util.concurrent.CompletableFuture.timedGet([email protected]/CompletableFuture.java:1868)
        at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:2021)
        at org.apache.pulsar.zookeeper.ZooKeeperCache.getData(ZooKeeperCache.java:293)
        at org.apache.pulsar.zookeeper.ZooKeeperCache.getData(ZooKeeperCache.java:238)
        at org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy.getBlacklistedBookies(ZkIsolatedBookieEnsemblePlacementPolicy.java:150)
        at org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy.newEnsemble(ZkIsolatedBookieEnsemblePlacementPolicy.java:123)
        at org.apache.bookkeeper.client.BookieWatcherImpl.newEnsemble(BookieWatcherImpl.java:233)
        at org.apache.bookkeeper.client.LedgerCreateOp.initiate(LedgerCreateOp.java:141)
        at org.apache.bookkeeper.client.BookKeeper.asyncCreateLedger(BookKeeper.java:831)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncCreateLedger(ManagedLedgerImpl.java:3063)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ledgerClosed(ManagedLedgerImpl.java:1378)
        - locked <0x0000100035d21d60> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.bookkeeper.mledger.impl.OpAddEntry.closeComplete(OpAddEntry.java:188)
        at org.apache.bookkeeper.client.LedgerHandle$5.lambda$safeRun$0(LedgerHandle.java:556)
        at org.apache.bookkeeper.client.LedgerHandle$5$$Lambda$935/0x00007f7f30252908.accept(Unknown Source)
  
```

The main reason of such zk-session timeout and broker-restart is all zk-event threads get blocked on ZK-Cache, [zk-session-wather](https://github.com/apache/pulsar/blob/master/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java) can't complete keep-alive and eventually zk-session timeout. Zk-thread gets blocked on `ZooKeeperCache` because [zk-cache](https://github.com/apache/pulsar/blob/master/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java#L111) expires cache-entry at every 5 mins and Zk-thread misses the cache and tries to retrieve from zk. once, all zk-thread are blocked on Zk-Cache to get data from zk, it creates a deadlock and broker sees zk-session timeout.

One of the solutions is to keep entries always available in cache, avoid cache miss and refresh zk-cache in background. this solution will make sure that zk-thread will not see cache-miss and will not be blocked. 

### Modification
- Refresh Zk-Cache in background without invalidating cache data so, it avoids zk-cache once entry is loaded at first time.

### Result
It wil fix #4635
  • Loading branch information
rdhabalia authored Oct 30, 2020
1 parent 465964b commit bebce4f
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected void setup() throws Exception {
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAdvertisedAddress("127.0.0.1");
config.setAllowAutoTopicCreationType("non-partitioned");
config.setZooKeeperOperationTimeoutSeconds(1);

pulsar = new PulsarService(config);
pulsar.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
Expand All @@ -40,13 +39,12 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoNodeException;
Expand Down Expand Up @@ -83,19 +81,20 @@ public static interface CacheUpdater<T> {

public static final String ZK_CACHE_INSTANCE = "zk_cache_instance";

protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
protected final AsyncLoadingCache<String, Pair<Entry<Object, Stat>, Long>> dataCache;
protected final AsyncLoadingCache<String, Set<String>> childrenCache;
protected final AsyncLoadingCache<String, Boolean> existsCache;
private final OrderedExecutor executor;
private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
private boolean shouldShutdownExecutor;
private final int zkOperationTimeoutSeconds;
private static final int CACHE_EXPIRY_SECONDS = 300; //5 minutes
private static final int DEFAULT_CACHE_EXPIRY_SECONDS = 300; //5 minutes
private final int cacheExpirySeconds;

protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);

public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, CACHE_EXPIRY_SECONDS);
this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, DEFAULT_CACHE_EXPIRY_SECONDS);
}

public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
Expand All @@ -105,10 +104,10 @@ public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTime
this.executor = executor;
this.zkSession.set(zkSession);
this.shouldShutdownExecutor = false;
this.cacheExpirySeconds = cacheExpirySeconds;

this.dataCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(cacheExpirySeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);

this.childrenCache = Caffeine.newBuilder()
Expand Down Expand Up @@ -346,10 +345,12 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
checkNotNull(path);
checkNotNull(deserializer);

CompletableFuture<Optional<Entry<T, Stat>>> future = new CompletableFuture<>();
// refresh zk-cache entry in background if it's already expired
checkAndRefreshExpiredEntry(path, deserializer);
CompletableFuture<Optional<Entry<T,Stat>>> future = new CompletableFuture<>();
dataCache.get(path, (p, executor) -> {
// Return a future for the z-node to be fetched from ZK
CompletableFuture<Entry<Object, Stat>> zkFuture = new CompletableFuture<>();
CompletableFuture<Pair<Entry<Object, Stat>, Long>> zkFuture = new CompletableFuture<>();

// Broker doesn't restart on global-zk session lost: so handling unexpected exception
try {
Expand All @@ -358,8 +359,8 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
try {
T obj = deserializer.deserialize(path, content);
// avoid using the zk-client thread to process the result
executor.execute(
() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat)));
executor.execute(() -> zkFuture.complete(ImmutablePair
.of(new SimpleImmutableEntry<Object, Stat>(obj, stat), System.nanoTime())));
} catch (Exception e) {
executor.execute(() -> zkFuture.completeExceptionally(e));
}
Expand All @@ -378,7 +379,7 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
return zkFuture;
}).thenAccept(result -> {
if (result != null) {
future.complete(Optional.of((Entry<T, Stat>) result));
future.complete(Optional.of((Entry<T, Stat>) result.getLeft()));
} else {
future.complete(Optional.empty());
}
Expand All @@ -390,6 +391,30 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
return future;
}

private <T> void checkAndRefreshExpiredEntry(String path, final Deserializer<T> deserializer) {
CompletableFuture<Pair<Entry<Object, Stat>, Long>> result = dataCache.getIfPresent(path);
if (result != null && result.isDone()) {
Pair<Entry<Object, Stat>, Long> entryPair = result.getNow(null);
if (entryPair != null && entryPair.getRight() != null) {
if ((System.nanoTime() - entryPair.getRight()) > TimeUnit.SECONDS.toMillis(cacheExpirySeconds)) {
this.zkSession.get().getData(path, this, (rc, path1, ctx, content, stat) -> {
if (rc != Code.OK.intValue()) {
log.warn("Failed to refresh zookeeper-cache for {} due to {}", path, rc);
return;
}
try {
T obj = deserializer.deserialize(path, content);
dataCache.put(path, CompletableFuture.completedFuture(ImmutablePair
.of(new SimpleImmutableEntry<Object, Stat>(obj, stat), System.nanoTime())));
} catch (Exception e) {
log.warn("Failed to refresh zookeeper-cache for {}", path, e);
}
}, null);
}
}
}
}

/**
* Simple ZooKeeperChildrenCache use this method to invalidate cache entry on watch event w/o automatic re-loading
*
Expand Down Expand Up @@ -465,9 +490,9 @@ public CompletableFuture<Set<String>> getChildrenAsync(String path, Watcher watc

@SuppressWarnings("unchecked")
public <T> T getDataIfPresent(String path) {
CompletableFuture<Map.Entry<Object, Stat>> f = dataCache.getIfPresent(path);
CompletableFuture<Pair<Entry<Object, Stat>, Long>> f = dataCache.getIfPresent(path);
if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
return (T) f.join().getKey();
return (T) f.join().getLeft().getKey();
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.KeeperException.Code;
Expand Down Expand Up @@ -563,6 +564,63 @@ public String deserialize(String key, byte[] content) throws Exception {
scheduledExecutor.shutdown();
}

/**
* Test to verify {@link ZooKeeperCache} renews cache data after expiry time in background.
*
* @throws Exception
*/
@Test
public void testZKRefreshExpiredEntry() throws Exception {
int cacheExpiryTimeSec = 1;
OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));

String path = "/test";
String val1 = "test-1";
String val2 = "test-2";
zkClient.create(path, val1.getBytes(), null, null);

// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
ZooKeeperCache zkCacheService = new ZooKeeperCacheTest("test", zkClient, 30, executor, cacheExpiryTimeSec);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
return new String(content);
}
};

// try to do get on the path which will time-out and async-cache will have non-completed Future
assertEquals(zkCache.get(path).get(), val1);

zkClient.setData(path, val2.getBytes(), -1);

retryStrategically((test) -> {
try {
return zkCache.get(path).get().equalsIgnoreCase(val2);
} catch (Exception e) {
log.warn("Failed to get date for path {}", path);
}
return false;
}, 5, 1000);

assertEquals(zkCache.get(path).get(), val2);

executor.shutdown();
zkExecutor.shutdown();
scheduledExecutor.shutdown();
}

static class ZooKeeperCacheTest extends ZooKeeperCache {

public ZooKeeperCacheTest(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
OrderedExecutor executor, int cacheExpirySeconds) {
super(cacheName, zkSession, zkOperationTimeoutSeconds, executor, cacheExpirySeconds);
}

}

private static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
throws Exception {
for (int i = 0; i < retryCount; i++) {
Expand Down

0 comments on commit bebce4f

Please sign in to comment.