-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] Changing sessionWather as a immutable nullable #18446
Conversation
Codecov Report
@@ Coverage Diff @@
## master #18446 +/- ##
============================================
+ Coverage 45.61% 47.68% +2.06%
+ Complexity 10728 9315 -1413
============================================
Files 752 617 -135
Lines 72521 58469 -14052
Branches 7791 6089 -1702
============================================
- Hits 33083 27879 -5204
+ Misses 35769 27585 -8184
+ Partials 3669 3005 -664
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my perspective, we can avoid Optional
field and use an immutable nullable field instead:
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index bf607f889f..f1f5f01640 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -76,7 +76,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
private final MetadataStoreConfig metadataStoreConfig;
private final boolean isZkManaged;
private final ZooKeeper zkc;
- private Optional<ZKSessionWatcher> sessionWatcher;
+ private final ZKSessionWatcher sessionWatcher;
public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher)
throws MetadataStoreException {
@@ -96,23 +96,25 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
.connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE))
.allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations())
.sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
- .watchers(Collections.singleton(event -> {
- if (sessionWatcher != null) {
- sessionWatcher.ifPresent(sw -> executor.execute(() -> sw.process(event)));
- }
- }))
+ .watchers(Collections.singleton(this::processSessionWatcher))
.build();
- zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
if (enableSessionWatcher) {
- sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent));
+ sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
} else {
- sessionWatcher = Optional.empty();
+ sessionWatcher = null;
}
+ zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
} catch (Throwable t) {
throw new MetadataStoreException(t);
}
}
+ private void processSessionWatcher(WatchedEvent event) {
+ if (sessionWatcher != null) {
+ executor.execute(() -> sessionWatcher.process(event));
+ }
+ }
+
@VisibleForTesting
@SneakyThrows
public ZKMetadataStore(ZooKeeper zkc) {
@@ -129,7 +131,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
this.metadataStoreConfig = null;
this.isZkManaged = false;
this.zkc = zkc;
- this.sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent));
+ this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
}
@@ -143,9 +145,11 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
super.receivedSessionEvent(event);
} else {
log.error("Failed to recreate persistent watch on ZooKeeper: {}", Code.get(rc));
- sessionWatcher.ifPresent(ZKSessionWatcher::setSessionInvalid);
- // On the reconnectable client, mark the session as expired to trigger a new reconnect and
- // we will have the chance to set the watch again.
+ if (sessionWatcher != null) {
+ sessionWatcher.setSessionInvalid();
+ }
+ // On the reconnect-able client, mark the session as expired to trigger a new reconnect,
+ // and we will have the chance to set the watch again.
if (zkc instanceof PulsarZooKeeperClient) {
((PulsarZooKeeperClient) zkc).process(
new WatchedEvent(Watcher.Event.EventType.None,
@@ -423,8 +427,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
if (isZkManaged) {
zkc.close();
}
- if (sessionWatcher.isPresent()) {
- sessionWatcher.get().close();
+ if (sessionWatcher != null) {
+ sessionWatcher.close();
}
super.close();
}
Great. immutable nullable is better. I have updated. PTAL @tisonkun |
Thanks for your contribution @AnonHxy! Merging... |
Motivation
Remove null-check for sessionWather by initilized as
Optional.empty()
Modifications
Remove null-check for sessionWather by initilized as
Optional.empty()
Verifying this change
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: AnonHxy#17