Skip to content

Commit

Permalink
Add command topic Delayed Delivery Policies. (#246) (#374)
Browse files Browse the repository at this point in the history
Add command topic Delayed Delivery Policies:

- pulsarctl topics get-delayed-delivery [topic]
- pulsarctl topics set-delayed-delivery [topic] -t 22s -e
- pulsarctl topics remove-delayed-delivery [topic]
  • Loading branch information
limingnihao authored Jun 18, 2021
1 parent 15d93d0 commit 01757cf
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 0 deletions.
73 changes: 73 additions & 0 deletions pkg/ctl/topic/delayed_delivery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"encoding/json"
"testing"
"time"

"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
"github.com/stretchr/testify/assert"
)

func TestDelayedDelivery(t *testing.T) {
topicName := "persistent://public/default/test-delayed-delivery-topic"
args := []string{"create", topicName, "1"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, execErr)

args = []string{"set-delayed-delivery", topicName, "-t", "10s", "-e"}
out, execErr, _, _ := TestTopicCommands(SetDelayedDeliveryCmd, args)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), "Set delayed delivery policy successfully for ["+topicName+"]\n")

time.Sleep(time.Duration(1) * time.Second)
args = []string{"get-delayed-delivery", topicName}
out, execErr, _, _ = TestTopicCommands(GetDelayedDeliveryCmd, args)
var delayedDeliveryData utils.DelayedDeliveryData
err := json.Unmarshal(out.Bytes(), &delayedDeliveryData)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, execErr)
assert.Equal(t, delayedDeliveryData.Active, true)
assert.Equal(t, delayedDeliveryData.TickTime, float64(10))

args = []string{"remove-delayed-delivery", topicName}
out, execErr, _, _ = TestTopicCommands(RemoveDelayedDeliveryCmd, args)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), "Remove delayed delivery policy successfully for ["+topicName+"]\n")

time.Sleep(time.Duration(1) * time.Second)
args = []string{"get-delayed-delivery", topicName}
out, execErr, _, _ = TestTopicCommands(GetDelayedDeliveryCmd, args)
err = json.Unmarshal(out.Bytes(), &delayedDeliveryData)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, execErr)
assert.Equal(t, delayedDeliveryData.Active, false)
assert.Equal(t, delayedDeliveryData.TickTime, float64(0))

// test specify either --enable or --disable
args = []string{"set-delayed-delivery", topicName, "-t", "10s", "-e", "-d"}
_, execErr, _, _ = TestTopicCommands(SetDelayedDeliveryCmd, args)
assert.NotNil(t, execErr)
assert.Equal(t, execErr.Error(), "Need to specify either --enable or --disable")
}
81 changes: 81 additions & 0 deletions pkg/ctl/topic/get_delayed_delivery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func GetDelayedDeliveryCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Get delayed delivery policy for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
msg := cmdutils.Example{
Desc: "Get delayed delivery policy for a topic",
Command: "pulsarctl topics get-delayed-delivery topic",
}
examples = append(examples, msg)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Get delayed delivery policy successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"get-delayed-delivery",
"Get delayed delivery policy for a topic",
desc.ToString(),
desc.ExampleToString(),
"get-delayed-delivery",
)

vc.SetRunFuncWithNameArg(func() error {
return doGetDelayedDelivery(vc)
}, "the topic name is not specified or the topic name is specified more than one")

vc.EnableOutputFlagSet()
}

func doGetDelayedDelivery(vc *cmdutils.VerbCmd) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
delayedDeliveryData, err := admin.Topics().GetDelayedDelivery(*topic)
if err == nil {
oc := cmdutils.NewOutputContent().WithObject(delayedDeliveryData)
err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc)
}
return err
}
78 changes: 78 additions & 0 deletions pkg/ctl/topic/remove_delayed_delivery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func RemoveDelayedDeliveryCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Remove delayed delivery policy for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
msg := cmdutils.Example{
Desc: "Remove delayed delivery policy for a topic",
Command: "pulsarctl topics remove-delayed-delivery topic",
}
examples = append(examples, msg)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Remove delayed delivery policy successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"remove-delayed-delivery",
"Remove delayed delivery policy for a topic",
desc.ToString(),
desc.ExampleToString(),
"remove-delayed-delivery",
)

