diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index a46f4d74e382..8c4bdf4d51c6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -69,11 +69,18 @@ class ZKConnectionRegistry implements ConnectionRegistry { private final ReadOnlyZKClient zk; private final ZNodePaths znodePaths; - + private final Configuration conf; + private final int zkRegistryAsyncTimeout; + public static final String ZK_REGISTRY_ASYNC_GET_TIMEOUT = "zookeeper.registry.async.get.timeout"; + public static final int DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT = 60000; // 1 min // User not used, but for rpc based registry we need it + ZKConnectionRegistry(Configuration conf, User ignored) { this.znodePaths = new ZNodePaths(conf); - this.zk = new ReadOnlyZKClient(conf); + this.zk = new ReadOnlyZKClient(conf, AsyncConnectionImpl.RETRY_TIMER); + this.conf = conf; + this.zkRegistryAsyncTimeout = + conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT); if (NEEDS_LOG_WARN) { synchronized (WARN_LOCK) { if (NEEDS_LOG_WARN) { @@ -91,7 +98,7 @@ private interface Converter { private CompletableFuture getAndConvert(String path, Converter converter) { CompletableFuture future = new CompletableFuture<>(); - addListener(zk.get(path), (data, error) -> { + addListener(zk.get(path, this.zkRegistryAsyncTimeout), (data, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -218,8 +225,8 @@ public CompletableFuture getMetaRegionLocations() { return tracedFuture(() -> { CompletableFuture future = new CompletableFuture<>(); addListener( - zk.list(znodePaths.baseZNode).thenApply(children -> children.stream() - .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), + zk.list(znodePaths.baseZNode, this.zkRegistryAsyncTimeout).thenApply(children -> children + .stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), (metaReplicaZNodes, error) -> { if (error != null) { future.completeExceptionally(error); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 64b151dc19a5..6c26f089d742 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -43,6 +43,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.TimerTask; + /** * A very simple read only zookeeper implementation without watcher support. */ @@ -76,6 +79,8 @@ public final class ReadOnlyZKClient implements Closeable { private final int keepAliveTimeMs; + private HashedWheelTimer retryTimer; + private final ZKClientConfig zkClientConfig; private static abstract class Task implements Delayed { @@ -126,7 +131,7 @@ private String getId() { return String.format("0x%08x", System.identityHashCode(this)); } - public ReadOnlyZKClient(Configuration conf) { + public ReadOnlyZKClient(Configuration conf, HashedWheelTimer retryTimer) { // We might use a different ZK for client access String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); if (clientZkQuorumServers != null) { @@ -140,6 +145,7 @@ public ReadOnlyZKClient(Configuration conf) { conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); this.zkClientConfig = ZKConfig.getZKClientConfig(conf); + this.retryTimer = retryTimer; LOG.debug( "Connect {} to {} with session timeout={}ms, retries={}, " + "retry interval={}ms, keepAlive={}ms, zk client config={}", @@ -258,6 +264,23 @@ public void closed(IOException e) { } } + private static TimerTask getTimerTask(final long timeoutMs, final CompletableFuture future, + final String api) { + return timeout -> { + if (!future.isDone()) { + future.completeExceptionally(new DoNotRetryIOException( + "Zookeeper " + api + " could not be completed in " + timeoutMs + " ms")); + } + }; + } + + public CompletableFuture get(final String path, final long timeoutMs) { + CompletableFuture future = get(path); + TimerTask timerTask = getTimerTask(timeoutMs, future, "GET"); + retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); + return future; + } + public CompletableFuture get(String path) { if (closed.get()) { return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); @@ -274,6 +297,13 @@ protected void doExec(ZooKeeper zk) { return future; } + public CompletableFuture exists(String path, long timeoutMs) { + CompletableFuture future = exists(path); + TimerTask timerTask = getTimerTask(timeoutMs, future, "EXISTS"); + retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); + return future; + } + public CompletableFuture exists(String path) { if (closed.get()) { return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); @@ -289,6 +319,13 @@ protected void doExec(ZooKeeper zk) { return future; } + public CompletableFuture> list(String path, long timeoutMs) { + CompletableFuture> future = list(path); + TimerTask timerTask = getTimerTask(timeoutMs, future, "LIST"); + retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); + return future; + } + public CompletableFuture> list(String path) { if (closed.get()) { return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index 2f08f6276db4..23a8c339cd71 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -43,6 +43,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtil; @@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ZKTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -63,6 +65,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + @Category({ ZKTests.class, MediumTests.class }) public class TestReadOnlyZKClient { @@ -79,6 +84,10 @@ public class TestReadOnlyZKClient { private static int CHILDREN = 5; private static ReadOnlyZKClient RO_ZK; + private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) + .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), + 10, TimeUnit.MILLISECONDS); @BeforeClass public static void setUp() throws Exception { @@ -98,13 +107,14 @@ public static void setUp() throws Exception { conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3); conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100); conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000); - RO_ZK = new ReadOnlyZKClient(conf); + RO_ZK = new ReadOnlyZKClient(conf, RETRY_TIMER); // only connect when necessary assertNull(RO_ZK.zookeeper); } @AfterClass public static void tearDown() throws IOException { + RETRY_TIMER.stop(); RO_ZK.close(); UTIL.shutdownMiniZKCluster(); UTIL.cleanupTestDir(); @@ -204,4 +214,18 @@ public void testNotCloseZkWhenPending() throws Exception { waitForIdleConnectionClosed(); verify(mockedZK, times(1)).close(); } + + @Test + public void testReadWithTimeout() throws Exception { + assertArrayEquals(DATA, RO_ZK.get(PATH, 60000).get()); + assertEquals(CHILDREN, RO_ZK.exists(PATH, 60000).get().getNumChildren()); + List children = RO_ZK.list(PATH, 60000).get(); + assertEquals(CHILDREN, children.size()); + Collections.sort(children); + for (int i = 0; i < CHILDREN; i++) { + assertEquals("c" + i, children.get(i)); + } + assertNotNull(RO_ZK.zookeeper); + waitForIdleConnectionClosed(); + } }