Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28529 Use ZKClientConfig instead of system properties when sett… #5835

Merged
merged 1 commit into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -75,6 +76,8 @@ public final class ReadOnlyZKClient implements Closeable {

private final int keepAliveTimeMs;

private final ZKClientConfig zkClientConfig;

private static abstract class Task implements Delayed {

protected long time = System.nanoTime();
Expand Down Expand Up @@ -136,10 +139,12 @@ public ReadOnlyZKClient(Configuration conf) {
this.retryIntervalMs =
conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
this.zkClientConfig = ZKConfig.getZKClientConfig(conf);
LOG.debug(
"Connect {} to {} with session timeout={}ms, retries {}, "
+ "retry interval {}ms, keepAlive={}ms",
getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
"Connect {} to {} with session timeout={}ms, retries={}, "
+ "retry interval={}ms, keepAlive={}ms, zk client config={}",
getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs,
zkClientConfig);
Threads.setDaemonThreadRunning(new Thread(this::run),
"ReadOnlyZKClient-" + connectString + "@" + getId());
}
Expand Down Expand Up @@ -316,7 +321,7 @@ private ZooKeeper getZk() throws IOException {
// may be closed when session expired
if (zookeeper == null || !zookeeper.getState().isAlive()) {
zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {
});
}, zkClientConfig);
}
return zookeeper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.client.ZKClientConfig;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

/**
* Utility methods for reading, and building the ZooKeeper configuration. The order and priority for
* reading the config are as follows: (1). Property with "hbase.zookeeper.property." prefix from
* HBase XML (2). other zookeeper related properties in HBASE XML
* reading the config are as follows:
* <ol>
* <li>Property with "hbase.zookeeper.property." prefix from HBase XML.</li>
* <li>other zookeeper related properties in HBASE XML</li>
* </ol>
*/
@InterfaceAudience.Private
public final class ZKConfig {

private static final String VARIABLE_START = "${";
private static final String ZOOKEEPER_JAVA_PROPERTY_PREFIX = "zookeeper.";

private ZKConfig() {
}
Expand Down Expand Up @@ -132,7 +135,6 @@ private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf
* @return Quorum servers
*/
public static String getZKQuorumServersString(Configuration conf) {
setZooKeeperClientSystemProperties(HConstants.ZK_CFG_PROPERTY_PREFIX, conf);
return getZKQuorumServersStringFromHbaseConfig(conf);
}

Expand Down Expand Up @@ -322,13 +324,19 @@ public String getZnodeParent() {
}
}

public static ZKClientConfig getZKClientConfig(Configuration conf) {
Properties zkProperties = extractZKPropsFromHBaseConfig(conf);
ZKClientConfig zkClientConfig = new ZKClientConfig();
zkProperties.forEach((k, v) -> zkClientConfig.setProperty(k.toString(), v.toString()));
return zkClientConfig;
}

