Skip to content

Commit

Permalink
feat: add backlog quota command for topic (streamnative/pulsarctl#429)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>

### Changes

background from streamnative/pulsarctl#246,  the PR implements the following commands:

- `pulsarctl topics get-backlog-quotas <topic> -a` - Get the backlog quota policy for a topic
- `pulsarctl topics remove-backlog-quota <topic> --type <producer_request_hold|message_age>` - Remove a backlog quota policy from a topic
- `pulsarctl topics set-backlog-quota <topic> --limit-size <string> --limit-time <int> --policy <producer_request_hold|producer_exception|consumer_backlog_eviction> --type <producer_request_hold|message_age>` - Set a backlog quota policy for a topic

### TODO
- [x] Add integration tests
  • Loading branch information
nodece authored and tisonkun committed Aug 16, 2023
1 parent 87aacec commit 9485412
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
43 changes: 43 additions & 0 deletions pulsaradmin/pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pulsar

import (
"fmt"
"net/url"
"strconv"

"github.com/streamnative/pulsar-admin-go/pkg/pulsar/common"
Expand Down Expand Up @@ -206,6 +207,15 @@ type Topics interface {

// Remove compaction threshold for a topic
RemoveCompactionThreshold(utils.TopicName) error

// GetBacklogQuotaMap returns backlog quota map for a topic
GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, error)

// SetBacklogQuota sets a backlog quota for a topic
SetBacklogQuota(utils.TopicName, utils.BacklogQuota, utils.BacklogQuotaType) error

// RemoveBacklogQuota removes a backlog quota policy from a topic
RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error
}

type topics struct {
Expand Down Expand Up @@ -630,3 +640,36 @@ func (t *topics) RemoveCompactionThreshold(topic utils.TopicName) error {
err := t.pulsar.Client.Delete(endpoint)
return err
}

func (t *topics) GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota,
error) {
var backlogQuotaMap map[utils.BacklogQuotaType]utils.BacklogQuota
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuotaMap")

queryParams := map[string]string{"applied": strconv.FormatBool(applied)}
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &backlogQuotaMap, queryParams, true)

return backlogQuotaMap, err
}

func (t *topics) SetBacklogQuota(topic utils.TopicName, backlogQuota utils.BacklogQuota,
backlogQuotaType utils.BacklogQuotaType) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuota")

u, err := url.Parse(endpoint)
if err != nil {
return err
}
q := u.Query()
q.Add("backlogQuotaType", string(backlogQuotaType))
u.RawQuery = q.Encode()

return t.pulsar.Client.Post(u.String(), &backlogQuota)
}

func (t *topics) RemoveBacklogQuota(topic utils.TopicName, backlogQuotaType utils.BacklogQuotaType) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuota")
return t.pulsar.Client.DeleteWithQueryParams(endpoint, map[string]string{
"backlogQuotaType": string(backlogQuotaType),
})
}
28 changes: 24 additions & 4 deletions pulsaradmin/pkg/pulsar/utils/backlog_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ func NewBacklogQuota(limitSize int64, limitTime int64, policy RetentionPolicy) B

type RetentionPolicy string

type BacklogQuotaType string

const DestinationStorage BacklogQuotaType = "destination_storage"

const (
ProducerRequestHold RetentionPolicy = "producer_request_hold"
ProducerException RetentionPolicy = "producer_exception"
Expand All @@ -47,6 +43,8 @@ const (

func ParseRetentionPolicy(str string) (RetentionPolicy, error) {
switch str {
case ProducerRequestHold.String():
return ProducerRequestHold, nil
case ProducerException.String():
return ProducerException, nil
case ConsumerBacklogEviction.String():
Expand All @@ -59,3 +57,25 @@ func ParseRetentionPolicy(str string) (RetentionPolicy, error) {
func (s RetentionPolicy) String() string {
return string(s)
}

type BacklogQuotaType string

const (
DestinationStorage BacklogQuotaType = "destination_storage"
MessageAge BacklogQuotaType = "message_age"
)

func ParseBacklogQuotaType(str string) (BacklogQuotaType, error) {
switch str {
case DestinationStorage.String():
return DestinationStorage, nil
case MessageAge.String():
return MessageAge, nil
default:
return "", errors.Errorf("Invalid backlog quota type: %s", str)
}
}

func (b BacklogQuotaType) String() string {
return string(b)
}

0 comments on commit 9485412

Please sign in to comment.