Skip to content

Commit

Permalink
[improve][cli] improve admin set-backlog-quota more clear (#19300)
Browse files Browse the repository at this point in the history
  • Loading branch information
labuladong authored Feb 2, 2023
1 parent 595a125 commit 31fe347
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -437,6 +438,12 @@ public void namespaces() throws Exception {
namespaces.run(split("unload myprop/clust/ns1"));
verify(mockNamespaces).unload("myprop/clust/ns1");

// message_age must have time limit, destination_storage must have size limit
Assert.assertFalse(namespaces.run(
split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -t message_age")));
Assert.assertFalse(namespaces.run(
split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10h -t destination_storage")));

mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
Expand Down Expand Up @@ -498,23 +505,21 @@ public void namespaces() throws Exception {
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);

namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -l 10K -lt 10m"));
namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -lt 10m -t message_age"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
BacklogQuota.builder()
.limitSize(10 * 1024)
.limitTime(10 * 60)
.retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
.build(),
BacklogQuota.BacklogQuotaType.destination_storage);
BacklogQuota.BacklogQuotaType.message_age);

mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);

namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -lt 10000 -t message_age"));
namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10000 -t message_age"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
BacklogQuota.builder()
.limitSize(10l * 1024 * 1024 * 1024)
.limitTime(10000)
.retentionPolicy(RetentionPolicy.producer_exception)
.build(),
Expand Down Expand Up @@ -1216,24 +1221,28 @@ public void topicPolicies() throws Exception {
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10h"));
verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10 * 60 * 60);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction"));
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction -t message_age"));
verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.builder()
.limitSize(-1)
.limitTime(60 * 60 * 24 * 7)
.retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
.build(),
BacklogQuota.BacklogQuotaType.destination_storage);
BacklogQuota.BacklogQuotaType.message_age);
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1000 -p producer_request_hold -t message_age"));
verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.builder()
.limitSize(-1)
.limitTime(1000)
.retentionPolicy(RetentionPolicy.producer_request_hold)
.build(),
BacklogQuota.BacklogQuotaType.message_age);
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopicPolicies(() -> admin);
Assert.assertFalse(cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 1000 -p producer_request_hold -t message_age")));
cmdTopics = new CmdTopicPolicies(() -> admin);
Assert.assertFalse(cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 60 -p producer_request_hold -t destination_storage")));

//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,7 @@ private class SetBacklogQuota extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", required = true)
@Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)")
private String limitStr;

@Parameter(names = { "-lt", "--limitTime" },
Expand All @@ -1280,16 +1280,15 @@ private class SetBacklogQuota extends CliCommand {
private String policyStr;

@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: "
+ "destination_storage and message_age. "
+ "destination_storage limits backlog by size (in bytes). "
+ "destination_storage (default) and message_age. "
+ "destination_storage limits backlog by size. "
+ "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). "
+ "You can set size or time to control the backlog, or combine them together to control the backlog. ")
private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();

@Override
void run() throws PulsarAdminException {
BacklogQuota.RetentionPolicy policy;
long limit = validateSizeString(limitStr);
BacklogQuota.BacklogQuotaType backlogQuotaType;

try {
Expand All @@ -1306,26 +1305,30 @@ void run() throws PulsarAdminException {
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}

long limitTimeInSec = -1;
if (limitTimeStr != null) {
String namespace = validateNamespace(params);

BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy);
if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) {
// set quota by storage size
if (limitStr == null) {
throw new ParameterException("Quota type of 'destination_storage' needs a size limit");
}
long limit = validateSizeString(limitStr);
builder.limitSize(limit);
} else {
// set quota by time
if (limitTimeStr == null) {
throw new ParameterException("Quota type of 'message_age' needs a time limit");
}
long limitTimeInSec;
try {
limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
builder.limitTime((int) limitTimeInSec);
}
if (limitTimeInSec > Integer.MAX_VALUE) {
throw new ParameterException(
String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
}

String namespace = validateNamespace(params);
getAdmin().namespaces().setBacklogQuota(namespace,
BacklogQuota.builder().limitSize(limit)
.limitTime((int) limitTimeInSec)
.retentionPolicy(policy)
.build(),
backlogQuotaType);
getAdmin().namespaces().setBacklogQuota(namespace, builder.build(), backlogQuotaType);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ private class SetBacklogQuota extends CliCommand {
private java.util.List<String> params;

@Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)")
private String limitStr = "-1";
private String limitStr = null;

@Parameter(names = { "-lt", "--limitTime" },
description = "Time limit in second (or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w), "
Expand All @@ -977,8 +977,8 @@ private class SetBacklogQuota extends CliCommand {
private String policyStr;

@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: "
+ "destination_storage and message_age. "
+ "destination_storage limits backlog by size (in bytes). "
+ "destination_storage (default) and message_age. "
+ "destination_storage limits backlog by size. "
+ "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). "
+ "You can set size or time to control the backlog, or combine them together to control the backlog. ")
private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();
Expand All @@ -990,7 +990,6 @@ private class SetBacklogQuota extends CliCommand {
@Override
void run() throws PulsarAdminException {
BacklogQuota.RetentionPolicy policy;
long limit;
BacklogQuota.BacklogQuotaType backlogQuotaType;

try {
Expand All @@ -999,35 +998,41 @@ void run() throws PulsarAdminException {
throw new ParameterException(String.format("Invalid retention policy type '%s'. Valid options are: %s",
policyStr, Arrays.toString(BacklogQuota.RetentionPolicy.values())));
}

limit = validateSizeString(limitStr);

try {
backlogQuotaType = BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaTypeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(String.format("Invalid backlog quota type '%s'. Valid options are: %s",
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}
String persistentTopic = validatePersistentTopic(params);
BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy);

long limitTimeInSec = -1;
if (limitTimeStr != null) {
if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) {
// set quota by storage size
if (limitStr == null) {
throw new ParameterException("Quota type of 'destination_storage' needs a size limit");
}
long limit = validateSizeString(limitStr);
builder.limitSize((int) limit);
} else {
// set quota by time
if (limitTimeStr == null) {
throw new ParameterException("Quota type of 'message_age' needs a time limit");
}
long limitTimeInSec;
try {
limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
if (limitTimeInSec > Integer.MAX_VALUE) {
throw new ParameterException(
String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
}
builder.limitTime((int) limitTimeInSec);
}
if (limitTimeInSec > Integer.MAX_VALUE) {
throw new ParameterException(
String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
}

String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setBacklogQuota(persistentTopic,
BacklogQuota.builder().limitSize(limit)
.limitTime((int) limitTimeInSec)
.retentionPolicy(policy)
.build(),
builder.build(),
backlogQuotaType);
}
}
Expand Down

0 comments on commit 31fe347

Please sign in to comment.