-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HBASE-28464: Make replication ZKWatcher config customizable in extens… #5785
base: master
Are you sure you want to change the base?
Changes from 3 commits
d4e614a
21e3c53
eeec72f
5f34949
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ | |
import org.apache.zookeeper.ZooDefs; | ||
import org.apache.zookeeper.ZooKeeper; | ||
import org.apache.zookeeper.ZooKeeper.States; | ||
import org.apache.zookeeper.client.ZKClientConfig; | ||
import org.apache.zookeeper.data.ACL; | ||
import org.apache.zookeeper.data.Stat; | ||
import org.apache.zookeeper.proto.CreateRequest; | ||
|
@@ -79,6 +80,7 @@ public class RecoverableZooKeeper { | |
private final int sessionTimeout; | ||
private final String quorumServers; | ||
private final int maxMultiSize; | ||
private final ZKClientConfig zkConfig; | ||
|
||
/** | ||
* See {@link #connect(Configuration, String, Watcher, String)} | ||
|
@@ -110,6 +112,23 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble, | |
*/ | ||
public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher, | ||
final String identifier) throws IOException { | ||
return connect(conf, ensemble, watcher, identifier, null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why passing null instead of create an empty ZKClientConfig? In the above protected method, we create an empty ZKClientConfig instance... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically it would not make a difference, because the ZooKeeper code we end up calling would just create a new ZKClientConfig when a null is passed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
/** | ||
* Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified | ||
* configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring | ||
* watcher to the specified watcher. | ||
* @param conf configuration to pull ensemble and other settings from | ||
* @param watcher watcher to monitor connection changes | ||
* @param ensemble ZooKeeper servers quorum string | ||
* @param identifier value used to identify this client instance. | ||
* @param zkConfig zkClientConfig to use for the connection, can be null | ||
* @return connection to zookeeper | ||
* @throws IOException if unable to connect to zk or config problem | ||
*/ | ||
public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher, | ||
final String identifier, ZKClientConfig zkConfig) throws IOException { | ||
if (ensemble == null) { | ||
throw new IOException("Unable to determine ZooKeeper ensemble"); | ||
} | ||
|
@@ -122,14 +141,14 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble, | |
int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000); | ||
int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024); | ||
return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis, | ||
maxSleepTime, identifier, multiMaxSize); | ||
maxSleepTime, identifier, multiMaxSize, zkConfig); | ||
} | ||
|
||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", | ||
justification = "None. Its always been this way.") | ||
public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, | ||
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize) | ||
throws IOException { | ||
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize, | ||
ZKClientConfig zkConfig) throws IOException { | ||
// TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. | ||
this.retryCounterFactory = | ||
new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime); | ||
|
@@ -147,6 +166,7 @@ public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher wa | |
this.sessionTimeout = sessionTimeout; | ||
this.quorumServers = quorumServers; | ||
this.maxMultiSize = maxMultiSize; | ||
this.zkConfig = zkConfig; | ||
|
||
try { | ||
checkZk(); | ||
|
@@ -174,7 +194,7 @@ public int getMaxMultiSizeLimit() { | |
protected synchronized ZooKeeper checkZk() throws KeeperException { | ||
if (this.zk == null) { | ||
try { | ||
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); | ||
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, zkConfig); | ||
} catch (IOException ex) { | ||
LOG.warn("Unable to create ZooKeeper Connection", ex); | ||
throw new KeeperException.OperationTimeoutException(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,7 @@ | |
import org.apache.zookeeper.Watcher; | ||
import org.apache.zookeeper.ZooDefs.Ids; | ||
import org.apache.zookeeper.ZooDefs.Perms; | ||
import org.apache.zookeeper.client.ZKClientConfig; | ||
import org.apache.zookeeper.data.ACL; | ||
import org.apache.zookeeper.data.Id; | ||
import org.apache.zookeeper.data.Stat; | ||
|
@@ -120,6 +121,19 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable) | |
this(conf, identifier, abortable, false); | ||
} | ||
|
||
/** | ||
* Instantiate a ZooKeeper connection and watcher. | ||
* @param identifier string that is passed to RecoverableZookeeper to be used as identifier for | ||
* this instance. Use null for default. | ||
* @param zkConfig zkClientConfig to use for the connection, can be null | ||
* @throws IOException if the connection to ZooKeeper fails | ||
* @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper | ||
*/ | ||
public ZKWatcher(Configuration conf, String identifier, Abortable abortable, | ||
ZKClientConfig zkConfig) throws ZooKeeperConnectionException, IOException { | ||
this(conf, identifier, abortable, false, zkConfig); | ||
} | ||
|
||
/** | ||
* Instantiate a ZooKeeper connection and watcher. | ||
* @param conf the configuration to use | ||
|
@@ -133,7 +147,25 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable) | |
*/ | ||
public ZKWatcher(Configuration conf, String identifier, Abortable abortable, | ||
boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { | ||
this(conf, identifier, abortable, canCreateBaseZNode, false); | ||
this(conf, identifier, abortable, canCreateBaseZNode, false, null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
} | ||
|
||
/** | ||
* Instantiate a ZooKeeper connection and watcher. | ||
* @param conf the configuration to use | ||
* @param identifier string that is passed to RecoverableZookeeper to be used as | ||
* identifier for this instance. Use null for default. | ||
* @param abortable Can be null if there is on error there is no host to abort: e.g. | ||
* client context. | ||
* @param canCreateBaseZNode true if a base ZNode can be created | ||
* @param zkConfig zkClientConfig to use for the connection, can be null | ||
* @throws IOException if the connection to ZooKeeper fails | ||
* @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper | ||
*/ | ||
public ZKWatcher(Configuration conf, String identifier, Abortable abortable, | ||
boolean canCreateBaseZNode, ZKClientConfig zkConfig) | ||
throws IOException, ZooKeeperConnectionException { | ||
this(conf, identifier, abortable, canCreateBaseZNode, false, zkConfig); | ||
} | ||
|
||
/** | ||
|
@@ -151,6 +183,26 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, | |
*/ | ||
public ZKWatcher(Configuration conf, String identifier, Abortable abortable, | ||
boolean canCreateBaseZNode, boolean clientZK) throws IOException, ZooKeeperConnectionException { | ||
this(conf, identifier, abortable, canCreateBaseZNode, clientZK, null); | ||
} | ||
|
||
/** | ||
* Instantiate a ZooKeeper connection and watcher. | ||
* @param conf the configuration to use | ||
* @param identifier string that is passed to RecoverableZookeeper to be used as | ||
* identifier for this instance. Use null for default. | ||
* @param abortable Can be null if there is on error there is no host to abort: e.g. | ||
* client context. | ||
* @param canCreateBaseZNode true if a base ZNode can be created | ||
* @param clientZK whether this watcher is set to access client ZK | ||
* @param zkConfig zkClientConfig to use for the connection, can be null | ||
* @throws IOException if the connection to ZooKeeper fails | ||
* @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base | ||
* ZNodes | ||
*/ | ||
public ZKWatcher(Configuration conf, String identifier, Abortable abortable, | ||
boolean canCreateBaseZNode, boolean clientZK, ZKClientConfig zkConfig) | ||
throws IOException, ZooKeeperConnectionException { | ||
this.conf = conf; | ||
if (clientZK) { | ||
String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); | ||
|
@@ -177,7 +229,7 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, | |
this.znodePaths = new ZNodePaths(conf); | ||
PendingWatcher pendingWatcher = new PendingWatcher(); | ||
this.recoverableZooKeeper = | ||
RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier); | ||
RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier, zkConfig); | ||
pendingWatcher.prepare(this); | ||
if (canCreateBaseZNode) { | ||
try { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better add some comments here to describe why we want to add this method? It is not used directly in the hbase code base, so maybe later other developers may decided to remove it...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing it out, let me add an explanation.