Skip to content

Commit

Permalink
ZOOKEEPER-837: Eliminate cycle dependency between ClientCnxn and ZooK…
Browse files Browse the repository at this point in the history
…eeper

1. Extract ZKWatchManager to single file
2. Move ZKWatchManager instance to ClientCnxn in order to eliminate cycle dependency
3. let `ZooKeeper` syncs a copy of default watcher, in order to reduce dependencies to `getWatchManager()`

Author: tison <[email protected]>

Reviewers: Enrico Olivelli <[email protected]>, Andor Molnar <[email protected]>

Closes apache#1095 from TisonKun/ZOOKEEPER-837
  • Loading branch information
tisonkun authored and RokLenarcic committed Aug 31, 2022
1 parent 1857e98 commit caacfea
Show file tree
Hide file tree
Showing 9 changed files with 651 additions and 641 deletions.
110 changes: 50 additions & 60 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,11 @@ static class AuthData {

private final int sessionTimeout;

private final ZooKeeper zooKeeper;

private final ClientWatchManager watcher;
private final ZKWatchManager watchManager;

private long sessionId;

private byte[] sessionPasswd = new byte[16];
private byte[] sessionPasswd;

/**
* If true, the connection is allowed to go to r-o mode. This field's value
Expand Down Expand Up @@ -224,6 +222,10 @@ static class AuthData {
*/
private long requestTimeout;

ZKWatchManager getWatcherManager() {
return watchManager;
}

public long getSessionId() {
return sessionId;
}
Expand Down Expand Up @@ -362,35 +364,29 @@ public String toString() {
* established until needed. The start() instance method must be called
* subsequent to construction.
*
* @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
* @param hostProvider
* the list of ZooKeeper servers to connect to
* @param sessionTimeout
* the timeout for connections.
* @param zooKeeper
* the zookeeper object that this connection is related to.
* @param watcher watcher for this connection
* @param clientCnxnSocket
* the socket implementation used (e.g. NIO/Netty)
* @param canBeReadOnly
* whether the connection is allowed to go to read-only
* mode in case of partitioning
* @throws IOException
* @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
* @param hostProvider the list of ZooKeeper servers to connect to
* @param sessionTimeout the timeout for connections.
* @param clientConfig the client configuration.
* @param defaultWatcher default watcher for this connection
* @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
* @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning
*/
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZooKeeper zooKeeper,
ClientWatchManager watcher,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
boolean canBeReadOnly
) throws IOException {
this(
chrootPath,
hostProvider,
sessionTimeout,
zooKeeper,
watcher,
clientConfig,
defaultWatcher,
clientCnxnSocket,
0,
new byte[16],
Expand All @@ -402,48 +398,45 @@ public ClientCnxn(
* established until needed. The start() instance method must be called
* subsequent to construction.
*
* @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
* @param hostProvider
* the list of ZooKeeper servers to connect to
* @param sessionTimeout
* the timeout for connections.
* @param zooKeeper
* the zookeeper object that this connection is related to.
* @param watcher watcher for this connection
* @param clientCnxnSocket
* the socket implementation used (e.g. NIO/Netty)
* @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
* @param hostProvider the list of ZooKeeper servers to connect to
* @param sessionTimeout the timeout for connections.
* @param clientConfig the client configuration.
* @param defaultWatcher default watcher for this connection
* @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
* @param sessionId session id if re-establishing session
* @param sessionPasswd session passwd if re-establishing session
* @param canBeReadOnly
* whether the connection is allowed to go to read-only
* mode in case of partitioning
* @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning
* @throws IOException in cases of broken network
*/
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZooKeeper zooKeeper,
ClientWatchManager watcher,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly) throws IOException {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
boolean canBeReadOnly
) throws IOException {
this.chrootPath = chrootPath;
this.hostProvider = hostProvider;
this.sessionTimeout = sessionTimeout;
this.clientConfig = clientConfig;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
this.readOnly = canBeReadOnly;

this.watchManager = new ZKWatchManager(
clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET),
defaultWatcher);

connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
this.connectTimeout = sessionTimeout / hostProvider.size();
this.readTimeout = sessionTimeout * 2 / 3;

sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();
initRequestTimeout();
}

Expand Down Expand Up @@ -506,10 +499,9 @@ private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// materialize the watchers based on the event
watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
watchers = new HashSet<>(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
// queue the pair (watch set & event) for later processing
Expand Down Expand Up @@ -1007,14 +999,12 @@ void primeConnection() throws IOException {
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
List<String> persistentWatches = zooKeeper.getPersistentWatches();
List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
List<String> dataWatches = watchManager.getDataWatchList();
List<String> existWatches = watchManager.getExistWatchList();
List<String> childWatches = watchManager.getChildWatchList();
List<String> persistentWatches = watchManager.getPersistentWatchList();
List<String> persistentRecursiveWatches = watchManager.getPersistentRecursiveWatchList();
if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
|| !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Set;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.WatcherType;
import org.apache.zookeeper.ZooKeeper.ZKWatchManager;

/**
* Handles the special case of removing watches which has registered for a
Expand Down
Loading

0 comments on commit caacfea

Please sign in to comment.