/**
* Get the client ZK Quorum servers string
* @param conf the configuration to read
* @return Client quorum servers, or null if not specified
*/
public static String getClientZKQuorumServersString(Configuration conf) {
setZooKeeperClientSystemProperties(HConstants.ZK_CFG_PROPERTY_PREFIX, conf);
String clientQuromServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
if (clientQuromServers == null) {
return null;
Expand All @@ -341,15 +349,4 @@ public static String getClientZKQuorumServersString(Configuration conf) {
final String[] serverHosts = StringUtils.getStrings(clientQuromServers);
return buildZKQuorumServerString(serverHosts, clientZkClientPort);
}

private static void setZooKeeperClientSystemProperties(String prefix, Configuration conf) {
Properties zkProperties = extractZKPropsFromHBaseConfig(conf);
for (Entry<Object, Object> entry : zkProperties.entrySet()) {
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
if (System.getProperty(ZOOKEEPER_JAVA_PROPERTY_PREFIX + key) == null) {
System.setProperty(ZOOKEEPER_JAVA_PROPERTY_PREFIX + key, value);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.zookeeper.client.ZKClientConfig;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -100,62 +101,22 @@ public void testClusterKeyWithMultiplePorts() throws Exception {
}

@Test
public void testZooKeeperTlsPropertiesClient() {
public void testZooKeeperTlsProperties() {
// Arrange
Configuration conf = HBaseConfiguration.create();
for (String p : ZOOKEEPER_CLIENT_TLS_PROPERTIES) {
conf.set(HConstants.ZK_CFG_PROPERTY_PREFIX + p, p);
String zkprop = "zookeeper." + p;
System.clearProperty(zkprop);
}

// Act
ZKConfig.getClientZKQuorumServersString(conf);
ZKClientConfig zkClientConfig = ZKConfig.getZKClientConfig(conf);

// Assert
for (String p : ZOOKEEPER_CLIENT_TLS_PROPERTIES) {
String zkprop = "zookeeper." + p;
assertEquals("Invalid or unset system property: " + zkprop, p, System.getProperty(zkprop));
System.clearProperty(zkprop);
assertEquals("Invalid or unset system property: " + p, p, zkClientConfig.getProperty(p));
}
}

@Test
public void testZooKeeperTlsPropertiesServer() {
// Arrange
Configuration conf = HBaseConfiguration.create();
for (String p : ZOOKEEPER_CLIENT_TLS_PROPERTIES) {
conf.set(HConstants.ZK_CFG_PROPERTY_PREFIX + p, p);
String zkprop = "zookeeper." + p;
System.clearProperty(zkprop);
}

// Act
ZKConfig.getZKQuorumServersString(conf);

// Assert
for (String p : ZOOKEEPER_CLIENT_TLS_PROPERTIES) {
String zkprop = "zookeeper." + p;
assertEquals("Invalid or unset system property: " + zkprop, p, System.getProperty(zkprop));
System.clearProperty(zkprop);
}
}

@Test
public void testZooKeeperPropertiesDoesntOverwriteSystem() {
// Arrange
System.setProperty("zookeeper.a.b.c", "foo");
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.ZK_CFG_PROPERTY_PREFIX + "a.b.c", "bar");

// Act
ZKConfig.getZKQuorumServersString(conf);

// Assert
assertEquals("foo", System.getProperty("zookeeper.a.b.c"));
System.clearProperty("zookeeper.a.b.c");
}

private void testKey(String ensemble, int port, String znode) throws IOException {
testKey(ensemble, port, znode, false); // not support multiple client ports
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CreateRequest;
Expand All @@ -49,19 +50,22 @@
import org.slf4j.LoggerFactory;

/**
* A zookeeper that can handle 'recoverable' errors. To handle recoverable errors, developers need
* to realize that there are two classes of requests: idempotent and non-idempotent requests. Read
* requests and unconditional sets and deletes are examples of idempotent requests, they can be
* reissued with the same results. (Although, the delete may throw a NoNodeException on reissue its
* effect on the ZooKeeper state is the same.) Non-idempotent requests need special handling,
* application and library writers need to keep in mind that they may need to encode information in
* the data or name of znodes to detect retries. A simple example is a create that uses a sequence
* flag. If a process issues a create("/x-", ..., SEQUENCE) and gets a connection loss exception,
* that process will reissue another create("/x-", ..., SEQUENCE) and get back x-111. When the
* process does a getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109
* was the result of the previous create, so the process actually owns both x-109 and x-111. An easy
* way around this is to use "x-process id-" when doing the create. If the process is using an id of
* 352, before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
* A zookeeper that can handle 'recoverable' errors.
* <p>
* To handle recoverable errors, developers need to realize that there are two classes of requests:
* idempotent and non-idempotent requests. Read requests and unconditional sets and deletes are
* examples of idempotent requests, they can be reissued with the same results.
* <p>
* (Although, the delete may throw a NoNodeException on reissue its effect on the ZooKeeper state is
* the same.) Non-idempotent requests need special handling, application and library writers need to
* keep in mind that they may need to encode information in the data or name of znodes to detect
* retries. A simple example is a create that uses a sequence flag. If a process issues a
* create("/x-", ..., SEQUENCE) and gets a connection loss exception, that process will reissue
* another create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
* getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 was the result
* of the previous create, so the process actually owns both x-109 and x-111. An easy way around
* this is to use "x-process id-" when doing the create. If the process is using an id of 352,
* before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
* "x-352-109", x-333-110". The process will know that the original create succeeded an the znode it
* created is "x-352-109".
* @see "https://cwiki.apache.org/confluence/display/HADOOP2/ZooKeeper+ErrorHandling"
Expand All @@ -79,37 +83,31 @@ public class RecoverableZooKeeper {
private final int sessionTimeout;
private final String quorumServers;
private final int maxMultiSize;
private final ZKClientConfig zkClientConfig;

/**
* See {@link #connect(Configuration, String, Watcher, String)}
* See {@link #connect(Configuration, String, Watcher, String, ZKClientConfig)}.
*/
public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
throws IOException {
String ensemble = ZKConfig.getZKQuorumServersString(conf);
return connect(conf, ensemble, watcher);
}

/**
* See {@link #connect(Configuration, String, Watcher, String)}
*/
public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher)
throws IOException {
return connect(conf, ensemble, watcher, null);
return connect(conf, ensemble, watcher, null, null);
}

/**
* Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified
* configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring
* watcher to the specified watcher.
* @param conf configuration to pull ensemble and other settings from
* @param watcher watcher to monitor connection changes
* @param ensemble ZooKeeper servers quorum string
* @param identifier value used to identify this client instance.
* @param conf configuration to pull ensemble and other settings from
* @param watcher watcher to monitor connection changes
* @param ensemble ZooKeeper servers quorum string
* @param identifier value used to identify this client instance.
* @param zkClientConfig client specific configurations for this instance
* @return connection to zookeeper
* @throws IOException if unable to connect to zk or config problem
*/
public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher,
final String identifier) throws IOException {
final String identifier, ZKClientConfig zkClientConfig) throws IOException {
if (ensemble == null) {
throw new IOException("Unable to determine ZooKeeper ensemble");
}
Expand All @@ -122,14 +120,12 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024);
return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis,
maxSleepTime, identifier, multiMaxSize);
maxSleepTime, identifier, multiMaxSize, zkClientConfig);
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
justification = "None. Its always been this way.")
public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher,
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize)
throws IOException {
RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, int maxRetries,
int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize,
ZKClientConfig zkClientConfig) throws IOException {
// TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
this.retryCounterFactory =
new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime);
Expand All @@ -147,12 +143,7 @@ public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher wa
this.sessionTimeout = sessionTimeout;
this.quorumServers = quorumServers;
this.maxMultiSize = maxMultiSize;

