Skip to content

Commit

Permalink
Merge branch 'master' into max-producers
Browse files Browse the repository at this point in the history
* master:
  Add command topic message ttl. (streamnative#246) (streamnative#348)
  • Loading branch information
limingnihao committed Jun 2, 2021
1 parent db194d9 commit 5468603
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 3 deletions.
22 changes: 22 additions & 0 deletions pkg/cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,28 @@ func (c *Client) PostWithMultiPart(endpoint string, in interface{}, body io.Read
return nil
}

func (c *Client) PostWithQueryParams(endpoint string, params map[string]string) error {
req, err := c.newRequest(http.MethodPost, endpoint)
if err != nil {
return err
}
if params != nil {
query := req.url.Query()
for k, v := range params {
query.Add(k, v)
}
req.params = query
}
// nolint
resp, err := checkSuccessful(c.doRequest(req))
if err != nil {
return err
}
defer safeRespClose(resp)

return nil
}

type request struct {
method string
contentType string
Expand Down
78 changes: 78 additions & 0 deletions pkg/ctl/topic/get_message_ttl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func GetMessageTTLCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Get message TTL settings of a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
setMsgTTL := cmdutils.Example{
Desc: "Get message TTL settings of a topic",
Command: "pulsarctl topics get-message-ttl topic",
}
examples = append(examples, setMsgTTL)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "(ttl-value)",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"get-message-ttl",
"Get message TTL settings of a topic",
desc.ToString(),
desc.ExampleToString(),
"get-message-ttl",
)

vc.SetRunFuncWithNameArg(func() error {
return doGetMessageTTL(vc)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doGetMessageTTL(vc *cmdutils.VerbCmd) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
ttl, err := admin.Topics().GetMessageTTL(*topic)
if err == nil {
vc.Command.Print(ttl)
}
return err
}
60 changes: 60 additions & 0 deletions pkg/ctl/topic/message_ttl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestMessageTTL(t *testing.T) {
topicName := "persistent://public/default/test-message-ttl-topic"
args := []string{"create", topicName, "1"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, execErr)

setTTLArgs := []string{"set-message-ttl", topicName, "-t", "20"}
setOut, execErr, _, _ := TestTopicCommands(SetMessageTTLCmd, setTTLArgs)
assert.Nil(t, execErr)
assert.Equal(t, setOut.String(), "Set message TTL successfully for ["+topicName+"]\n")

time.Sleep(time.Duration(1) * time.Second)
getTTLArgs := []string{"get-message-ttl", topicName}
getOut, execErr, _, _ := TestTopicCommands(GetMessageTTLCmd, getTTLArgs)
assert.Nil(t, execErr)
assert.Equal(t, getOut.String(), "20")

setTTLArgs = []string{"remove-message-ttl", topicName}
setOut, execErr, _, _ = TestTopicCommands(RemoveMessageTTLCmd, setTTLArgs)
assert.Nil(t, execErr)
assert.Equal(t, setOut.String(), "Remove message TTL successfully for ["+topicName+"]\n")

time.Sleep(time.Duration(1) * time.Second)
getTTLArgs = []string{"get-message-ttl", topicName}
getOut, execErr, _, _ = TestTopicCommands(GetMessageTTLCmd, getTTLArgs)
assert.Nil(t, execErr)
assert.Equal(t, getOut.String(), "0")

// test negative value for ttl arg
setTTLArgs = []string{"set-message-ttl", topicName, "-t", "-2"}
_, execErr, _, _ = TestTopicCommands(SetMessageTTLCmd, setTTLArgs)
assert.NotNil(t, execErr)
assert.Equal(t, execErr.Error(), "code: 412 reason: Invalid value for message TTL")
}
76 changes: 76 additions & 0 deletions pkg/ctl/topic/remove_message_ttl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func RemoveMessageTTLCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Remove Message TTL for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
removeMsgTTL := cmdutils.Example{
Desc: "Remove Message TTL for a topic",
Command: "pulsarctl topics remove-message-ttl topic",
}
examples = append(examples, removeMsgTTL)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Remove message TTL successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"remove-message-ttl",
"Remove Message TTL for a topic",
desc.ToString(),
desc.ExampleToString(),
"remove-message-ttl",
)
vc.SetRunFuncWithNameArg(func() error {
return doRemoveMessageTTL(vc)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doRemoveMessageTTL(vc *cmdutils.VerbCmd) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}
admin := cmdutils.NewPulsarClient()
err = admin.Topics().RemoveMessageTTL(*topic)
if err == nil {
vc.Command.Printf("Remove message TTL successfully for [%s]\n", topic.String())
}
return err
}
88 changes: 88 additions & 0 deletions pkg/ctl/topic/set_message_ttl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/spf13/pflag"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func SetMessageTTLCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Set Message TTL for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
setMsgTTL := cmdutils.Example{
Desc: "Set Message TTL for a topic",
Command: "pulsarctl topics set-message-ttl topic -t 10",
}
examples = append(examples, setMsgTTL)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Set message TTL successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"set-message-ttl",
"Set Message TTL for a topic",
desc.ToString(),
desc.ExampleToString(),
"set-message-ttl",
)
var messageTTL int
vc.SetRunFuncWithNameArg(func() error {
return doSetMessageTTL(vc, messageTTL)
}, "the topic name is not specified or the topic name is specified more than one")

vc.FlagSetGroup.InFlagSet("MessageTTL", func(set *pflag.FlagSet) {
set.IntVarP(
&messageTTL,
"ttl",
"t",
0,
"Message TTL in seconds")
})
vc.EnableOutputFlagSet()
}

func doSetMessageTTL(vc *cmdutils.VerbCmd, messageTTL int) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}
admin := cmdutils.NewPulsarClient()
err = admin.Topics().SetMessageTTL(*topic, messageTTL)
if err == nil {
vc.Command.Printf("Set message TTL successfully for [%s]\n", topic.String())
}
return err
}
3 changes: 3 additions & 0 deletions pkg/ctl/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
GetStatsCmd,
GetInternalStatsCmd,
GetInternalInfoCmd,
GetMessageTTLCmd,
SetMessageTTLCmd,
RemoveMessageTTLCmd,
GetMaxProducersCmd,
SetMaxProducersCmd,
RemoveMaxProducersCmd,
Expand Down
Loading

0 comments on commit 5468603

Please sign in to comment.