Skip to content

Commit

Permalink
Removes bindOnLocalhost=boolean. Adds bindAddress and advertisedAddre…
Browse files Browse the repository at this point in the history
…ss. (#26)

Fixes #23
  • Loading branch information
radekg authored and merlimat committed Sep 26, 2016
1 parent 2d75e88 commit 8df6b74
Show file tree
Hide file tree
Showing 26 changed files with 223 additions and 83 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ test-results
dependency-reduced-pom.xml
logs
/data
pulsar-broker/tmp.*
pulsar-broker/src/test/resources/log4j.properties

*.versionsBackup

Expand Down
7 changes: 5 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ webServicePortTls=8443
# Enable the WebSocket API service in broker
webSocketServiceEnabled=false

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=false
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=

# Name of the cluster to which this broker belongs to
clusterName=
Expand Down
7 changes: 5 additions & 2 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ brokerServicePort=6650
# Port to use to server HTTP request
webServicePort=8080

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=true
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=

# Name of the cluster to which this broker belongs to
clusterName=standalone
Expand Down
4 changes: 2 additions & 2 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ globalZookeeperServers=
# Port to use to server HTTP request
webServicePort=8080

# Control whether to bind directly on localhost rather than on normal hostname
bindOnLocalhost=false
# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

# Name of the pulsar cluster to connect to
clusterName=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ public class ServiceConfiguration {
private int webServicePort = 8080;
// Port to use to server HTTPS request
private int webServicePortTls = 8443;
// Control whether to bind directly on localhost rather than on normal
// hostname
private boolean bindOnLocalhost = false;

// Hostname or IP address the service binds on.
private String bindAddress = "0.0.0.0";

// Controls which hostname is advertised to the discovery service via ZooKeeper.
private String advertisedAddress;

// Enable the WebSocket API service
private boolean webSocketServiceEnabled = false;
Expand Down Expand Up @@ -290,12 +293,20 @@ public void setWebServicePortTls(int webServicePortTls) {
this.webServicePortTls = webServicePortTls;
}

public boolean isBindOnLocalhost() {
return bindOnLocalhost;
public String getBindAddress() {
return this.bindAddress;
}

public void setBindAddress(String bindAddress) {
this.bindAddress = bindAddress;
}

public String getAdvertisedAddress() {
return this.advertisedAddress;
}

public void setBindOnLocalhost(boolean bindOnLocalhost) {
this.bindOnLocalhost = bindOnLocalhost;
public void setAdvertisedAddress(String advertisedAddress) {
this.advertisedAddress = advertisedAddress;
}

public boolean isWebSocketServiceEnabled() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class ServiceConfigurationUtils {

private static final Logger LOG = LoggerFactory.getLogger(ServiceConfigurationUtils.class);

public static String getDefaultOrConfiguredAddress(String configuredAddress) {
if ( configuredAddress == null ) {
return unsafeLocalhostResolve();
}
return configuredAddress;
}

public static String unsafeLocalhostResolve() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException ex) {
LOG.error(ex.getMessage(), ex);
throw new IllegalStateException("Failed to resolve localhost name.", ex);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -97,7 +98,8 @@ public class PulsarService implements AutoCloseable {
private LoadManager loadManager = null;
private PulsarAdmin adminClient = null;
private ZooKeeperClientFactory zkClientFactory = null;
private final String host;
private final String bindAddress;
private final String advertisedAddress;
private final String webServiceAddress;
private final String webServiceAddressTls;
private final String brokerServiceUrl;
Expand All @@ -118,7 +120,8 @@ public enum State {

public PulsarService(ServiceConfiguration config) {
state = State.Init;
this.host = host(config);
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
this.advertisedAddress = advertisedAddress(config);
this.webServiceAddress = webAddress(config);
this.webServiceAddressTls = webAddressTls(config);
this.brokerServiceUrl = brokerUrl(config);
Expand Down Expand Up @@ -319,7 +322,7 @@ private void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
if (!this.globalZkCache.exists(
AdminResource.path("policies") + "/" + NamespaceService.getSLAMonitorNamespace(host, config))) {
AdminResource.path("policies") + "/" + NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config))) {
return;
}
if (!this.nsservice.registerSLANamespace()) {
Expand Down Expand Up @@ -540,8 +543,7 @@ public BookKeeperClientFactory getBookKeeperClientFactory() {
public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
if (this.adminClient == null) {
try {
String adminApiUrl = "http://" + InetAddress.getLocalHost().getHostName() + ":"
+ this.getConfiguration().getWebServicePort();
String adminApiUrl = webAddress(config);
this.adminClient = new PulsarAdmin(new URL(adminApiUrl),
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
this.getConfiguration().getBrokerClientAuthenticationParameters());
Expand All @@ -563,50 +565,44 @@ public MessagingServiceShutdownHook getShutdownService() {
}

/**
* Derive the host
* Advertised service address.
*
* @param isBindOnLocalhost
* @return
* @return Hostname or IP address the service advertises to the outside world.
*/
public static String host(ServiceConfiguration config) {
try {
if (!config.isBindOnLocalhost()) {
return InetAddress.getLocalHost().getHostName();
} else {
return "localhost";
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IllegalStateException("failed to find host", e);
}
public static String advertisedAddress(ServiceConfiguration config) {
return ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
}

public static String brokerUrl(ServiceConfiguration config) {
return "pulsar://" + host(config) + ":" + config.getBrokerServicePort();
return "pulsar://" + advertisedAddress(config) + ":" + config.getBrokerServicePort();
}

public static String brokerUrlTls(ServiceConfiguration config) {
if (config.isTlsEnabled()) {
return "pulsar://" + host(config) + ":" + config.getBrokerServicePortTls();
return "pulsar://" + advertisedAddress(config) + ":" + config.getBrokerServicePortTls();
} else {
return "";
}
}

public static String webAddress(ServiceConfiguration config) {
return String.format("http://%s:%d", host(config), config.getWebServicePort());
return String.format("http://%s:%d", advertisedAddress(config), config.getWebServicePort());
}

public static String webAddressTls(ServiceConfiguration config) {
if (config.isTlsEnabled()) {
return String.format("https://%s:%d", host(config), config.getWebServicePortTls());
return String.format("https://%s:%d", advertisedAddress(config), config.getWebServicePortTls());
} else {
return "";
}
}

public String getHost() {
return host;
public String getBindAddress() {
return bindAddress;
}

public String getAdvertisedAddress() {
return advertisedAddress;
}

public String getWebServiceAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
}
final ManagedLedgerConfig config = pulsar().getBrokerService().getManagedLedgerConfig(dn).get();
ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(config.getDigestType(),
config.getPassword(), pulsar().getHost(), false);
config.getPassword(), pulsar().getAdvertisedAddress(), false);
offlineTopicStats = offlineTopicBacklog
.estimateUnloadedTopicBacklog((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(), dn);
pulsar().getBrokerService().cacheOfflineTopicStats(dn, offlineTopicStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void start() throws PulsarServerException {
}
}

String lookupServiceAddress = pulsar.getHost() + ":" + conf.getWebServicePort();
String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort();
brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
LoadReport loadReport = null;
try {
Expand Down Expand Up @@ -637,7 +637,7 @@ public void writeResourceQuotasToZooKeeper() throws Exception {
*/
private synchronized void doLoadRanking() {
ResourceUnitRanking.setCpuUsageByMsgRate(this.realtimeCpuLoadFactor);
String hostname = pulsar.getHost();
String hostname = pulsar.getAdvertisedAddress();
String strategy = this.getLoadBalancerPlacementStrategy();
log.info("doLoadRanking - load balancing strategy: {}", strategy);
if (!currentLoadReports.isEmpty()) {
Expand Down Expand Up @@ -1094,7 +1094,7 @@ public LoadReport generateLoadReport() throws Exception {
try {
LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
loadReport.setName(String.format("%s:%s", pulsar.getHost(), pulsar.getConfiguration().getWebServicePort()));
loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(), pulsar.getConfiguration().getWebServicePort()));
SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
loadReport.setOverLoaded(
isAboveLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerOverloadedThresholdPercentage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public enum AddressType {
*/
public NamespaceService(PulsarService pulsar) {
this.pulsar = pulsar;
host = pulsar.getHost();
host = pulsar.getAdvertisedAddress();
this.config = pulsar.getConfiguration();
this.loadManager = pulsar.getLoadManager();
ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl());
Expand Down Expand Up @@ -446,9 +446,9 @@ private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedServiceUnit ns
}
// found corresponding policy, set the status to controlled
nsOwnedStatus.is_controlled = true;
if (nsIsolationPolicy.isPrimaryBroker(pulsar.getHost())) {
if (nsIsolationPolicy.isPrimaryBroker(pulsar.getAdvertisedAddress())) {
nsOwnedStatus.broker_assignment = BrokerAssignment.primary;
} else if (nsIsolationPolicy.isSecondaryBroker(pulsar.getHost())) {
} else if (nsIsolationPolicy.isSecondaryBroker(pulsar.getAdvertisedAddress())) {
nsOwnedStatus.broker_assignment = BrokerAssignment.secondary;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,19 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.apache.bookkeeper.client.BookKeeper.DigestType;
Expand Down Expand Up @@ -72,7 +70,6 @@
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -220,13 +217,13 @@ public void start() throws Exception {

bootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, false));
// Bind and start to accept incoming connections.
bootstrap.bind(port).sync();
bootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), port)).sync();
log.info("Started Pulsar Broker service on port {}", port);

if (serviceConfig.isTlsEnabled()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, true));
tlsBootstrap.bind(tlsPort).sync();
tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), tlsPort)).sync();
log.info("Started Pulsar Broker TLS service on port {}", tlsPort);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public PulsarStats(PulsarService pulsar) {
this.tempMetricsCollection = Lists.newArrayList();
this.metricsCollection = Lists.newArrayList();
this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(),
pulsar.getHost());
pulsar.getAdvertisedAddress());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public WebService(ServiceConfiguration config, PulsarService pulsar) throws Puls

ServerConnector connector = new PulsarServerConnector(server, 1, 1);
connector.setPort(config.getWebServicePort());
connector.setHost(pulsar.getBindAddress());
connectors.add(connector);

if (config.isTlsEnabled()) {
Expand All @@ -102,6 +103,7 @@ public WebService(ServiceConfiguration config, PulsarService pulsar) throws Puls
sslCtxFactory.setWantClientAuth(true);
ServerConnector tlsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory);
tlsConnector.setPort(config.getWebServicePortTls());
tlsConnector.setHost(pulsar.getBindAddress());
connectors.add(tlsConnector);
}

Expand Down
Loading

0 comments on commit 8df6b74

Please sign in to comment.