Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28464: Make replication ZKWatcher config customizable in extens… #5785

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -193,12 +194,16 @@ private void reloadZkWatcher() throws IOException {
if (zkw != null) {
zkw.close();
}
zkw =
new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this);
zkw = new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this,
getZKClientConfig());
zkw.registerListener(new PeerRegionServerListener(this));
}
}

protected ZKClientConfig getZKClientConfig() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add some comments here to describe why we want to add this method? It is not used directly in the hbase code base, so maybe later other developers may decided to remove it...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing it out, let me add an explanation.

return new ZKClientConfig();
}

private void connectPeerCluster() throws IOException {
try {
conn = createConnection(this.conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CreateRequest;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class RecoverableZooKeeper {
private final int sessionTimeout;
private final String quorumServers;
private final int maxMultiSize;
private final ZKClientConfig zkConfig;

/**
* See {@link #connect(Configuration, String, Watcher, String)}
Expand Down Expand Up @@ -110,6 +112,23 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
*/
public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher,
final String identifier) throws IOException {
return connect(conf, ensemble, watcher, identifier, null);
}

/**
* Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified
* configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring
* watcher to the specified watcher.
* @param conf configuration to pull ensemble and other settings from
* @param watcher watcher to monitor connection changes
* @param ensemble ZooKeeper servers quorum string
* @param identifier value used to identify this client instance.
* @param zkConfig zkClientConfig to use for the connection, can be null
* @return connection to zookeeper
* @throws IOException if unable to connect to zk or config problem
*/
public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher,
final String identifier, ZKClientConfig zkConfig) throws IOException {
if (ensemble == null) {
throw new IOException("Unable to determine ZooKeeper ensemble");
}
Expand All @@ -122,13 +141,14 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024);
return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis,
maxSleepTime, identifier, multiMaxSize);
maxSleepTime, identifier, multiMaxSize, zkConfig);
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
justification = "None. Its always been this way.")
public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher,
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize)
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize,
ZKClientConfig zkConfig)
throws IOException {
// TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
this.retryCounterFactory =
Expand All @@ -147,6 +167,7 @@ public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher wa
this.sessionTimeout = sessionTimeout;
this.quorumServers = quorumServers;
this.maxMultiSize = maxMultiSize;
this.zkConfig = zkConfig;

try {
checkZk();
Expand Down Expand Up @@ -174,7 +195,7 @@ public int getMaxMultiSizeLimit() {
protected synchronized ZooKeeper checkZk() throws KeeperException {
if (this.zk == null) {
try {
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, zkConfig);
} catch (IOException ex) {
LOG.warn("Unable to create ZooKeeper Connection", ex);
throw new KeeperException.OperationTimeoutException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -120,6 +121,20 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable)
this(conf, identifier, abortable, false);
}

/**
* Instantiate a ZooKeeper connection and watcher.
* @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
* this instance. Use null for default.
* @param zkConfig zkClientConfig to use for the connection, can be null
* @throws IOException if the connection to ZooKeeper fails
* @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
*/
public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
ZKClientConfig zkConfig)
throws ZooKeeperConnectionException, IOException {
this(conf, identifier, abortable, false, zkConfig);
}

/**
* Instantiate a ZooKeeper connection and watcher.
* @param conf the configuration to use
Expand All @@ -133,7 +148,24 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable)
*/
public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException {
this(conf, identifier, abortable, canCreateBaseZNode, false);
this(conf, identifier, abortable, canCreateBaseZNode, false, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

}

/**
* Instantiate a ZooKeeper connection and watcher.
* @param conf the configuration to use
* @param identifier string that is passed to RecoverableZookeeper to be used as
* identifier for this instance. Use null for default.
* @param abortable Can be null if there is on error there is no host to abort: e.g.
* client context.
* @param canCreateBaseZNode true if a base ZNode can be created
* @param zkConfig zkClientConfig to use for the connection, can be null
* @throws IOException if the connection to ZooKeeper fails
* @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
*/
public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
boolean canCreateBaseZNode, ZKClientConfig zkConfig) throws IOException, ZooKeeperConnectionException {
this(conf, identifier, abortable, canCreateBaseZNode, false, zkConfig);
}

/**
Expand All @@ -151,6 +183,25 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
*/
public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
boolean canCreateBaseZNode, boolean clientZK) throws IOException, ZooKeeperConnectionException {
this(conf, identifier, abortable, canCreateBaseZNode, clientZK, null);
}

/**
* Instantiate a ZooKeeper connection and watcher.
* @param conf the configuration to use
* @param identifier string that is passed to RecoverableZookeeper to be used as
* identifier for this instance. Use null for default.
* @param abortable Can be null if there is on error there is no host to abort: e.g.
* client context.
* @param canCreateBaseZNode true if a base ZNode can be created
* @param clientZK whether this watcher is set to access client ZK
* @param zkConfig zkClientConfig to use for the connection, can be null
* @throws IOException if the connection to ZooKeeper fails
* @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base
* ZNodes
*/
public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
boolean canCreateBaseZNode, boolean clientZK, ZKClientConfig zkConfig) throws IOException, ZooKeeperConnectionException {
this.conf = conf;
if (clientZK) {
String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
Expand All @@ -177,7 +228,7 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
this.znodePaths = new ZNodePaths(conf);
PendingWatcher pendingWatcher = new PendingWatcher();
this.recoverableZooKeeper =
RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier);
RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier, zkConfig);
pendingWatcher.prepare(this);
if (canCreateBaseZNode) {
try {
Expand Down