From d4e614acb3e99be7e1275e5e6ae071b08632672c Mon Sep 17 00:00:00 2001 From: Szabolcs Bukros Date: Thu, 28 Mar 2024 09:35:52 +0100 Subject: [PATCH 1/4] HBASE-28464: Make replication ZKWatcher config customizable in extensions --- .../replication/HBaseReplicationEndpoint.java | 9 ++- .../hbase/zookeeper/RecoverableZooKeeper.java | 27 ++++++++- .../hadoop/hbase/zookeeper/ZKWatcher.java | 55 ++++++++++++++++++- 3 files changed, 84 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 564f43324ccd..ab30bc6e33fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -41,6 +41,7 @@ import org.apache.zookeeper.KeeperException.AuthFailedException; import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.client.ZKClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,12 +194,16 @@ private void reloadZkWatcher() throws IOException { if (zkw != null) { zkw.close(); } - zkw = - new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this); + zkw = new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this, + getZKClientConfig()); zkw.registerListener(new PeerRegionServerListener(this)); } } + protected ZKClientConfig getZKClientConfig() { + return new ZKClientConfig(); + } + private void connectPeerCluster() throws IOException { try { conn = createConnection(this.conf); diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index f1798b00e315..fcfa6073eec5 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -41,6 +41,7 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.CreateRequest; @@ -79,6 +80,7 @@ public class RecoverableZooKeeper { private final int sessionTimeout; private final String quorumServers; private final int maxMultiSize; + private final ZKClientConfig zkConfig; /** * See {@link #connect(Configuration, String, Watcher, String)} @@ -110,6 +112,23 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble, */ public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher, final String identifier) throws IOException { + return connect(conf, ensemble, watcher, identifier, null); + } + + /** + * Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified + * configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring + * watcher to the specified watcher. + * @param conf configuration to pull ensemble and other settings from + * @param watcher watcher to monitor connection changes + * @param ensemble ZooKeeper servers quorum string + * @param identifier value used to identify this client instance. + * @param zkConfig zkClientConfig to use for the connection, can be null + * @return connection to zookeeper + * @throws IOException if unable to connect to zk or config problem + */ + public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher, + final String identifier, ZKClientConfig zkConfig) throws IOException { if (ensemble == null) { throw new IOException("Unable to determine ZooKeeper ensemble"); } @@ -122,13 +141,14 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble, int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000); int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024); return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis, - maxSleepTime, identifier, multiMaxSize); + maxSleepTime, identifier, multiMaxSize, zkConfig); } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", justification = "None. Its always been this way.") public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, - int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize) + int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize, + ZKClientConfig zkConfig) throws IOException { // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. this.retryCounterFactory = @@ -147,6 +167,7 @@ public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher wa this.sessionTimeout = sessionTimeout; this.quorumServers = quorumServers; this.maxMultiSize = maxMultiSize; + this.zkConfig = zkConfig; try { checkZk(); @@ -174,7 +195,7 @@ public int getMaxMultiSizeLimit() { protected synchronized ZooKeeper checkZk() throws KeeperException { if (this.zk == null) { try { - this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); + this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, zkConfig); } catch (IOException ex) { LOG.warn("Unable to create ZooKeeper Connection", ex); throw new KeeperException.OperationTimeoutException(); diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index 8f0cfc811b85..27f7eb087767 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -44,6 +44,7 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Perms; +import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; @@ -120,6 +121,20 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable) this(conf, identifier, abortable, false); } + /** + * Instantiate a ZooKeeper connection and watcher. + * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for + * this instance. Use null for default. + * @param zkConfig zkClientConfig to use for the connection, can be null + * @throws IOException if the connection to ZooKeeper fails + * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper + */ + public ZKWatcher(Configuration conf, String identifier, Abortable abortable, + ZKClientConfig zkConfig) + throws ZooKeeperConnectionException, IOException { + this(conf, identifier, abortable, false, zkConfig); + } + /** * Instantiate a ZooKeeper connection and watcher. * @param conf the configuration to use @@ -133,7 +148,24 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable) */ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { - this(conf, identifier, abortable, canCreateBaseZNode, false); + this(conf, identifier, abortable, canCreateBaseZNode, false, null); + } + + /** + * Instantiate a ZooKeeper connection and watcher. + * @param conf the configuration to use + * @param identifier string that is passed to RecoverableZookeeper to be used as + * identifier for this instance. Use null for default. + * @param abortable Can be null if there is on error there is no host to abort: e.g. + * client context. + * @param canCreateBaseZNode true if a base ZNode can be created + * @param zkConfig zkClientConfig to use for the connection, can be null + * @throws IOException if the connection to ZooKeeper fails + * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper + */ + public ZKWatcher(Configuration conf, String identifier, Abortable abortable, + boolean canCreateBaseZNode, ZKClientConfig zkConfig) throws IOException, ZooKeeperConnectionException { + this(conf, identifier, abortable, canCreateBaseZNode, false, zkConfig); } /** @@ -151,6 +183,25 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, */ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, boolean canCreateBaseZNode, boolean clientZK) throws IOException, ZooKeeperConnectionException { + this(conf, identifier, abortable, canCreateBaseZNode, clientZK, null); + } + + /** + * Instantiate a ZooKeeper connection and watcher. + * @param conf the configuration to use + * @param identifier string that is passed to RecoverableZookeeper to be used as + * identifier for this instance. Use null for default. + * @param abortable Can be null if there is on error there is no host to abort: e.g. + * client context. + * @param canCreateBaseZNode true if a base ZNode can be created + * @param clientZK whether this watcher is set to access client ZK + * @param zkConfig zkClientConfig to use for the connection, can be null + * @throws IOException if the connection to ZooKeeper fails + * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base + * ZNodes + */ + public ZKWatcher(Configuration conf, String identifier, Abortable abortable, + boolean canCreateBaseZNode, boolean clientZK, ZKClientConfig zkConfig) throws IOException, ZooKeeperConnectionException { this.conf = conf; if (clientZK) { String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); @@ -177,7 +228,7 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, this.znodePaths = new ZNodePaths(conf); PendingWatcher pendingWatcher = new PendingWatcher(); this.recoverableZooKeeper = - RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier); + RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier, zkConfig); pendingWatcher.prepare(this); if (canCreateBaseZNode) { try { From 21e3c53d87d77839ba6aafd50199a02e96935eb6 Mon Sep 17 00:00:00 2001 From: Szabolcs Bukros Date: Thu, 28 Mar 2024 16:17:23 +0100 Subject: [PATCH 2/4] HBASE-28464: Make replication ZKWatcher config customizable in extensions spotless fix --- .../hadoop/hbase/zookeeper/RecoverableZooKeeper.java | 5 ++--- .../org/apache/hadoop/hbase/zookeeper/ZKWatcher.java | 9 +++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index fcfa6073eec5..106de58219eb 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -112,7 +112,7 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble, */ public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher, final String identifier) throws IOException { - return connect(conf, ensemble, watcher, identifier, null); + return connect(conf, ensemble, watcher, identifier, null); } /** @@ -148,8 +148,7 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble, justification = "None. Its always been this way.") public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize, - ZKClientConfig zkConfig) - throws IOException { + ZKClientConfig zkConfig) throws IOException { // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. this.retryCounterFactory = new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime); diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index 27f7eb087767..392437a844e0 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -130,8 +130,7 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable) * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper */ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, - ZKClientConfig zkConfig) - throws ZooKeeperConnectionException, IOException { + ZKClientConfig zkConfig) throws ZooKeeperConnectionException, IOException { this(conf, identifier, abortable, false, zkConfig); } @@ -164,7 +163,8 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper */ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, - boolean canCreateBaseZNode, ZKClientConfig zkConfig) throws IOException, ZooKeeperConnectionException { + boolean canCreateBaseZNode, ZKClientConfig zkConfig) + throws IOException, ZooKeeperConnectionException { this(conf, identifier, abortable, canCreateBaseZNode, false, zkConfig); } @@ -201,7 +201,8 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, * ZNodes */ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, - boolean canCreateBaseZNode, boolean clientZK, ZKClientConfig zkConfig) throws IOException, ZooKeeperConnectionException { + boolean canCreateBaseZNode, boolean clientZK, ZKClientConfig zkConfig) + throws IOException, ZooKeeperConnectionException { this.conf = conf; if (clientZK) { String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); From eeec72fabc55b15b963b8b43ce01accec1466a34 Mon Sep 17 00:00:00 2001 From: Szabolcs Bukros Date: Tue, 16 Apr 2024 10:31:40 +0200 Subject: [PATCH 3/4] HBASE-28464: Make replication ZKWatcher config customizable in extensions add java doc --- .../hadoop/hbase/replication/HBaseReplicationEndpoint.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index ab30bc6e33fd..71d82f8d6861 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -200,6 +200,11 @@ private void reloadZkWatcher() throws IOException { } } + /** + * Allows us to create a zk client config matching the destination cluster's zk in + * replication extensions if needed. + * @return ZKClientConfig set up to match the destination cluster. + */ protected ZKClientConfig getZKClientConfig() { return new ZKClientConfig(); } From 5f349491d7ea770a2728eb1eda1c5451aeb86baf Mon Sep 17 00:00:00 2001 From: Szabolcs Bukros Date: Wed, 17 Apr 2024 17:08:41 +0200 Subject: [PATCH 4/4] HBASE-28464: Make replication ZKWatcher config customizable in extensions another spotless fix --- .../hadoop/hbase/replication/HBaseReplicationEndpoint.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 71d82f8d6861..afe8522c3326 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -201,8 +201,8 @@ private void reloadZkWatcher() throws IOException { } /** - * Allows us to create a zk client config matching the destination cluster's zk in - * replication extensions if needed. + * Allows us to create a zk client config matching the destination cluster's zk in replication + * extensions if needed. * @return ZKClientConfig set up to match the destination cluster. */ protected ZKClientConfig getZKClientConfig() {