Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Changing sessionWather as a immutable nullable #18446

Merged
merged 2 commits into from
Nov 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
private final MetadataStoreConfig metadataStoreConfig;
private final boolean isZkManaged;
private final ZooKeeper zkc;
private Optional<ZKSessionWatcher> sessionWatcher;
private final ZKSessionWatcher sessionWatcher;

public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher)
throws MetadataStoreException {
Expand All @@ -96,23 +96,25 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf
.connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE))
.allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations())
.sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
.watchers(Collections.singleton(event -> {
if (sessionWatcher != null) {
sessionWatcher.ifPresent(sw -> executor.execute(() -> sw.process(event)));
}
}))
.watchers(Collections.singleton(this::processSessionWatcher))
.build();
zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
if (enableSessionWatcher) {
sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent));
sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
} else {
sessionWatcher = Optional.empty();
sessionWatcher = null;
}
zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
} catch (Throwable t) {
throw new MetadataStoreException(t);
}
}

private void processSessionWatcher(WatchedEvent event) {
if (sessionWatcher != null) {
executor.execute(() -> sessionWatcher.process(event));
}
}

@VisibleForTesting
@SneakyThrows
public ZKMetadataStore(ZooKeeper zkc) {
Expand All @@ -129,7 +131,7 @@ public ZKMetadataStore(ZooKeeper zkc, MetadataStoreConfig config) {
this.metadataStoreConfig = null;
this.isZkManaged = false;
this.zkc = zkc;
this.sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent));
this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
}

Expand All @@ -143,9 +145,11 @@ protected void receivedSessionEvent(SessionEvent event) {
super.receivedSessionEvent(event);
} else {
log.error("Failed to recreate persistent watch on ZooKeeper: {}", Code.get(rc));
sessionWatcher.ifPresent(ZKSessionWatcher::setSessionInvalid);
// On the reconnectable client, mark the session as expired to trigger a new reconnect and
// we will have the chance to set the watch again.
if (sessionWatcher != null) {
sessionWatcher.setSessionInvalid();
}
// On the re-connectable client, mark the session as expired to trigger a new reconnect,
// and we will have the chance to set the watch again.
if (zkc instanceof PulsarZooKeeperClient) {
((PulsarZooKeeperClient) zkc).process(
new WatchedEvent(Watcher.Event.EventType.None,
Expand Down Expand Up @@ -423,8 +427,8 @@ public void close() throws Exception {
if (isZkManaged) {
zkc.close();
}
if (sessionWatcher.isPresent()) {
sessionWatcher.get().close();
if (sessionWatcher != null) {
sessionWatcher.close();
}
super.close();
}
Expand Down