Skip to content

Commit

Permalink
Add command topic max unacked messages per consumer. (streamnative#246)
Browse files Browse the repository at this point in the history
  • Loading branch information
limingnihao committed Jun 3, 2021
1 parent ac89b35 commit 4734d1c
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 0 deletions.
78 changes: 78 additions & 0 deletions pkg/ctl/topic/get_max_unack_messages_per_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func GetMaxUnackMessagesPerConsumerCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Get max unacked messages per consumer for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
msg := cmdutils.Example{
Desc: "Get max unacked messages per consumer for a topic",
Command: "pulsarctl topics get-max-unacked-messages-per-consumer topic",
}
examples = append(examples, msg)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Get max unacked messages per consumer successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"get-max-unacked-messages-per-consumer",
"Get max unacked messages per consumer for a topic",
desc.ToString(),
desc.ExampleToString(),
"get-max-unacked-messages-per-consumer",
)

vc.SetRunFuncWithNameArg(func() error {
return doGetMaxUnackMessagesPerConsumer(vc)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doGetMaxUnackMessagesPerConsumer(vc *cmdutils.VerbCmd) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
value, err := admin.Topics().GetMaxUnackMessagesPerConsumer(*topic)
if err == nil {
vc.Command.Print(value)
}
return err
}
60 changes: 60 additions & 0 deletions pkg/ctl/topic/max_unack_messages_per_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestMaxUnackMessagesPerConsumer(t *testing.T) {
topicName := "persistent://public/default/test-max-unacked-messages-per-consumer-topic"
args := []string{"create", topicName, "1"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, execErr)

setArgs := []string{"set-max-unacked-messages-per-consumer", topicName, "-m", "20"}
setOut, execErr, _, _ := TestTopicCommands(SetMaxUnackMessagesPerConsumerCmd, setArgs)
assert.Nil(t, execErr)
assert.Equal(t, setOut.String(), "Set max unacked messages per consumer successfully for ["+topicName+"]\n")

time.Sleep(time.Duration(1) * time.Second)
getArgs := []string{"get-max-unacked-messages-per-consumer", topicName}
getOut, execErr, _, _ := TestTopicCommands(GetMaxUnackMessagesPerConsumerCmd, getArgs)
assert.Nil(t, execErr)
assert.Equal(t, getOut.String(), "20")

setArgs = []string{"remove-max-unacked-messages-per-consumer", topicName}
setOut, execErr, _, _ = TestTopicCommands(RemoveMaxUnackMessagesPerConsumerCmd, setArgs)
assert.Nil(t, execErr)
assert.Equal(t, setOut.String(), "Remove max unacked messages per consumer successfully for ["+topicName+"]\n")

time.Sleep(time.Duration(1) * time.Second)
getArgs = []string{"get-max-unacked-messages-per-consumer", topicName}
getOut, execErr, _, _ = TestTopicCommands(GetMaxUnackMessagesPerConsumerCmd, getArgs)
assert.Nil(t, execErr)
assert.Equal(t, getOut.String(), "0")

// test negative value for ttl arg
setArgs = []string{"set-max-unacked-messages-per-consumer", topicName, "-m", "-2"}
_, execErr, _, _ = TestTopicCommands(SetMaxUnackMessagesPerConsumerCmd, setArgs)
assert.NotNil(t, execErr)
assert.Equal(t, execErr.Error(), "code: 412 reason: maxUnackedNum must be 0 or more")
}
78 changes: 78 additions & 0 deletions pkg/ctl/topic/remove_max_unack_messages_per_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func RemoveMaxUnackMessagesPerConsumerCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Remove max unacked messages per consumer for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
msg := cmdutils.Example{
Desc: "Remove max unacked messages per consumer for a topic",
Command: "pulsarctl topics remove-max-unacked-messages-per-consumer topic",
}
examples = append(examples, msg)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Remove max unacked messages per consumer successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"remove-max-unacked-messages-per-consumer",
"Remove max unacked messages per consumer for a topic",
desc.ToString(),
desc.ExampleToString(),
"remove-max-unacked-messages-per-consumer",
)