try {
checkZk();
} catch (Exception x) {
/* ignore */
}
this.zkClientConfig = zkClientConfig;
}

/**
Expand All @@ -171,10 +162,10 @@ public int getMaxMultiSizeLimit() {
* @return The created ZooKeeper connection object
* @throws KeeperException if a ZooKeeper operation fails
*/
protected synchronized ZooKeeper checkZk() throws KeeperException {
private synchronized ZooKeeper checkZk() throws KeeperException {
if (this.zk == null) {
try {
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, zkClientConfig);
} catch (IOException ex) {
LOG.warn("Unable to create ZooKeeper Connection", ex);
throw new KeeperException.OperationTimeoutException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
this.abortable = abortable;
this.znodePaths = new ZNodePaths(conf);
PendingWatcher pendingWatcher = new PendingWatcher();
this.recoverableZooKeeper =
RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier);
this.recoverableZooKeeper = RecoverableZooKeeper.connect(conf, quorum, pendingWatcher,
identifier, ZKConfig.getZKClientConfig(conf));
pendingWatcher.prepare(this);
if (canCreateBaseZNode) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testSetDataVersionMismatchInLoop() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
ZKWatcher zkw = new ZKWatcher(conf, "testSetDataVersionMismatchInLoop", abortable, true);
String ensemble = ZKConfig.getZKQuorumServersString(conf);
RecoverableZooKeeper rzk = RecoverableZooKeeper.connect(conf, ensemble, zkw);
RecoverableZooKeeper rzk = RecoverableZooKeeper.connect(conf, ensemble, zkw, null, null);
rzk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
rzk.setData(znode, Bytes.toBytes("OPENING"), 0);
Field zkField = RecoverableZooKeeper.class.getDeclaredField("zk");
Expand Down