Skip to content

Commit

Permalink
Add command topic Delayed Delivery Policies. (streamnative/pulsarctl#246
Browse files Browse the repository at this point in the history
) (#374)

Add command topic Delayed Delivery Policies:

- pulsarctl topics get-delayed-delivery [topic]
- pulsarctl topics set-delayed-delivery [topic] -t 22s -e
- pulsarctl topics remove-delayed-delivery [topic]
  • Loading branch information
limingnihao authored and tisonkun committed Aug 16, 2023
1 parent 93c0e45 commit 6f56245
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
26 changes: 26 additions & 0 deletions pulsaradmin/pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ type Topics interface {

// RemovePersistence Remove the persistence policies for a topic
RemovePersistence(utils.TopicName) error

// GetDelayedDelivery Get the delayed delivery policy for a topic
GetDelayedDelivery(utils.TopicName) (*utils.DelayedDeliveryData, error)

// SetDelayedDelivery Set the delayed delivery policy on a topic
SetDelayedDelivery(utils.TopicName, utils.DelayedDeliveryData) error

// RemoveDelayedDelivery Remove the delayed delivery policy on a topic
RemoveDelayedDelivery(utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -467,3 +476,20 @@ func (t *topics) RemovePersistence(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "persistence")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetDelayedDelivery(topic utils.TopicName) (*utils.DelayedDeliveryData, error) {
var delayedDeliveryData utils.DelayedDeliveryData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
err := t.pulsar.Client.Get(endpoint, &delayedDeliveryData)
return &delayedDeliveryData, err
}

func (t *topics) SetDelayedDelivery(topic utils.TopicName, delayedDeliveryData utils.DelayedDeliveryData) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
return t.pulsar.Client.Post(endpoint, &delayedDeliveryData)
}

func (t *topics) RemoveDelayedDelivery(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
return t.pulsar.Client.Delete(endpoint)
}
11 changes: 11 additions & 0 deletions pulsaradmin/pkg/pulsar/utils/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,14 @@ type PersistenceData struct {
BookkeeperAckQuorum int64 `json:"bookkeeperAckQuorum"`
ManagedLedgerMaxMarkDeleteRate float64 `json:"managedLedgerMaxMarkDeleteRate"`
}

type DelayedDeliveryCmdData struct {
Enable bool `json:"enable"`
Disable bool `json:"disable"`
DelayedDeliveryTimeStr string `json:"delayedDeliveryTimeStr"`
}

type DelayedDeliveryData struct {
TickTime float64 `json:"tickTime"`
Active bool `json:"active"`
}

0 comments on commit 6f56245

Please sign in to comment.