Skip to content

Commit

Permalink
[Broker, Functions, Websocket] Disable memory limit controller in int…
Browse files Browse the repository at this point in the history
…ernal Pulsar clients (#15752)
  • Loading branch information
lhotari authored May 27, 2022
1 parent f23eb1f commit ec52320
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
Expand Down Expand Up @@ -1259,8 +1260,9 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {
return namespaceClients.computeIfAbsent(cluster, key -> {
try {
ClientBuilder clientBuilder = PulsarClient.builder()
.enableTcpNoDelay(false)
.statsInterval(0, TimeUnit.SECONDS);
.memoryLimit(0, SizeUnit.BYTES)
.enableTcpNoDelay(false)
.statsInterval(0, TimeUnit.SECONDS);

if (pulsar.getConfiguration().isAuthenticationEnabled()) {
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
Expand Down Expand Up @@ -103,7 +104,8 @@ public static void main(String[] args) throws Exception {
);
}

ClientBuilder clientBuilder = PulsarClient.builder();
ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES);

if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ public static ClientBuilder createPulsarClientBuilder(String pulsarServiceUrl,
Optional<Long> memoryLimit) throws PulsarClientException {
ClientBuilder clientBuilder = null;
if (isNotBlank(pulsarServiceUrl)) {
clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.serviceUrl(pulsarServiceUrl);
if (authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&& isNotBlank(authConfig.getClientAuthenticationParameters())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.Map;
import java.util.Optional;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mockStatic;
Expand Down Expand Up @@ -79,7 +81,7 @@ public void testMemoryLimitNotSet() throws Exception {

ClientBuilder clientBuilder = testMemoryLimit(null, null);

Mockito.verify(clientBuilder, Mockito.times(0)).memoryLimit(Mockito.anyLong(), Mockito.any());
Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq(0L), Mockito.eq(SizeUnit.BYTES));
}

@Test
Expand Down Expand Up @@ -110,6 +112,7 @@ private ClientBuilder testMemoryLimit(Long absolute, Double percent) throws Exce
ClientBuilder clientBuilder = Mockito.mock(ClientBuilder.class);
mockedPulsarClient.when(() -> PulsarClient.builder()).thenAnswer(i -> clientBuilder);
doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
doReturn(clientBuilder).when(clientBuilder).memoryLimit(anyLong(), any());

ThreadRuntimeFactoryConfig threadRuntimeFactoryConfig = new ThreadRuntimeFactoryConfig();
threadRuntimeFactoryConfig.setThreadGroupName("foo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
Expand Down Expand Up @@ -263,7 +264,9 @@ public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authP
Boolean enableTlsHostnameVerificationEnable) {

try {
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.serviceUrl(pulsarServiceUrl);

if (isNotBlank(authPlugin)
&& isNotBlank(authParams)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand Down Expand Up @@ -171,6 +172,7 @@ public synchronized void setLocalCluster(ClusterData clusterData) {

private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
ClientBuilder clientBuilder = PulsarClient.builder() //
.memoryLimit(0, SizeUnit.BYTES)
.statsInterval(0, TimeUnit.SECONDS) //
.enableTls(config.isTlsEnabled()) //
.allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //
Expand Down

0 comments on commit ec52320

Please sign in to comment.