Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28428 : Zookeeper ConnectionRegistry APIs should have timeout #6095

Merged
merged 4 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,18 @@ class ZKConnectionRegistry implements ConnectionRegistry {
private final ReadOnlyZKClient zk;

private final ZNodePaths znodePaths;
private final Configuration conf;
// Timeout, in milliseconds, passed to the ReadOnlyZKClient get/list calls issued by this
// registry. Read once from the configuration in the constructor.
private final int zkRegistryAsyncTimeout;
/** Configuration key for the timeout applied to this registry's asynchronous ZooKeeper reads. */
public static final String ZK_REGISTRY_ASYNC_GET_TIMEOUT = "zookeeper.registry.async.get.timeout";
/** Value used when {@link #ZK_REGISTRY_ASYNC_GET_TIMEOUT} is not set, in milliseconds. */
public static final int DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT = 60000; // 1 min

// User not used, but for rpc based registry we need it
ZKConnectionRegistry(Configuration conf, User ignored) {
this.znodePaths = new ZNodePaths(conf);
this.zk = new ReadOnlyZKClient(conf);
this.zk = new ReadOnlyZKClient(conf, AsyncConnectionImpl.RETRY_TIMER);
this.conf = conf;
this.zkRegistryAsyncTimeout =
conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT);
if (NEEDS_LOG_WARN) {
synchronized (WARN_LOCK) {
if (NEEDS_LOG_WARN) {
Expand All @@ -94,7 +101,7 @@ private interface Converter<T> {

private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
CompletableFuture<T> future = new CompletableFuture<>();
addListener(zk.get(path), (data, error) -> {
addListener(zk.get(path, this.zkRegistryAsyncTimeout), (data, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
Expand Down Expand Up @@ -228,8 +235,8 @@ public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return tracedFuture(() -> {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
.filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
zk.list(znodePaths.baseZNode, this.zkRegistryAsyncTimeout).thenApply(children -> children
.stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;

/**
* A very simple read only zookeeper implementation without watcher support.
*/
Expand Down Expand Up @@ -76,6 +79,8 @@ public final class ReadOnlyZKClient implements Closeable {

private final int keepAliveTimeMs;

// Timer supplied through the constructor; the get/exists/list overloads that take a timeout
// schedule their timeout tasks on it.
private HashedWheelTimer retryTimer;

private final ZKClientConfig zkClientConfig;

private static abstract class Task implements Delayed {
Expand Down Expand Up @@ -126,7 +131,7 @@ private String getId() {
return String.format("0x%08x", System.identityHashCode(this));
}

public ReadOnlyZKClient(Configuration conf) {
public ReadOnlyZKClient(Configuration conf, HashedWheelTimer retryTimer) {
// We might use a different ZK for client access
String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
if (clientZkQuorumServers != null) {
Expand All @@ -139,6 +144,7 @@ public ReadOnlyZKClient(Configuration conf) {
this.retryIntervalMs =
conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
this.retryTimer = retryTimer;
this.zkClientConfig = ZKConfig.getZKClientConfig(conf);
LOG.debug(
"Connect {} to {} with session timeout={}ms, retries={}, "
Expand Down Expand Up @@ -258,6 +264,23 @@ public void closed(IOException e) {
}
}

/**
 * Creates the task that fails a still-pending ZooKeeper call once its time budget is used up.
 * @param timeoutMs the timeout in milliseconds, only used for the error message
 * @param future    the future of the call being guarded
 * @param api       name of the ZooKeeper operation, only used for the error message
 * @return a task which, when run, completes {@code future} exceptionally with a
 *         {@link DoNotRetryIOException} unless the future is already done
 */
private static TimerTask getTimerTask(final long timeoutMs, final CompletableFuture<?> future,
  final String api) {
  return expired -> {
    if (future.isDone()) {
      // The call finished (or failed) on its own before the timer fired; nothing to do.
      return;
    }
    String message = "Zookeeper " + api + " could not be completed in " + timeoutMs + " ms";
    future.completeExceptionally(new DoNotRetryIOException(message));
  };
}

/**
 * Reads the data of the znode at {@code path}, failing the returned future if the read has not
 * completed within the given timeout.
 * @param path      the znode to read
 * @param timeoutMs the timeout in milliseconds; the timeout task is scheduled to fire after
 *                  {@code timeoutMs + 1} ms
 * @return the future produced by {@link #get(String)}; if it is still pending when the timer
 *         fires it is completed exceptionally with a {@link DoNotRetryIOException}
 */
public CompletableFuture<byte[]> get(final String path, final long timeoutMs) {
  CompletableFuture<byte[]> future = get(path);
  if (future.isDone()) {
    // Already resolved (e.g. the client is closed), so there is nothing to time out.
    return future;
  }
  TimerTask timerTask = getTimerTask(timeoutMs, future, "GET");
  // newTimeout(...) is evaluated once, right here; the bound method reference keeps the returned
  // handle so it can be cancelled without this class having to name the handle's type.
  Runnable cancelTimer =
    retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS)::cancel;
  // Drop the pending timeout as soon as the read finishes, so it does not sit in the timer
  // (holding on to the future) for the rest of the timeout period.
  future.whenComplete((data, error) -> cancelTimer.run());
  return future;
}

public CompletableFuture<byte[]> get(String path) {
if (closed.get()) {
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
Expand All @@ -274,6 +297,13 @@ protected void doExec(ZooKeeper zk) {
return future;
}

/**
 * Checks the znode at {@code path}, failing the returned future if the call has not completed
 * within the given timeout.
 * @param path      the znode to check
 * @param timeoutMs the timeout in milliseconds; the timeout task is scheduled to fire after
 *                  {@code timeoutMs + 1} ms
 * @return the future produced by {@link #exists(String)}; if it is still pending when the timer
 *         fires it is completed exceptionally with a {@link DoNotRetryIOException}
 */
public CompletableFuture<Stat> exists(String path, long timeoutMs) {
  CompletableFuture<Stat> future = exists(path);
  if (future.isDone()) {
    // Already resolved (e.g. the client is closed), so there is nothing to time out.
    return future;
  }
  TimerTask timerTask = getTimerTask(timeoutMs, future, "EXISTS");
  // newTimeout(...) is evaluated once, right here; the bound method reference keeps the returned
  // handle so it can be cancelled without this class having to name the handle's type.
  Runnable cancelTimer =
    retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS)::cancel;
  // Drop the pending timeout as soon as the call finishes, so it does not sit in the timer
  // (holding on to the future) for the rest of the timeout period.
  future.whenComplete((stat, error) -> cancelTimer.run());
  return future;
}

public CompletableFuture<Stat> exists(String path) {
if (closed.get()) {
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
Expand All @@ -289,6 +319,13 @@ protected void doExec(ZooKeeper zk) {
return future;
}

/**
 * Lists the children of the znode at {@code path}, failing the returned future if the call has
 * not completed within the given timeout.
 * @param path      the znode whose children are listed
 * @param timeoutMs the timeout in milliseconds; the timeout task is scheduled to fire after
 *                  {@code timeoutMs + 1} ms
 * @return the future produced by {@link #list(String)}; if it is still pending when the timer
 *         fires it is completed exceptionally with a {@link DoNotRetryIOException}
 */
public CompletableFuture<List<String>> list(String path, long timeoutMs) {
  CompletableFuture<List<String>> future = list(path);
  if (future.isDone()) {
    // Already resolved (e.g. the client is closed), so there is nothing to time out.
    return future;
  }
  TimerTask timerTask = getTimerTask(timeoutMs, future, "LIST");
  // newTimeout(...) is evaluated once, right here; the bound method reference keeps the returned
  // handle so it can be cancelled without this class having to name the handle's type.
  Runnable cancelTimer =
    retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS)::cancel;
  // Drop the pending timeout as soon as the call finishes, so it does not sit in the timer
  // (holding on to the future) for the rest of the timeout period.
  future.whenComplete((children, error) -> cancelTimer.run());
  return future;
}

public CompletableFuture<List<String>> list(String path) {
if (closed.get()) {
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
Expand All @@ -51,6 +52,7 @@
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand All @@ -63,6 +65,9 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

@Category({ ZKTests.class, MediumTests.class })
public class TestReadOnlyZKClient {

Expand All @@ -79,6 +84,10 @@ public class TestReadOnlyZKClient {
private static int CHILDREN = 5;

private static ReadOnlyZKClient RO_ZK;
// Timer handed to the ReadOnlyZKClient under test in setUp() and stopped in tearDown(). It runs
// on daemon threads that log uncaught exceptions, and is created with 10 ms as its tick duration.
private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
10, TimeUnit.MILLISECONDS);

@BeforeClass
public static void setUp() throws Exception {
Expand All @@ -98,13 +107,14 @@ public static void setUp() throws Exception {
conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3);
conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100);
conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000);
RO_ZK = new ReadOnlyZKClient(conf);
RO_ZK = new ReadOnlyZKClient(conf, RETRY_TIMER);
// only connect when necessary
assertNull(RO_ZK.zookeeper);
}

@AfterClass
public static void tearDown() throws IOException {
RETRY_TIMER.stop();
RO_ZK.close();
UTIL.shutdownMiniZKCluster();
UTIL.cleanupTestDir();
Expand Down Expand Up @@ -204,4 +214,18 @@ public void testNotCloseZkWhenPending() throws Exception {
waitForIdleConnectionClosed();
verify(mockedZK, times(1)).close();
}

/**
 * Runs the get/exists/list overloads that take a timeout against the test znode, using a
 * timeout long enough not to fire, and checks the data, child count and child names returned.
 */
@Test
public void testReadWithTimeout() throws Exception {
  byte[] data = RO_ZK.get(PATH, 60000).get();
  assertArrayEquals(DATA, data);

  int numChildren = RO_ZK.exists(PATH, 60000).get().getNumChildren();
  assertEquals(CHILDREN, numChildren);

  List<String> childNames = RO_ZK.list(PATH, 60000).get();
  assertEquals(CHILDREN, childNames.size());
  // Sort first so the names can be compared positionally: c0, c1, ...
  Collections.sort(childNames);
  int index = 0;
  for (String childName : childNames) {
    assertEquals("c" + index, childName);
    index++;
  }

  // The reads above must have opened a connection; wait for the idle close before returning.
  assertNotNull(RO_ZK.zookeeper);
  waitForIdleConnectionClosed();
}
}