-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Removes bindOnLocalhost=boolean. Adds bindAddress and advertisedAddress. #26
Conversation
CLA is valid! |
# Control whether to bind directly on localhost rather than on normal hostname | ||
bindOnLocalhost=false | ||
# Hostname or IP address the service binds on, default is InetAddress.getLocalHost().getHostName(). | ||
bindAddress= |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving empty for clarity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be 0.0.0.0 instead of hostname
by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that's a personal preference. Some people like things wide open by default, some do not. I don't mind making it 0.0.0.0
.
bindAddress= | ||
|
||
# Hostname or IP address the service advertises to the outside world. If not set, the value of bindAddress is used. | ||
advertisedAddress= |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving empty for clarity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the bind address was 0.0.0.0
, then defaulting to bind address might not work. Should we default to hostname for advertisedAddress
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 3745033
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the comment with "default to hostname"
@@ -5,7 +5,8 @@ brokerServicePort=6650 | |||
brokerServicePortTls=6651 | |||
webServicePort=8080 | |||
webServicePortTls=4443 | |||
bindOnLocalhost=false | |||
bindAddress= |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left empty for clarity.
It might be worth adding |
It isn't clear to me how the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change looks good to me. I think the only part missing is to enforce the bindAddress
settings by have sockets effectively binding on the desired IP/Host. Before, the bindOnLocalhost
was just a trick used for unit tests to advertise localhost
instead of the real hostname, but that was not really affecting the "bind" IP address.
For example, https://github.com/yahoo/pulsar/blob/master/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java#L87 we'd have to add a connector.setHost()
to use it.
Same thing with the Netty server socket, at https://github.com/yahoo/pulsar/blob/master/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java#L223 we need to specify the bindAddress.
# Control whether to bind directly on localhost rather than on normal hostname | ||
bindOnLocalhost=false | ||
# Hostname or IP address the service binds on, default is InetAddress.getLocalHost().getHostName(). | ||
bindAddress= |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be 0.0.0.0 instead of hostname
by default?
bindAddress= | ||
|
||
# Hostname or IP address the service advertises to the outside world. If not set, the value of bindAddress is used. | ||
advertisedAddress= |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the bind address was 0.0.0.0
, then defaulting to bind address might not work. Should we default to hostname for advertisedAddress
?
bindAddress= | ||
|
||
# Hostname or IP address the service advertises to the outside world. If not set, the value of bindAddress is used. | ||
advertisedAddress= |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Web socket server component doesn't need to advertise itself
@@ -291,12 +301,32 @@ public void setWebServicePortTls(int webServicePortTls) { | |||
this.webServicePortTls = webServicePortTls; | |||
} | |||
|
|||
public boolean isBindOnLocalhost() { | |||
return bindOnLocalhost; | |||
public String getBindAddress() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Preferably, the ServiceConfiguration
should be a simple POJO. This initialization of the bind address is potentially at risk if multiple threads are initializing. It'd be better to return null (or even better Optional<String>
) and have the caller cope with the missing value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to avoid throws
on this method as the caller would have to require try / catch
everywhere where this is used. Unless we can settle on RuntimeException
or IllegalArgumentException
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, scratch that, that's a silly idea. I'll just re-throw the exception and handle where used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 3745033
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@merlimat Why would multiple threads try to initialize a single instance of ServiceConfiguration
? Is the service configuration stored somewhere as static
?
LOG.debug("bindAddress not set, attempting default configuration."); | ||
this.setBindAddress(InetAddress.getLocalHost().getHostName()); | ||
} catch (UnknownHostException ex) { | ||
LOG.warn(ex.getMessage(), "Using localhost as bindAddress."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should just re-trow an unchecked exception to have the broker exit if the hostname is invalid. Anyway it will probably not reachable on localhost
.
this.bindAddress = bindAddress; | ||
} | ||
|
||
public String getAdvertisedAddress() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, we should just return what was configured, and in any case, the fallback here should be on hostname
rather than the bind address.
LOG.error(e.getMessage(), e); | ||
throw new IllegalStateException("failed to find host", e); | ||
} | ||
return config.getBindAddress(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These 2 methods should have the above logic to check whether bindAddress and advertisedAddress were configured.
If you're in the mood, I'm all for it! 😄
Ok, that's a bit mysterious, but there's a |
@merlimat Addressed the comments related to |
@rdhabalia Can you take a look as well? |
// If not set, InetAddress.getLocalHost().getHostName() will be used. | ||
private String bindAddress; | ||
// Controls which hostname is advertised to the discovery service via ZooKeeper. | ||
// If not set, bindAddress is used. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment is out of date now, since it defaults to hostname
# Hostname or IP address the service binds on, default is InetAddress.getLocalHost().getHostName(). | ||
bindAddress= | ||
|
||
# Hostname or IP address the service advertises to the outside world. If not set, the value of bindAddress is used. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default to hostname
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice, thank you!
@merlimat I'm looking at travis, here: https://travis-ci.org/yahoo/pulsar/builds/161469038, I've been noticing similar situation on my machine:
I believe none of my changes are responsible for this, obviously I could be wrong. I do not remember having such earlier today, when I started working on this PR. Do you know what happened in the meantime? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor thing. but looks good to me.
public void setup() throws Exception { | ||
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 5001); | ||
bkEnsemble.start(); | ||
ServiceConfiguration config = create(new Properties(System.getProperties())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as we are not loading ServiceConfiguration
from map/properties: it is fine to directly instantiate
ServiceConfiguration config = new ServiceConfiguration()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed.
@radekg I think the test issue might be related to the fact that without specifying the IP at bind time, we were always binding on 0.0.0.0. Now, we're specifying the bind address, by passing the hostname, so 127.0.0.1 will be excluded. There might be few unit tests that weren't passing the |
The default binding is still When running these tests locally, the progress can be seen with |
@rdhabalia that does the job. This is now passing the tests. Thank you! Can we review again? |
I believe there is one more change required, here: https://github.com/yahoo/pulsar/blob/master/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/PersistentTopics.java#L906 |
The failure seems to be one of those intermittent issues. It successfully ran here: https://travis-ci.org/radekg/pulsar/builds/162817512. I guess the build needs to be restarted? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change looks good, thanks for going after all test cases. Just one very minor comment
@@ -565,41 +567,56 @@ public MessagingServiceShutdownHook getShutdownService() { | |||
/** | |||
* Derive the host | |||
* | |||
* @param isBindOnLocalhost | |||
* @return | |||
* @return Hostname or IP address the service binds on. | |||
*/ | |||
public static String host(ServiceConfiguration config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this host(...)
method could probably be removed now, given that we can directly use config.getBindAddress()
which defaults to a non-null value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@merlimat is it okay to change getHost()
to getBindAddress()
? Also, would you like me to rebase?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure getBindAddress()
sounds good, but we probably don't even need to have the static method, since we could just call config.getBindAddress()
as it is.
For rebasing, I will squash it anyway when merging. I don't expect particular conflicts with current master.
…rService.getHost() to PulsarService.getBindAddress().
Nice work! |
Thank you! |
Add Producer/Consumer/EndToEnd Tests
### Motivation When a broker server is restarted, the Pulsar proxy outputs the following warning: ``` 15:09:01.486 [main-EventThread] INFO o.a.p.z.ZooKeeperChildrenCache - reloadCache called in zookeeperChildrenCache for path /loadbalance/brokers 15:09:31.487 [main-EventThread] WARN o.a.p.p.s.util.ZookeeperCacheLoader - Error updating broker info after broker list changed. java.util.concurrent.TimeoutException: null at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:118) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:82) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` This is due to a deadlock occurring in the proxy. While this deadlock occurs, `main-EventThread` is stopped in the following states: ``` "main-EventThread" #26 daemon prio=5 os_prio=0 tid=0x00007fa9b55ff000 nid=0x3d98 waiting on condition [0x00007fa923dfc000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000eef2ad80> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:123) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:84) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader$$Lambda$40/1450652220.onUpdate(Unknown Source) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache$$Lambda$365/191748085.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$46/1819738265.processResult(Unknown Source) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` `ZookeeperCacheLoader` will try to get load information for all available brokers from ZK or its cache if there is a change in the znode `/loadbalance/brokers`. The problem is that it is `main-EventThread` that performs the above process, and it is also `main-EventThread` that gets the data from ZK. `main-EventThread` is blocked at the following line, so deadlock occurs. https://github.com/apache/pulsar/blob/9d3ab60efddd3f4064789293fd8e9c4d4388b6c7/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java#L97 ### Modifications ~Make the method `ZookeeperCacheLoader#updateBrokerList` run on a thread separate from `main-EventThread`.~ Made `updateBrokerList()` method asynchronous so that `main-EventThread` is not blocked.
### Motivation When a broker server is restarted, the Pulsar proxy outputs the following warning: ``` 15:09:01.486 [main-EventThread] INFO o.a.p.z.ZooKeeperChildrenCache - reloadCache called in zookeeperChildrenCache for path /loadbalance/brokers 15:09:31.487 [main-EventThread] WARN o.a.p.p.s.util.ZookeeperCacheLoader - Error updating broker info after broker list changed. java.util.concurrent.TimeoutException: null at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:118) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:82) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` This is due to a deadlock occurring in the proxy. While this deadlock occurs, `main-EventThread` is stopped in the following states: ``` "main-EventThread" apache#26 daemon prio=5 os_prio=0 tid=0x00007fa9b55ff000 nid=0x3d98 waiting on condition [0x00007fa923dfc000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000eef2ad80> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:123) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:84) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader$$Lambda$40/1450652220.onUpdate(Unknown Source) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache$$Lambda$365/191748085.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$46/1819738265.processResult(Unknown Source) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` `ZookeeperCacheLoader` will try to get load information for all available brokers from ZK or its cache if there is a change in the znode `/loadbalance/brokers`. The problem is that it is `main-EventThread` that performs the above process, and it is also `main-EventThread` that gets the data from ZK. `main-EventThread` is blocked at the following line, so deadlock occurs. https://github.com/apache/pulsar/blob/9d3ab60efddd3f4064789293fd8e9c4d4388b6c7/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java#L97 ### Modifications ~Make the method `ZookeeperCacheLoader#updateBrokerList` run on a thread separate from `main-EventThread`.~ Made `updateBrokerList()` method asynchronous so that `main-EventThread` is not blocked.
### Motivation When a broker server is restarted, the Pulsar proxy outputs the following warning: ``` 15:09:01.486 [main-EventThread] INFO o.a.p.z.ZooKeeperChildrenCache - reloadCache called in zookeeperChildrenCache for path /loadbalance/brokers 15:09:31.487 [main-EventThread] WARN o.a.p.p.s.util.ZookeeperCacheLoader - Error updating broker info after broker list changed. java.util.concurrent.TimeoutException: null at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:118) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:82) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` This is due to a deadlock occurring in the proxy. While this deadlock occurs, `main-EventThread` is stopped in the following states: ``` "main-EventThread" apache#26 daemon prio=5 os_prio=0 tid=0x00007fa9b55ff000 nid=0x3d98 waiting on condition [0x00007fa923dfc000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000eef2ad80> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:123) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:84) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader$$Lambda$40/1450652220.onUpdate(Unknown Source) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache$$Lambda$365/191748085.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$46/1819738265.processResult(Unknown Source) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` `ZookeeperCacheLoader` will try to get load information for all available brokers from ZK or its cache if there is a change in the znode `/loadbalance/brokers`. The problem is that it is `main-EventThread` that performs the above process, and it is also `main-EventThread` that gets the data from ZK. `main-EventThread` is blocked at the following line, so deadlock occurs. https://github.com/apache/pulsar/blob/9d3ab60efddd3f4064789293fd8e9c4d4388b6c7/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java#L97 ### Modifications ~Make the method `ZookeeperCacheLoader#updateBrokerList` run on a thread separate from `main-EventThread`.~ Made `updateBrokerList()` method asynchronous so that `main-EventThread` is not blocked.
### Motivation When a broker server is restarted, the Pulsar proxy outputs the following warning: ``` 15:09:01.486 [main-EventThread] INFO o.a.p.z.ZooKeeperChildrenCache - reloadCache called in zookeeperChildrenCache for path /loadbalance/brokers 15:09:31.487 [main-EventThread] WARN o.a.p.p.s.util.ZookeeperCacheLoader - Error updating broker info after broker list changed. java.util.concurrent.TimeoutException: null at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:118) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:82) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` This is due to a deadlock occurring in the proxy. While this deadlock occurs, `main-EventThread` is stopped in the following states: ``` "main-EventThread" apache#26 daemon prio=5 os_prio=0 tid=0x00007fa9b55ff000 nid=0x3d98 waiting on condition [0x00007fa923dfc000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000eef2ad80> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:123) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:84) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader$$Lambda$40/1450652220.onUpdate(Unknown Source) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache$$Lambda$365/191748085.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$46/1819738265.processResult(Unknown Source) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` `ZookeeperCacheLoader` will try to get load information for all available brokers from ZK or its cache if there is a change in the znode `/loadbalance/brokers`. The problem is that it is `main-EventThread` that performs the above process, and it is also `main-EventThread` that gets the data from ZK. `main-EventThread` is blocked at the following line, so deadlock occurs. https://github.com/apache/pulsar/blob/9d3ab60efddd3f4064789293fd8e9c4d4388b6c7/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java#L97 ### Modifications ~Make the method `ZookeeperCacheLoader#updateBrokerList` run on a thread separate from `main-EventThread`.~ Made `updateBrokerList()` method asynchronous so that `main-EventThread` is not blocked.
### Motivation When a broker server is restarted, the Pulsar proxy outputs the following warning: ``` 15:09:01.486 [main-EventThread] INFO o.a.p.z.ZooKeeperChildrenCache - reloadCache called in zookeeperChildrenCache for path /loadbalance/brokers 15:09:31.487 [main-EventThread] WARN o.a.p.p.s.util.ZookeeperCacheLoader - Error updating broker info after broker list changed. java.util.concurrent.TimeoutException: null at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:118) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:82) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` This is due to a deadlock occurring in the proxy. While this deadlock occurs, `main-EventThread` is stopped in the following states: ``` "main-EventThread" apache#26 daemon prio=5 os_prio=0 tid=0x00007fa9b55ff000 nid=0x3d98 waiting on condition [0x00007fa923dfc000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000eef2ad80> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:123) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:84) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader$$Lambda$40/1450652220.onUpdate(Unknown Source) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache$$Lambda$365/191748085.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$46/1819738265.processResult(Unknown Source) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` `ZookeeperCacheLoader` will try to get load information for all available brokers from ZK or its cache if there is a change in the znode `/loadbalance/brokers`. The problem is that it is `main-EventThread` that performs the above process, and it is also `main-EventThread` that gets the data from ZK. `main-EventThread` is blocked at the following line, so deadlock occurs. https://github.com/apache/pulsar/blob/9d3ab60efddd3f4064789293fd8e9c4d4388b6c7/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java#L97 ### Modifications ~Make the method `ZookeeperCacheLoader#updateBrokerList` run on a thread separate from `main-EventThread`.~ Made `updateBrokerList()` method asynchronous so that `main-EventThread` is not blocked.
### Motivation When a broker server is restarted, the Pulsar proxy outputs the following warning: ``` 15:09:01.486 [main-EventThread] INFO o.a.p.z.ZooKeeperChildrenCache - reloadCache called in zookeeperChildrenCache for path /loadbalance/brokers 15:09:31.487 [main-EventThread] WARN o.a.p.p.s.util.ZookeeperCacheLoader - Error updating broker info after broker list changed. java.util.concurrent.TimeoutException: null at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:118) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:82) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` This is due to a deadlock occurring in the proxy. While this deadlock occurs, `main-EventThread` is stopped in the following states: ``` "main-EventThread" apache#26 daemon prio=5 os_prio=0 tid=0x00007fa9b55ff000 nid=0x3d98 waiting on condition [0x00007fa923dfc000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000eef2ad80> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:123) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:84) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader$$Lambda$40/1450652220.onUpdate(Unknown Source) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache$$Lambda$365/191748085.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$46/1819738265.processResult(Unknown Source) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` `ZookeeperCacheLoader` will try to get load information for all available brokers from ZK or its cache if there is a change in the znode `/loadbalance/brokers`. The problem is that it is `main-EventThread` that performs the above process, and it is also `main-EventThread` that gets the data from ZK. `main-EventThread` is blocked at the following line, so deadlock occurs. https://github.com/apache/pulsar/blob/9d3ab60efddd3f4064789293fd8e9c4d4388b6c7/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java#L97 ### Modifications ~Make the method `ZookeeperCacheLoader#updateBrokerList` run on a thread separate from `main-EventThread`.~ Made `updateBrokerList()` method asynchronous so that `main-EventThread` is not blocked.
### Motivation When a broker server is restarted, the Pulsar proxy outputs the following warning: ``` 15:09:01.486 [main-EventThread] INFO o.a.p.z.ZooKeeperChildrenCache - reloadCache called in zookeeperChildrenCache for path /loadbalance/brokers 15:09:31.487 [main-EventThread] WARN o.a.p.p.s.util.ZookeeperCacheLoader - Error updating broker info after broker list changed. java.util.concurrent.TimeoutException: null at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:118) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:82) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` This is due to a deadlock occurring in the proxy. While this deadlock occurs, `main-EventThread` is stopped in the following states: ``` "main-EventThread" #26 daemon prio=5 os_prio=0 tid=0x00007fa9b55ff000 nid=0x3d98 waiting on condition [0x00007fa923dfc000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000eef2ad80> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.updateBrokerList(ZookeeperCacheLoader.java:123) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader.lambda$new$0(ZookeeperCacheLoader.java:84) at org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader$$Lambda$40/1450652220.onUpdate(Unknown Source) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache.lambda$0(ZooKeeperChildrenCache.java:89) at org.apache.pulsar.zookeeper.ZooKeeperChildrenCache$$Lambda$365/191748085.accept(Unknown Source) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$22(ZooKeeperCache.java:415) at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$46/1819738265.processResult(Unknown Source) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:592) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) ``` `ZookeeperCacheLoader` will try to get load information for all available brokers from ZK or its cache if there is a change in the znode `/loadbalance/brokers`. The problem is that it is `main-EventThread` that performs the above process, and it is also `main-EventThread` that gets the data from ZK. `main-EventThread` is blocked at the following line, so deadlock occurs. https://github.com/apache/pulsar/blob/9d3ab60efddd3f4064789293fd8e9c4d4388b6c7/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java#L97 ### Modifications ~Make the method `ZookeeperCacheLoader#updateBrokerList` run on a thread separate from `main-EventThread`.~ Made `updateBrokerList()` method asynchronous so that `main-EventThread` is not blocked. (cherry picked from commit d7c1cfa)
1. Add kafka-examples to easy verify broker. 2. update readme.
update log4j dependency to 2.17.0 Fixes apache#26 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve.* ### Modifications *Describe the modifications you've done.* ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the chan
…plication (#26) * [feat][broker] Add ResourceGroup rate limit for topic dispatch and replication Signed-off-by: Zixuan Liu <[email protected]> * [fix][broker] Fix ownership loss Signed-off-by: Zixuan Liu <[email protected]> * Fix test Signed-off-by: Zixuan Liu <[email protected]> --------- Signed-off-by: Zixuan Liu <[email protected]>
Motivation
The motivation has been documented and discussed in #23.
Modifications
Adds
bindAddress
andadvertisedAddress
configuration settings.Removes
bindOnLocalhost
configuration setting.Result
The user can how set the address the service will bind on, for example:
0.0.0.0
. The user can also advertise a different address via ZooKeeper. The latter setting makes it much easier to setup Pulsar in environments such as Amazon EC2.…Address.