Skip to content

Commit

Permalink
[improve] [broker] Check max producers/consumers limitation first bef…
Browse files Browse the repository at this point in the history
…ore other ops to save resources (apache#23074)
  • Loading branch information
poorbarcode authored Jul 29, 2024
1 parent e59cd05 commit 679a3d4
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,18 @@ private PublishRate publishRateInBroker(ServiceConfiguration config) {
return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes());
}

public boolean isProducersExceeded(String producerName) {
String replicatorPrefix = brokerService.getPulsar().getConfig().getReplicatorPrefix() + ".";
boolean isRemote = producerName.startsWith(replicatorPrefix);
return isProducersExceeded(isRemote);
}

protected boolean isProducersExceeded(Producer producer) {
if (isSystemTopic() || producer.isRemote()) {
return isProducersExceeded(producer.isRemote());
}

protected boolean isProducersExceeded(boolean isRemote) {
if (isSystemTopic() || isRemote) {
return false;
}
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
Expand Down Expand Up @@ -536,7 +546,7 @@ public int getNumberOfSameAddressProducers(final String clientAddress) {
return count;
}

protected boolean isConsumersExceededOnTopic() {
public boolean isConsumersExceededOnTopic() {
if (isSystemTopic()) {
return false;
}
Expand Down Expand Up @@ -973,12 +983,6 @@ protected void checkTopicFenced() throws BrokerServiceException {
}

protected CompletableFuture<Void> internalAddProducer(Producer producer) {
if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
"Topic '" + topic + "' reached max producers limit"));
}

if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,16 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
"Topic " + topicName + " does not exist"));
}
final Topic topic = optTopic.get();
// Check max consumer limitation to avoid unnecessary ops wasting resources. For example:
// the new consumer reached max producer limitation, but pulsar did schema check first,
// it would waste CPU.
if (((AbstractTopic) topic).isConsumersExceededOnTopic()) {
log.warn("[{}] Attempting to add consumer to topic which reached max"
+ " consumers limit", topic);
Throwable t =
new ConsumerBusyException("Topic reached max consumers limit");
return FutureUtil.failedFuture(t);
}
return service.isAllowAutoSubscriptionCreationAsync(topicName)
.thenCompose(isAllowedAutoSubscriptionCreation -> {
boolean rejectSubscriptionIfDoesNotExist = isDurable
Expand Down Expand Up @@ -1545,6 +1555,15 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}

service.getOrCreateTopic(topicName.toString()).thenCompose((Topic topic) -> {
// Check max producer limitation to avoid unnecessary ops wasting resources. For example: the new
// producer reached max producer limitation, but pulsar did schema check first, it would waste CPU
if (((AbstractTopic) topic).isProducersExceeded(producerName)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
String errorMsg = "Topic '" + topicName.toString() + "' reached max producers limit";
Throwable t = new BrokerServiceException.ProducerBusyException(errorMsg);
return CompletableFuture.failedFuture(t);
}

// Before creating producer, check if backlog quota exceeded
// on topic for size based limit and time based limit
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -52,6 +54,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
import lombok.AllArgsConstructor;
Expand All @@ -70,6 +73,7 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -127,7 +131,13 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -2870,49 +2880,80 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited";
admin.topics().createNonPartitionedTopic(topic);
AtomicInteger schemaOpsCounter = injectSchemaCheckCounterForTopic(topic);
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 0);
List<Producer<byte[]>> producers = new ArrayList<>();
List<Producer<String>> producers = new ArrayList<>();
for (int i = 0; i < maxProducersPerTopic + 1; i++) {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
producers.add(producer);
}
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1);

admin.namespaces().removeMaxProducersPerTopic(myNamespace);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == null);

try {
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
fail("should fail");
} catch (PulsarClientException e) {
String expectMsg = "Topic '" + topic + "' reached max producers limit";
assertTrue(e.getMessage().contains(expectMsg));
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1);
}
//set the limit to 3
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
Awaitility.await().until(()
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 3);
// should success
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
producers.add(producer);
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2);
try {
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).create();
fail("should fail");
} catch (PulsarClientException e) {
String expectMsg = "Topic '" + topic + "' reached max producers limit";
assertTrue(e.getMessage().contains(expectMsg));
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2);
}

