Skip to content

Commit

Permalink
[ISSUE apache#933]fix consumer option validate
Browse files Browse the repository at this point in the history
  • Loading branch information
wuxb02 committed Oct 11, 2022
1 parent 92515f7 commit 2a62fd9
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 26 deletions.
32 changes: 26 additions & 6 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,18 @@ func (pc *defaultPullConsumer) Unsubscribe(topic string) error {
}

func (pc *defaultPullConsumer) Start() error {
atomic.StoreInt32(&pc.state, int32(internal.StateRunning))

var err error
pc.once.Do(func() {
consumerGroupWithNs := utils.WrapNamespace(pc.option.Namespace, pc.consumerGroup)
err = pc.defaultConsumer.client.RegisterConsumer(consumerGroupWithNs, pc)
err = pc.validate()
if err != nil {
rlog.Error("the consumer group option validate fail", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
rlog.LogKeyUnderlayError: err.Error(),
})
err = errors.Wrap(err, "the consumer group option validate fail")
return
}
err = pc.defaultConsumer.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {
rlog.Error("defaultPullConsumer the consumer group has been created, specify another one", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
Expand All @@ -157,7 +163,7 @@ func (pc *defaultPullConsumer) Start() error {
if err != nil {
return
}

atomic.StoreInt32(&pc.state, int32(internal.StateRunning))
go func() {
for {
select {
Expand All @@ -174,7 +180,9 @@ func (pc *defaultPullConsumer) Start() error {
}
}()
})

if err != nil {
return err
}
pc.client.UpdateTopicRouteInfo()
_, exist := pc.topicSubscribeInfoTable.Load(pc.topic)
if !exist {
Expand Down Expand Up @@ -815,3 +823,15 @@ func (pc *defaultPullConsumer) consumeMessageCurrently(pq *processQueue, mq *pri
case pc.consumeRequestCache <- cr:
}
}

func (pc *defaultPullConsumer) validate() error {
if err := internal.ValidateGroup(pc.consumerGroup); err != nil {
return err
}

if pc.consumerGroup == internal.DefaultConsumerGroup {
return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup)
}

return nil
}
45 changes: 32 additions & 13 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,15 @@ func (pc *pushConsumer) Start() error {
"unitMode": pc.unitMode,
})
atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
pc.validate()
err = pc.validate()
if err != nil {
rlog.Error("the consumer group option validate fail", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
rlog.LogKeyUnderlayError: err.Error(),
})
err = errors.Wrap(err, "the consumer group option validate fail")
return
}

err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {
Expand Down Expand Up @@ -496,77 +504,88 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr
pc.client.SendHeartbeatToAllBrokerWithLock()
}

func (pc *pushConsumer) validate() {
internal.ValidateGroup(pc.consumerGroup)
func (pc *pushConsumer) validate() error {
if err := internal.ValidateGroup(pc.consumerGroup); err != nil {
return err
}

if pc.consumerGroup == internal.DefaultConsumerGroup {
// TODO FQA
rlog.Error(fmt.Sprintf("consumerGroup can't equal [%s], please specify another one.", internal.DefaultConsumerGroup), nil)
return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup)
}

if len(pc.subscribedTopic) == 0 {
rlog.Error("number of subscribed topics is 0.", nil)
return errors.New("number of subscribed topics is 0.")
}

if pc.option.ConsumeConcurrentlyMaxSpan < 1 || pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
pc.option.ConsumeConcurrentlyMaxSpan = 1000
} else {
rlog.Error("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]", nil)
return errors.New("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]")
}
}

if pc.option.PullThresholdForQueue < 1 || pc.option.PullThresholdForQueue > 65535 {
if pc.option.PullThresholdForQueue == 0 {
pc.option.PullThresholdForQueue = 1024
} else {
rlog.Error("option.PullThresholdForQueue out of range [1, 65535]", nil)
return errors.New("option.PullThresholdForQueue out of range [1, 65535]")
}
}

if pc.option.PullThresholdForTopic < 1 || pc.option.PullThresholdForTopic > 6553500 {
if pc.option.PullThresholdForTopic == 0 {
pc.option.PullThresholdForTopic = 102400
} else {
rlog.Error("option.PullThresholdForTopic out of range [1, 6553500]", nil)
return errors.New("option.PullThresholdForTopic out of range [1, 6553500]")
}
}

if pc.option.PullThresholdSizeForQueue < 1 || pc.option.PullThresholdSizeForQueue > 1024 {
if pc.option.PullThresholdSizeForQueue == 0 {
pc.option.PullThresholdSizeForQueue = 512
} else {
rlog.Error("option.PullThresholdSizeForQueue out of range [1, 1024]", nil)
return errors.New("option.PullThresholdSizeForQueue out of range [1, 1024]")
}
}

if pc.option.PullThresholdSizeForTopic < 1 || pc.option.PullThresholdSizeForTopic > 102400 {
if pc.option.PullThresholdSizeForTopic == 0 {
pc.option.PullThresholdSizeForTopic = 51200
} else {
rlog.Error("option.PullThresholdSizeForTopic out of range [1, 102400]", nil)
return errors.New("option.PullThresholdSizeForTopic out of range [1, 102400]")
}
}

if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535*time.Millisecond {
rlog.Error("option.PullInterval out of range [0, 65535]", nil)
return errors.New("option.PullInterval out of range [0, 65535]")
}

if pc.option.ConsumeMessageBatchMaxSize < 1 || pc.option.ConsumeMessageBatchMaxSize > 1024 {
if pc.option.ConsumeMessageBatchMaxSize == 0 {
pc.option.ConsumeMessageBatchMaxSize = 1
} else {
rlog.Error("option.ConsumeMessageBatchMaxSize out of range [1, 1024]", nil)
return errors.New("option.ConsumeMessageBatchMaxSize out of range [1, 1024]")
}
}

if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
if pc.option.PullBatchSize == 0 {
pc.option.PullBatchSize = 32
} else {
rlog.Error("option.PullBatchSize out of range [1, 1024]", nil)
return errors.New("option.PullBatchSize out of range [1, 1024]")
}
}

if pc.option.ConsumeGoroutineNums < 1 || pc.option.ConsumeGoroutineNums > 1000 {
if pc.option.ConsumeGoroutineNums == 0 {
pc.option.ConsumeGoroutineNums = 20
} else {
return errors.New("option.ConsumeGoroutineNums out of range [1, 1000]")
}
}
return nil
}

func (pc *pushConsumer) pullMessage(request *PullRequest) {
Expand Down
17 changes: 10 additions & 7 deletions internal/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ limitations under the License.
package internal

import (
"errors"
"fmt"
"regexp"

"github.com/apache/rocketmq-client-go/v2/rlog"
)

const (
Expand All @@ -29,15 +29,18 @@ const (
)

var (
_Pattern, _ = regexp.Compile(_ValidPattern)
_Pattern = regexp.MustCompile(_ValidPattern)
)

func ValidateGroup(group string) {
func ValidateGroup(group string) error {
if group == "" {
rlog.Fatal("consumerGroup is empty", nil)
return errors.New("consumerGroup is empty")
}

if len(group) > _CharacterMaxLength {
rlog.Fatal("the specified group is longer than group max length 255.", nil)
return errors.New("the specified group is longer than group max length 255")
}
if !_Pattern.MatchString(group) {
return fmt.Errorf("the specified group[%s] contains illegal characters, allowing only %s", group, _ValidPattern)
}
return nil
}

0 comments on commit 2a62fd9

Please sign in to comment.