Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][cli] improve admin set-backlog-quota more clear #19300

Merged
merged 3 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -437,6 +438,12 @@ public void namespaces() throws Exception {
namespaces.run(split("unload myprop/clust/ns1"));
verify(mockNamespaces).unload("myprop/clust/ns1");

// message_age must have time limit, destination_storage must have size limit
Assert.assertFalse(namespaces.run(
split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -t message_age")));
Assert.assertFalse(namespaces.run(
split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10h -t destination_storage")));

mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
Expand Down Expand Up @@ -498,23 +505,21 @@ public void namespaces() throws Exception {
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);

namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -l 10K -lt 10m"));
namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -lt 10m -t message_age"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
BacklogQuota.builder()
.limitSize(10 * 1024)
.limitTime(10 * 60)
.retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
.build(),
BacklogQuota.BacklogQuotaType.destination_storage);
BacklogQuota.BacklogQuotaType.message_age);

mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);

namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -lt 10000 -t message_age"));
namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10000 -t message_age"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
BacklogQuota.builder()
.limitSize(10l * 1024 * 1024 * 1024)
.limitTime(10000)
.retentionPolicy(RetentionPolicy.producer_exception)
.build(),
Expand Down Expand Up @@ -1216,24 +1221,28 @@ public void topicPolicies() throws Exception {
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10h"));
verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10 * 60 * 60);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction"));
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction -t message_age"));
verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.builder()
.limitSize(-1)
.limitTime(60 * 60 * 24 * 7)
.retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
.build(),
BacklogQuota.BacklogQuotaType.destination_storage);
BacklogQuota.BacklogQuotaType.message_age);
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1000 -p producer_request_hold -t message_age"));
verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.builder()
.limitSize(-1)
.limitTime(1000)
.retentionPolicy(RetentionPolicy.producer_request_hold)
.build(),
BacklogQuota.BacklogQuotaType.message_age);
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopicPolicies(() -> admin);
Assert.assertFalse(cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 1000 -p producer_request_hold -t message_age")));
cmdTopics = new CmdTopicPolicies(() -> admin);
Assert.assertFalse(cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 60 -p producer_request_hold -t destination_storage")));

//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,7 @@ private class SetBacklogQuota extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", required = true)
@Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)")
private String limitStr;

@Parameter(names = { "-lt", "--limitTime" },
Expand All @@ -1280,16 +1280,15 @@ private class SetBacklogQuota extends CliCommand {
private String policyStr;

@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: "
+ "destination_storage and message_age. "
+ "destination_storage limits backlog by size (in bytes). "
+ "destination_storage (default) and message_age. "
+ "destination_storage limits backlog by size. "
+ "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). "
+ "You can set size or time to control the backlog, or combine them together to control the backlog. ")
private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();

@Override
void run() throws PulsarAdminException {
BacklogQuota.RetentionPolicy policy;
long limit = validateSizeString(limitStr);
BacklogQuota.BacklogQuotaType backlogQuotaType;

try {
Expand All @@ -1306,26 +1305,30 @@ void run() throws PulsarAdminException {
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}

long limitTimeInSec = -1;
if (limitTimeStr != null) {
String namespace = validateNamespace(params);

BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy);
if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) {
// set quota by storage size
if (limitStr == null) {
throw new ParameterException("Quota type of 'destination_storage' needs a size limit");
}
long limit = validateSizeString(limitStr);
builder.limitSize(limit);
} else {
// set quota by time
if (limitTimeStr == null) {
throw new ParameterException("Quota type of 'message_age' needs a time limit");
}
long limitTimeInSec;
try {
limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
builder.limitTime((int) limitTimeInSec);
}
if (limitTimeInSec > Integer.MAX_VALUE) {
throw new ParameterException(
String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
}

String namespace = validateNamespace(params);
getAdmin().namespaces().setBacklogQuota(namespace,
BacklogQuota.builder().limitSize(limit)
.limitTime((int) limitTimeInSec)
.retentionPolicy(policy)
.build(),
backlogQuotaType);
getAdmin().namespaces().setBacklogQuota(namespace, builder.build(), backlogQuotaType);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ private class SetBacklogQuota extends CliCommand {
private java.util.List<String> params;

@Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)")
private String limitStr = "-1";
private String limitStr = null;

@Parameter(names = { "-lt", "--limitTime" },
description = "Time limit in second (or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w), "
Expand All @@ -977,8 +977,8 @@ private class SetBacklogQuota extends CliCommand {
private String policyStr;

@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: "
+ "destination_storage and message_age. "
+ "destination_storage limits backlog by size (in bytes). "
+ "destination_storage (default) and message_age. "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default is backlog size, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

+ "destination_storage limits backlog by size. "
+ "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). "
+ "You can set size or time to control the backlog, or combine them together to control the backlog. ")
private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();
Expand All @@ -990,7 +990,6 @@ private class SetBacklogQuota extends CliCommand {
@Override
void run() throws PulsarAdminException {
BacklogQuota.RetentionPolicy policy;
long limit;
BacklogQuota.BacklogQuotaType backlogQuotaType;

try {
Expand All @@ -999,35 +998,41 @@ void run() throws PulsarAdminException {
throw new ParameterException(String.format("Invalid retention policy type '%s'. Valid options are: %s",
policyStr, Arrays.toString(BacklogQuota.RetentionPolicy.values())));
}

limit = validateSizeString(limitStr);

try {
backlogQuotaType = BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaTypeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(String.format("Invalid backlog quota type '%s'. Valid options are: %s",
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}
String persistentTopic = validatePersistentTopic(params);
BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy);

long limitTimeInSec = -1;
if (limitTimeStr != null) {
if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) {
// set quota by storage size
if (limitStr == null) {
throw new ParameterException("Quota type of 'destination_storage' needs a size limit");
}
long limit = validateSizeString(limitStr);
builder.limitSize((int) limit);
} else {
// set quota by time
if (limitTimeStr == null) {
throw new ParameterException("Quota type of 'message_age' needs a time limit");
}
long limitTimeInSec;
try {
limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
if (limitTimeInSec > Integer.MAX_VALUE) {
throw new ParameterException(
String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
}
builder.limitTime((int) limitTimeInSec);
}
if (limitTimeInSec > Integer.MAX_VALUE) {
throw new ParameterException(
String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
}

String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setBacklogQuota(persistentTopic,
BacklogQuota.builder().limitSize(limit)
.limitTime((int) limitTimeInSec)
.retentionPolicy(policy)
.build(),
builder.build(),
backlogQuotaType);
}
}
Expand Down