Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes bindOnLocalhost=boolean. Adds bindAddress and advertisedAddress. #26

Merged
merged 20 commits into from
Sep 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c7e3a7a
Changes bindOnLocalhost=boolean to bindAddress. Introduces advertised…
radekg Sep 20, 2016
cc08ed3
Removes advertisement from web socket config.
radekg Sep 20, 2016
6fa126f
ENsure services bind on the bind address.
radekg Sep 20, 2016
3745033
Change bind address and advertised address hostname resolution to as …
radekg Sep 20, 2016
393e10e
Remove unnecessary LOG in ServiceConfiguration.
radekg Sep 20, 2016
3ed746e
Removed unused imports.
radekg Sep 20, 2016
1bec010
Corrections in comments.
radekg Sep 20, 2016
bf27260
ServiceConfigurration in tests not loaded from Properties.
radekg Sep 20, 2016
368bb8d
Merge branch 'master' into advertisedAddress
radekg Sep 21, 2016
10ccdb6
Fixes the hanging tests.
radekg Sep 23, 2016
2a0920a
Merge branch 'master' into advertisedAddress
radekg Sep 23, 2016
98f01cd
Fixes to make the tests pass.
radekg Sep 23, 2016
94dde8e
Removing the setBindAddress and setAdvertisedAddress from tests. Unit…
radekg Sep 23, 2016
2b9c25c
Fixed TLS tests. These need to advertise as localhost.
radekg Sep 24, 2016
18d3c91
Missing license header.
radekg Sep 24, 2016
2683759
Fixes BrokerServiceTest.testBrokerStatsMetrics test.
radekg Sep 24, 2016
1db4d34
Updates comments for advertisedAddress in the configuration files.
radekg Sep 24, 2016
e51b961
Unit tests passing.
radekg Sep 26, 2016
1ac44bf
Last place where the getHost() needs to be replaced with getAdvertise…
radekg Sep 26, 2016
d4c76f5
Address the final review point, adjust the metrics test, rename Pulsa…
radekg Sep 26, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ test-results
dependency-reduced-pom.xml
logs
/data
pulsar-broker/tmp.*
pulsar-broker/src/test/resources/log4j.properties
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naughty, I know, might be a good idea to go through logging.


*.versionsBackup

Expand Down
7 changes: 5 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ webServicePortTls=8443
# Enable the WebSocket API service in broker
webSocketServiceEnabled=false

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=false
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving empty for clarity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the bind address was 0.0.0.0, then defaulting to bind address might not work. Should we default to hostname for advertisedAddress ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 3745033

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the comment with "default to hostname"


# Name of the cluster to which this broker belongs to
clusterName=
Expand Down
7 changes: 5 additions & 2 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ brokerServicePort=6650
# Port to use to server HTTP request
webServicePort=8080

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=true
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=

# Name of the cluster to which this broker belongs to
clusterName=standalone
Expand Down
4 changes: 2 additions & 2 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ globalZookeeperServers=
# Port to use to server HTTP request
webServicePort=8080

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=false
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Name of the pulsar cluster to connect to
clusterName=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ public class ServiceConfiguration {
private int webServicePort = 8080;
// Port to use to server HTTPS request
private int webServicePortTls = 8443;
// Control whether to bind directly on localhost rather than on normal
// hostname
private boolean bindOnLocalhost = false;

// Hostname or IP address the service binds on.
private String bindAddress = "0.0.0.0";

// Controls which hostname is advertised to the discovery service via ZooKeeper.
private String advertisedAddress;

// Enable the WebSocket API service
private boolean webSocketServiceEnabled = false;
Expand Down Expand Up @@ -290,12 +293,20 @@ public void setWebServicePortTls(int webServicePortTls) {
this.webServicePortTls = webServicePortTls;
}

public boolean isBindOnLocalhost() {
return bindOnLocalhost;
public String getBindAddress() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preferably, the ServiceConfiguration should be a simple POJO. This initialization of the bind address is potentially at risk if multiple threads are initializing. It'd be better to return null (or even better Optional<String>) and have the caller cope with the missing value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to avoid throws on this method as the caller would have to require try / catch everywhere where this is used. Unless we can settle on RuntimeException or IllegalArgumentException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, scratch that, that's a silly idea. I'll just re-throw the exception and handle where used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 3745033

Copy link
Contributor Author

@radekg radekg Sep 21, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat Why would multiple threads try to initialize a single instance of ServiceConfiguration? Is the service configuration stored somewhere as static?

return this.bindAddress;
}

public void setBindAddress(String bindAddress) {
this.bindAddress = bindAddress;
}

public String getAdvertisedAddress() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, we should just return what was configured, and in any case, the fallback here should be on hostname rather than the bind address.

return this.advertisedAddress;
}