vc.SetRunFuncWithNameArg(func() error {
return doRemoveMaxUnackMessagesPerConsumer(vc)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doRemoveMaxUnackMessagesPerConsumer(vc *cmdutils.VerbCmd) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
err = admin.Topics().RemoveMaxUnackMessagesPerConsumer(*topic)
if err == nil {
vc.Command.Printf("Remove max unacked messages per consumer successfully for [%s]\n", topic.String())
}
return err
}
88 changes: 88 additions & 0 deletions pkg/ctl/topic/set_max_unack_messages_per_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/spf13/pflag"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func SetMaxUnackMessagesPerConsumerCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Set max unacked messages per consumer for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
msg := cmdutils.Example{
Desc: "Set max unacked messages per consumer for a topic",
Command: "pulsarctl topics set-max-unacked-messages-per-consumer topic -m 10",
}
examples = append(examples, msg)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Set max unacked messages per consumer successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"set-max-unacked-messages-per-consumer",
"Set max unacked messages per consumer for a topic",
desc.ToString(),
desc.ExampleToString(),
"set-max-unacked-messages-per-consumer",
)
var maxUnackedNum int
vc.SetRunFuncWithNameArg(func() error {
return doSetMaxUnackMessagesPerConsumer(vc, maxUnackedNum)
}, "the topic name is not specified or the topic name is specified more than one")

vc.FlagSetGroup.InFlagSet("MaxUnackedMessagesPerConsumer", func(set *pflag.FlagSet) {
set.IntVarP(
&maxUnackedNum,
"maxNum",
"m",
0,
"Max unacked messages per consumer for a topic")
})
vc.EnableOutputFlagSet()
}

func doSetMaxUnackMessagesPerConsumer(vc *cmdutils.VerbCmd, maxUnackedNum int) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}
admin := cmdutils.NewPulsarClient()
err = admin.Topics().SetMaxUnackMessagesPerConsumer(*topic, maxUnackedNum)
if err == nil {
vc.Command.Printf("Set max unacked messages per consumer successfully for [%s]\n", topic.String())
}
return err
}
3 changes: 3 additions & 0 deletions pkg/ctl/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
GetMaxConsumersCmd,
SetMaxConsumersCmd,
RemoveMaxConsumersCmd,
GetMaxUnackMessagesPerConsumerCmd,
SetMaxUnackMessagesPerConsumerCmd,
RemoveMaxUnackMessagesPerConsumerCmd,
}

cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)
Expand Down
28 changes: 28 additions & 0 deletions pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ type Topics interface {

// RemoveMaxConsumers Remove max number of consumers for a topic
RemoveMaxConsumers(utils.TopicName) error

// GetMaxUnackMessagesPerConsumer Get max unacked messages policy on consumer for a topic
GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)

// SetMaxUnackMessagesPerConsumer Set max unacked messages policy on consumer for a topic
SetMaxUnackMessagesPerConsumer(utils.TopicName, int) error

// RemoveMaxUnackMessagesPerConsumer Remove max unacked messages policy on consumer for a topic
RemoveMaxUnackMessagesPerConsumer(utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -389,3 +398,22 @@ func (t *topics) RemoveMaxConsumers(topic utils.TopicName) error {
err := t.pulsar.Client.Delete(endpoint)
return err
}

func (t *topics) GetMaxUnackMessagesPerConsumer(topic utils.TopicName) (int, error) {
var maxNum int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
err := t.pulsar.Client.Get(endpoint, &maxNum)
return maxNum, err
}

func (t *topics) SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
err := t.pulsar.Client.Post(endpoint, &maxUnackedNum)
return err
}

func (t *topics) RemoveMaxUnackMessagesPerConsumer(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
err := t.pulsar.Client.Delete(endpoint)
return err
}

0 comments on commit 4734d1c

Please sign in to comment.