Skip to content

Commit

Permalink
HBASE-28428 : Zookeeper ConnectionRegistry APIs should have timeout (a…
Browse files Browse the repository at this point in the history
…pache#5837)

Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Pankaj Kumar <[email protected]>
Signed-off-by: Viraj Jasani <[email protected]>
  • Loading branch information
Divneet18 authored Jul 18, 2024
1 parent c3e40d6 commit 8f5516d
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,18 @@ class ZKConnectionRegistry implements ConnectionRegistry {
private final ReadOnlyZKClient zk;

private final ZNodePaths znodePaths;

private final Configuration conf;
private final int zkRegistryAsyncTimeout;
public static final String ZK_REGISTRY_ASYNC_GET_TIMEOUT = "zookeeper.registry.async.get.timeout";
public static final int DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT = 60000; // 1 min
// User not used, but for rpc based registry we need it

ZKConnectionRegistry(Configuration conf, User ignored) {
this.znodePaths = new ZNodePaths(conf);
this.zk = new ReadOnlyZKClient(conf);
this.zk = new ReadOnlyZKClient(conf, AsyncConnectionImpl.RETRY_TIMER);
this.conf = conf;
this.zkRegistryAsyncTimeout =
conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT);
if (NEEDS_LOG_WARN) {
synchronized (WARN_LOCK) {
if (NEEDS_LOG_WARN) {
Expand All @@ -91,7 +98,7 @@ private interface Converter<T> {

private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
CompletableFuture<T> future = new CompletableFuture<>();
addListener(zk.get(path), (data, error) -> {
addListener(zk.get(path, this.zkRegistryAsyncTimeout), (data, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
Expand Down Expand Up @@ -218,8 +225,8 @@ public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return tracedFuture(() -> {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
.filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
zk.list(znodePaths.baseZNode, this.zkRegistryAsyncTimeout).thenApply(children -> children
.stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;

/**
* A very simple read only zookeeper implementation without watcher support.
*/
Expand Down Expand Up @@ -76,6 +79,8 @@ public final class ReadOnlyZKClient implements Closeable {

private final int keepAliveTimeMs;

private HashedWheelTimer retryTimer;

private final ZKClientConfig zkClientConfig;

private static abstract class Task implements Delayed {
Expand Down Expand Up @@ -126,7 +131,7 @@ private String getId() {
return String.format("0x%08x", System.identityHashCode(this));
}

public ReadOnlyZKClient(Configuration conf) {
public ReadOnlyZKClient(Configuration conf, HashedWheelTimer retryTimer) {
// We might use a different ZK for client access
String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
if (clientZkQuorumServers != null) {
Expand All @@ -140,6 +145,7 @@ public ReadOnlyZKClient(Configuration conf) {
conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
this.zkClientConfig = ZKConfig.getZKClientConfig(conf);
this.retryTimer = retryTimer;
LOG.debug(
"Connect {} to {} with session timeout={}ms, retries={}, "
+ "retry interval={}ms, keepAlive={}ms, zk client config={}",
Expand Down Expand Up @@ -258,6 +264,23 @@ public void closed(IOException e) {
}
}

private static TimerTask getTimerTask(final long timeoutMs, final CompletableFuture<?> future,
final String api) {
return timeout -> {
if (!future.isDone()) {
future.completeExceptionally(new DoNotRetryIOException(
"Zookeeper " + api + " could not be completed in " + timeoutMs + " ms"));
}
};
}

public CompletableFuture<byte[]> get(final String path, final long timeoutMs) {
CompletableFuture<byte[]> future = get(path);
TimerTask timerTask = getTimerTask(timeoutMs, future, "GET");
retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS);
return future;
}

public CompletableFuture<byte[]> get(String path) {
if (closed.get()) {
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
Expand All @@ -274,6 +297,13 @@ protected void doExec(ZooKeeper zk) {
return future;
}

public CompletableFuture<Stat> exists(String path, long timeoutMs) {
CompletableFuture<Stat> future = exists(path);
TimerTask timerTask = getTimerTask(timeoutMs, future, "EXISTS");
retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS);
return future;
}

public CompletableFuture<Stat> exists(String path) {
if (closed.get()) {
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
Expand All @@ -289,6 +319,13 @@ protected void doExec(ZooKeeper zk) {
return future;
}

public CompletableFuture<List<String>> list(String path, long timeoutMs) {
CompletableFuture<List<String>> future = list(path);
TimerTask timerTask = getTimerTask(timeoutMs, future, "LIST");
retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS);
return future;
}

public CompletableFuture<List<String>> list(String path) {
if (closed.get()) {
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
Expand All @@ -51,6 +52,7 @@
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand All @@ -63,6 +65,9 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

@Category({ ZKTests.class, MediumTests.class })
public class TestReadOnlyZKClient {

Expand All @@ -79,6 +84,10 @@ public class TestReadOnlyZKClient {
private static int CHILDREN = 5;

private static ReadOnlyZKClient RO_ZK;
private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
10, TimeUnit.MILLISECONDS);

@BeforeClass
public static void setUp() throws Exception {
Expand All @@ -98,13 +107,14 @@ public static void setUp() throws Exception {
conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3);
conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100);
conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000);
RO_ZK = new ReadOnlyZKClient(conf);
RO_ZK = new ReadOnlyZKClient(conf, RETRY_TIMER);
// only connect when necessary
assertNull(RO_ZK.zookeeper);
}

@AfterClass
public static void tearDown() throws IOException {
RETRY_TIMER.stop();
RO_ZK.close();
UTIL.shutdownMiniZKCluster();
UTIL.cleanupTestDir();
Expand Down Expand Up @@ -204,4 +214,18 @@ public void testNotCloseZkWhenPending() throws Exception {
waitForIdleConnectionClosed();
verify(mockedZK, times(1)).close();
}

@Test
public void testReadWithTimeout() throws Exception {
assertArrayEquals(DATA, RO_ZK.get(PATH, 60000).get());
assertEquals(CHILDREN, RO_ZK.exists(PATH, 60000).get().getNumChildren());
List<String> children = RO_ZK.list(PATH, 60000).get();
assertEquals(CHILDREN, children.size());
Collections.sort(children);
for (int i = 0; i < CHILDREN; i++) {
assertEquals("c" + i, children.get(i));
}
assertNotNull(RO_ZK.zookeeper);
waitForIdleConnectionClosed();
}
}

0 comments on commit 8f5516d

Please sign in to comment.