public void setBindOnLocalhost(boolean bindOnLocalhost) {
this.bindOnLocalhost = bindOnLocalhost;
public void setAdvertisedAddress(String advertisedAddress) {
this.advertisedAddress = advertisedAddress;
}

public boolean isWebSocketServiceEnabled() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class ServiceConfigurationUtils {

private static final Logger LOG = LoggerFactory.getLogger(ServiceConfigurationUtils.class);

public static String getDefaultOrConfiguredAddress(String configuredAddress) {
if ( configuredAddress == null ) {
return unsafeLocalhostResolve();
}
return configuredAddress;
}

public static String unsafeLocalhostResolve() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException ex) {
LOG.error(ex.getMessage(), ex);
throw new IllegalStateException("Failed to resolve localhost name.", ex);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -97,7 +98,8 @@ public class PulsarService implements AutoCloseable {
private LoadManager loadManager = null;
private PulsarAdmin adminClient = null;
private ZooKeeperClientFactory zkClientFactory = null;
private final String host;
private final String bindAddress;
private final String advertisedAddress;
private final String webServiceAddress;
private final String webServiceAddressTls;
private final String brokerServiceUrl;
Expand All @@ -118,7 +120,8 @@ public enum State {

public PulsarService(ServiceConfiguration config) {
state = State.Init;
this.host = host(config);
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
this.advertisedAddress = advertisedAddress(config);
this.webServiceAddress = webAddress(config);
this.webServiceAddressTls = webAddressTls(config);
this.brokerServiceUrl = brokerUrl(config);
Expand Down Expand Up @@ -319,7 +322,7 @@ private void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
if (!this.globalZkCache.exists(
AdminResource.path("policies") + "/" + NamespaceService.getSLAMonitorNamespace(host, config))) {
AdminResource.path("policies") + "/" + NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config))) {
return;
}
if (!this.nsservice.registerSLANamespace()) {
Expand Down Expand Up @@ -540,8 +543,7 @@ public BookKeeperClientFactory getBookKeeperClientFactory() {
public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
if (this.adminClient == null) {
try {
String adminApiUrl = "http://" + InetAddress.getLocalHost().getHostName() + ":"
+ this.getConfiguration().getWebServicePort();
String adminApiUrl = webAddress(config);
this.adminClient = new PulsarAdmin(new URL(adminApiUrl),
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
this.getConfiguration().getBrokerClientAuthenticationParameters());
Expand All @@ -563,50 +565,44 @@ public MessagingServiceShutdownHook getShutdownService() {
}

/**
* Derive the host
* Advertised service address.
*
* @param isBindOnLocalhost
* @return
* @return Hostname or IP address the service advertises to the outside world.
*/
public static String host(ServiceConfiguration config) {
try {
if (!config.isBindOnLocalhost()) {
return InetAddress.getLocalHost().getHostName();
} else {
return "localhost";
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IllegalStateException("failed to find host", e);
}
public static String advertisedAddress(ServiceConfiguration config) {
return ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
}

public static String brokerUrl(ServiceConfiguration config) {
return "pulsar://" + host(config) + ":" + config.getBrokerServicePort();
return "pulsar://" + advertisedAddress(config) + ":" + config.getBrokerServicePort();
}

public static String brokerUrlTls(ServiceConfiguration config) {
if (config.isTlsEnabled()) {
return "pulsar://" + host(config) + ":" + config.getBrokerServicePortTls();
return "pulsar://" + advertisedAddress(config) + ":" + config.getBrokerServicePortTls();
} else {
return "";
}
}

public static String webAddress(ServiceConfiguration config) {
return String.format("http://%s:%d", host(config), config.getWebServicePort());
return String.format("http://%s:%d", advertisedAddress(config), config.getWebServicePort());
}

public static String webAddressTls(ServiceConfiguration config) {
if (config.isTlsEnabled()) {
return String.format("https://%s:%d", host(config), config.getWebServicePortTls());
return String.format("https://%s:%d", advertisedAddress(config), config.getWebServicePortTls());
} else {
return "";
}
}

public String getHost() {
return host;
public String getBindAddress() {
return bindAddress;
}

public String getAdvertisedAddress() {
return advertisedAddress;
}

public String getWebServiceAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
}
final ManagedLedgerConfig config = pulsar().getBrokerService().getManagedLedgerConfig(dn).get();
ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(config.getDigestType(),
config.getPassword(), pulsar().getHost(), false);
config.getPassword(), pulsar().getAdvertisedAddress(), false);
offlineTopicStats = offlineTopicBacklog
.estimateUnloadedTopicBacklog((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(), dn);
pulsar().getBrokerService().cacheOfflineTopicStats(dn, offlineTopicStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void start() throws PulsarServerException {
}
}

String lookupServiceAddress = pulsar.getHost() + ":" + conf.getWebServicePort();
String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort();
brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
LoadReport loadReport = null;
try {
Expand Down Expand Up @@ -637,7 +637,7 @@ public void writeResourceQuotasToZooKeeper() throws Exception {
*/
private synchronized void doLoadRanking() {
ResourceUnitRanking.setCpuUsageByMsgRate(this.realtimeCpuLoadFactor);
String hostname = pulsar.getHost();
String hostname = pulsar.getAdvertisedAddress();
String strategy = this.getLoadBalancerPlacementStrategy();
log.info("doLoadRanking - load balancing strategy: {}", strategy);
if (!currentLoadReports.isEmpty()) {
Expand Down Expand Up @@ -1094,7 +1094,7 @@ public LoadReport generateLoadReport() throws Exception {
try {
LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
loadReport.setName(String.format("%s:%s", pulsar.getHost(), pulsar.getConfiguration().getWebServicePort()));
loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(), pulsar.getConfiguration().getWebServicePort()));
SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
loadReport.setOverLoaded(
isAboveLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerOverloadedThresholdPercentage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public enum AddressType {
*/
public NamespaceService(PulsarService pulsar) {
this.pulsar = pulsar;
host = pulsar.getHost();
host = pulsar.getAdvertisedAddress();
this.config = pulsar.getConfiguration();
this.loadManager = pulsar.getLoadManager();
ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl());
Expand Down Expand Up @@ -446,9 +446,9 @@ private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedServiceUnit ns
}
// found corresponding policy, set the status to controlled
nsOwnedStatus.is_controlled = true;
if (nsIsolationPolicy.isPrimaryBroker(pulsar.getHost())) {
if (nsIsolationPolicy.isPrimaryBroker(pulsar.getAdvertisedAddress())) {
nsOwnedStatus.broker_assignment = BrokerAssignment.primary;
} else if (nsIsolationPolicy.isSecondaryBroker(pulsar.getHost())) {
} else if (nsIsolationPolicy.isSecondaryBroker(pulsar.getAdvertisedAddress())) {
nsOwnedStatus.broker_assignment = BrokerAssignment.secondary;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,19 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.apache.bookkeeper.client.BookKeeper.DigestType;
Expand Down Expand Up @@ -72,7 +70,6 @@
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -220,13 +217,13 @@ public void start() throws Exception {

bootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, false));
// Bind and start to accept incoming connections.
bootstrap.bind(port).sync();
bootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), port)).sync();
log.info("Started Pulsar Broker service on port {}", port);

if (serviceConfig.isTlsEnabled()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, true));
tlsBootstrap.bind(tlsPort).sync();
tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), tlsPort)).sync();
log.info("Started Pulsar Broker TLS service on port {}", tlsPort);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public PulsarStats(PulsarService pulsar) {
this.tempMetricsCollection = Lists.newArrayList();
this.metricsCollection = Lists.newArrayList();
this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(),
pulsar.getHost());
pulsar.getAdvertisedAddress());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public WebService(ServiceConfiguration config, PulsarService pulsar) throws Puls

ServerConnector connector = new PulsarServerConnector(server, 1, 1);
connector.setPort(config.getWebServicePort());
connector.setHost(pulsar.getBindAddress());
connectors.add(connector);

if (config.isTlsEnabled()) {
Expand All @@ -102,6 +103,7 @@ public WebService(ServiceConfiguration config, PulsarService pulsar) throws Puls
sslCtxFactory.setWantClientAuth(true);
ServerConnector tlsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory);
tlsConnector.setPort(config.getWebServicePortTls());
tlsConnector.setHost(pulsar.getBindAddress());
connectors.add(tlsConnector);
}

Expand Down
Loading