vc.SetRunFuncWithNameArg(func() error {
return doRemoveDelayedDelivery(vc)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doRemoveDelayedDelivery(vc *cmdutils.VerbCmd) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
err = admin.Topics().RemoveDelayedDelivery(*topic)
if err == nil {
vc.Command.Printf("Remove delayed delivery policy successfully for [%s]\n", topic.String())
}
return err
}
117 changes: 117 additions & 0 deletions pkg/ctl/topic/set_delayed_delivery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
ctlUtil "github.com/streamnative/pulsarctl/pkg/ctl/utils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func SetDelayedDeliveryCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Set delayed delivery policy for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
msg := cmdutils.Example{
Desc: "Set delayed delivery policy for a topic",
Command: "pulsarctl topics set-delayed-delivery topic -t 10s -e",
}
examples = append(examples, msg)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Set delayed delivery policy successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"set-delayed-delivery",
"Set delayed delivery policy for a topic",
desc.ToString(),
desc.ExampleToString(),
"set-delayed-delivery",
)
delayedDeliveryData := &utils.DelayedDeliveryCmdData{}
vc.SetRunFuncWithNameArg(func() error {
return doSetDelayedDelivery(vc, delayedDeliveryData)
}, "the topic name is not specified or the topic name is specified more than one")

vc.FlagSetGroup.InFlagSet("Persistence", func(set *pflag.FlagSet) {
set.BoolVarP(
&delayedDeliveryData.Enable,
"enable",
"e",
false,
"Enable delayed delivery messages")
set.BoolVarP(
&delayedDeliveryData.Disable,
"disable",
"d",
false,
"Disable delayed delivery messages")
set.StringVarP(
&delayedDeliveryData.DelayedDeliveryTimeStr,
"time",
"t",
"1s",
"The tick time for when retrying on delayed delivery messages, affecting the"+
" accuracy of the delivery time compared to the scheduled time. (eg: 1s, 10s, 1m, 5h, 3d)")
})
vc.EnableOutputFlagSet()
}

func doSetDelayedDelivery(vc *cmdutils.VerbCmd, delayedDeliveryCmdData *utils.DelayedDeliveryCmdData) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}
admin := cmdutils.NewPulsarClient()
delayedDeliveryData := &utils.DelayedDeliveryData{}
if delayedDeliveryCmdData.Enable == delayedDeliveryCmdData.Disable {
msg := "Need to specify either --enable or --disable"
vc.Command.Printf(msg)
return errors.Errorf(msg)
}
if delayedDeliveryCmdData.Enable {
tickTimeInSecond, err := ctlUtil.ParseRelativeTimeInSeconds(delayedDeliveryCmdData.DelayedDeliveryTimeStr)
if err != nil {
return err
}
delayedDeliveryData.TickTime = tickTimeInSecond.Seconds()
delayedDeliveryData.Active = true
}
err = admin.Topics().SetDelayedDelivery(*topic, *delayedDeliveryData)
if err == nil {
vc.Command.Printf("Set delayed delivery policy successfully for [%s]\n", topic.String())
}
return err
}
3 changes: 3 additions & 0 deletions pkg/ctl/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
GetPersistenceCmd,
SetPersistenceCmd,
RemovePersistenceCmd,
GetDelayedDeliveryCmd,
SetDelayedDeliveryCmd,
RemoveDelayedDeliveryCmd,
}

cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)
Expand Down
26 changes: 26 additions & 0 deletions pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ type Topics interface {

// RemovePersistence Remove the persistence policies for a topic
RemovePersistence(utils.TopicName) error

// GetDelayedDelivery Get the delayed delivery policy for a topic
GetDelayedDelivery(utils.TopicName) (*utils.DelayedDeliveryData, error)

// SetDelayedDelivery Set the delayed delivery policy on a topic
SetDelayedDelivery(utils.TopicName, utils.DelayedDeliveryData) error

// RemoveDelayedDelivery Remove the delayed delivery policy on a topic
RemoveDelayedDelivery(utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -467,3 +476,20 @@ func (t *topics) RemovePersistence(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "persistence")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetDelayedDelivery(topic utils.TopicName) (*utils.DelayedDeliveryData, error) {
var delayedDeliveryData utils.DelayedDeliveryData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
err := t.pulsar.Client.Get(endpoint, &delayedDeliveryData)
return &delayedDeliveryData, err
}

func (t *topics) SetDelayedDelivery(topic utils.TopicName, delayedDeliveryData utils.DelayedDeliveryData) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
return t.pulsar.Client.Post(endpoint, &delayedDeliveryData)
}

func (t *topics) RemoveDelayedDelivery(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
return t.pulsar.Client.Delete(endpoint)
}
Loading

0 comments on commit 01757cf

Please sign in to comment.