//clean up
for (Producer<byte[]> tempProducer : producers) {
for (Producer<String> tempProducer : producers) {
tempProducer.close();
}
}

private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
WhiteboxImpl.getInternalState(pulsar.getBrokerService(), "topics");
AbstractTopic topic = (AbstractTopic) topics.get(topicName).join().get();
AbstractTopic spyTopic = Mockito.spy(topic);
AtomicInteger counter = new AtomicInteger();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
return invocation.callRealMethod();
}
}).when(spyTopic).addSchema(any(SchemaData.class));
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
return invocation.callRealMethod();
}
}).when(spyTopic).addSchemaIfIdleOrCheckCompatible(any(SchemaData.class));
topics.put(topicName, CompletableFuture.completedFuture(Optional.of(spyTopic)));
return counter;
}

@Test
public void testMaxConsumersPerTopicUnlimited() throws Exception {
restartClusterAfterTest();
Expand All @@ -2924,49 +2965,55 @@ public void testMaxConsumersPerTopicUnlimited() throws Exception {
final String myNamespace = newUniqueName(defaultTenant + "/ns");
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
final String topic = "persistent://" + myNamespace + "/testMaxConsumersPerTopicUnlimited";
admin.topics().createNonPartitionedTopic(topic);
AtomicInteger schemaOpsCounter = injectSchemaCheckCounterForTopic(topic);

assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
//the policy is set to 0, so there will be no restrictions
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);
List<Consumer<byte[]>> consumers = new ArrayList<>();
List<Consumer<String>> consumers = new ArrayList<>();
for (int i = 0; i < maxConsumersPerTopic + 1; i++) {
Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
consumers.add(consumer);
}
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2);

admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == null);
try {
@Cleanup
Consumer<byte[]> subscribe =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
Consumer<String> subscribe = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max consumers limit"));
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2);
}
//set the limit to 3
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
Awaitility.await().until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 3);
// should success
Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
consumers.add(consumer);
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3);
try {
@Cleanup
Consumer<byte[]> subscribe =
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
Consumer<String> subscribe = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max consumers limit"));
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3);
}

//clean up
for (Consumer<byte[]> subConsumer : consumers) {
for (Consumer<String> subConsumer : consumers) {
subConsumer.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,51 +509,6 @@ public void testProducerOverwrite() {
topic.getProducers().values().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 3));
}

private void testMaxProducers() {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
topic.initialize().join();
String role = "appid1";
// 1. add producer1
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role,
false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty(), true);
topic.addProducer(producer, new CompletableFuture<>());
assertEquals(topic.getProducers().size(), 1);

// 2. add producer2
Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role,
false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty(), true);
topic.addProducer(producer2, new CompletableFuture<>());
assertEquals(topic.getProducers().size(), 2);

// 3. add producer3 but reached maxProducersPerTopic
try {
Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role,
false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty(), true);
topic.addProducer(producer3, new CompletableFuture<>()).join();
fail("should have failed");
} catch (Exception e) {
assertEquals(e.getCause().getClass(), BrokerServiceException.ProducerBusyException.class);
}
}

@Test
public void testMaxProducersForBroker() {
// set max clients
pulsarTestContext.getConfig().setMaxProducersPerTopic(2);
testMaxProducers();
}

@Test
public void testMaxProducersForNamespace() throws Exception {
// set max clients
Policies policies = new Policies();
policies.max_producers_per_topic = 2;
pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
policies);
testMaxProducers();
}

private Producer getMockedProducerWithSpecificAddress(Topic topic, long producerId, InetAddress address) {
final String producerNameBase = "producer";
final String role = "appid1";
Expand Down
Loading

0 comments on commit 679a3d4

Please sign in to comment.