From 8690a147727925f35564969b013f44c7c481d15c Mon Sep 17 00:00:00 2001 From: limingnihao Date: Wed, 2 Jun 2021 19:03:44 +0800 Subject: [PATCH] Add command topic max consumers. (streamnative/pulsarctl#246) (#355) Add command topic max number of consumers: * pulsarctl topics get-max-consumers [topic] * pulsarctl topics set-max-consumers [topic] -c [max] * pulsarctl topics remove-max-consumers [topic] --- pulsaradmin/pkg/pulsar/topic.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pulsaradmin/pkg/pulsar/topic.go b/pulsaradmin/pkg/pulsar/topic.go index 152821cbf4..ac5ca3dabc 100644 --- a/pulsaradmin/pkg/pulsar/topic.go +++ b/pulsaradmin/pkg/pulsar/topic.go @@ -116,6 +116,15 @@ type Topics interface { // RemoveMaxProducers Remove max number of producers for a topic RemoveMaxProducers(utils.TopicName) error + + // GetMaxConsumers Get max number of consumers for a topic + GetMaxConsumers(utils.TopicName) (int, error) + + // SetMaxConsumers Set max number of consumers for a topic + SetMaxConsumers(utils.TopicName, int) error + + // RemoveMaxConsumers Remove max number of consumers for a topic + RemoveMaxConsumers(utils.TopicName) error } type topics struct { @@ -361,3 +370,22 @@ func (t *topics) RemoveMaxProducers(topic utils.TopicName) error { err := t.pulsar.Client.Delete(endpoint) return err } + +func (t *topics) GetMaxConsumers(topic utils.TopicName) (int, error) { + var maxConsumers int + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumers") + err := t.pulsar.Client.Get(endpoint, &maxConsumers) + return maxConsumers, err +} + +func (t *topics) SetMaxConsumers(topic utils.TopicName, maxConsumers int) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumers") + err := t.pulsar.Client.Post(endpoint, &maxConsumers) + return err +} + +func (t *topics) RemoveMaxConsumers(topic utils.TopicName) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumers") + err := t.pulsar.Client.Delete(endpoint) + return err +}