diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index 6c24c631..8af935b5 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -140,12 +140,18 @@ func (pc *defaultPullConsumer) Unsubscribe(topic string) error { } func (pc *defaultPullConsumer) Start() error { - atomic.StoreInt32(&pc.state, int32(internal.StateRunning)) - var err error pc.once.Do(func() { - consumerGroupWithNs := utils.WrapNamespace(pc.option.Namespace, pc.consumerGroup) - err = pc.defaultConsumer.client.RegisterConsumer(consumerGroupWithNs, pc) + err = pc.validate() + if err != nil { + rlog.Error("the consumer group option validate fail", map[string]interface{}{ + rlog.LogKeyConsumerGroup: pc.consumerGroup, + rlog.LogKeyUnderlayError: err.Error(), + }) + err = errors.Wrap(err, "the consumer group option validate fail") + return + } + err = pc.defaultConsumer.client.RegisterConsumer(pc.consumerGroup, pc) if err != nil { rlog.Error("defaultPullConsumer the consumer group has been created, specify another one", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, @@ -157,7 +163,7 @@ func (pc *defaultPullConsumer) Start() error { if err != nil { return } - + atomic.StoreInt32(&pc.state, int32(internal.StateRunning)) go func() { for { select { @@ -174,7 +180,9 @@ func (pc *defaultPullConsumer) Start() error { } }() }) - + if err != nil { + return err + } pc.client.UpdateTopicRouteInfo() _, exist := pc.topicSubscribeInfoTable.Load(pc.topic) if !exist { @@ -815,3 +823,15 @@ func (pc *defaultPullConsumer) consumeMessageCurrently(pq *processQueue, mq *pri case pc.consumeRequestCache <- cr: } } + +func (pc *defaultPullConsumer) validate() error { + if err := internal.ValidateGroup(pc.consumerGroup); err != nil { + return err + } + + if pc.consumerGroup == internal.DefaultConsumerGroup { + return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup) + } + + return nil +} diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index e81d4efc..cc1a0a80 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -139,7 +139,15 @@ func (pc *pushConsumer) Start() error { "unitMode": pc.unitMode, }) atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed)) - pc.validate() + err = pc.validate() + if err != nil { + rlog.Error("the consumer group option validate fail", map[string]interface{}{ + rlog.LogKeyConsumerGroup: pc.consumerGroup, + rlog.LogKeyUnderlayError: err.Error(), + }) + err = errors.Wrap(err, "the consumer group option validate fail") + return + } err = pc.client.RegisterConsumer(pc.consumerGroup, pc) if err != nil { @@ -496,23 +504,25 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr pc.client.SendHeartbeatToAllBrokerWithLock() } -func (pc *pushConsumer) validate() { - internal.ValidateGroup(pc.consumerGroup) +func (pc *pushConsumer) validate() error { + if err := internal.ValidateGroup(pc.consumerGroup); err != nil { + return err + } if pc.consumerGroup == internal.DefaultConsumerGroup { // TODO FQA - rlog.Error(fmt.Sprintf("consumerGroup can't equal [%s], please specify another one.", internal.DefaultConsumerGroup), nil) + return fmt.Errorf("consumerGroup can't equal [%s], please specify another one", internal.DefaultConsumerGroup) } if len(pc.subscribedTopic) == 0 { - rlog.Error("number of subscribed topics is 0.", nil) + return errors.New("number of subscribed topics is 0.") } if pc.option.ConsumeConcurrentlyMaxSpan < 1 || pc.option.ConsumeConcurrentlyMaxSpan > 65535 { if pc.option.ConsumeConcurrentlyMaxSpan == 0 { pc.option.ConsumeConcurrentlyMaxSpan = 1000 } else { - rlog.Error("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]", nil) + return errors.New("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]") } } @@ -520,7 +530,7 @@ func (pc *pushConsumer) validate() { if pc.option.PullThresholdForQueue == 0 { pc.option.PullThresholdForQueue = 1024 } else { - rlog.Error("option.PullThresholdForQueue out of range [1, 65535]", nil) + return errors.New("option.PullThresholdForQueue out of range [1, 65535]") } } @@ -528,7 +538,7 @@ func (pc *pushConsumer) validate() { if pc.option.PullThresholdForTopic == 0 { pc.option.PullThresholdForTopic = 102400 } else { - rlog.Error("option.PullThresholdForTopic out of range [1, 6553500]", nil) + return errors.New("option.PullThresholdForTopic out of range [1, 6553500]") } } @@ -536,7 +546,7 @@ func (pc *pushConsumer) validate() { if pc.option.PullThresholdSizeForQueue == 0 { pc.option.PullThresholdSizeForQueue = 512 } else { - rlog.Error("option.PullThresholdSizeForQueue out of range [1, 1024]", nil) + return errors.New("option.PullThresholdSizeForQueue out of range [1, 1024]") } } @@ -544,19 +554,19 @@ func (pc *pushConsumer) validate() { if pc.option.PullThresholdSizeForTopic == 0 { pc.option.PullThresholdSizeForTopic = 51200 } else { - rlog.Error("option.PullThresholdSizeForTopic out of range [1, 102400]", nil) + return errors.New("option.PullThresholdSizeForTopic out of range [1, 102400]") } } if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535*time.Millisecond { - rlog.Error("option.PullInterval out of range [0, 65535]", nil) + return errors.New("option.PullInterval out of range [0, 65535]") } if pc.option.ConsumeMessageBatchMaxSize < 1 || pc.option.ConsumeMessageBatchMaxSize > 1024 { if pc.option.ConsumeMessageBatchMaxSize == 0 { pc.option.ConsumeMessageBatchMaxSize = 1 } else { - rlog.Error("option.ConsumeMessageBatchMaxSize out of range [1, 1024]", nil) + return errors.New("option.ConsumeMessageBatchMaxSize out of range [1, 1024]") } } @@ -564,9 +574,18 @@ func (pc *pushConsumer) validate() { if pc.option.PullBatchSize == 0 { pc.option.PullBatchSize = 32 } else { - rlog.Error("option.PullBatchSize out of range [1, 1024]", nil) + return errors.New("option.PullBatchSize out of range [1, 1024]") + } + } + + if pc.option.ConsumeGoroutineNums < 1 || pc.option.ConsumeGoroutineNums > 1000 { + if pc.option.ConsumeGoroutineNums == 0 { + pc.option.ConsumeGoroutineNums = 20 + } else { + return errors.New("option.ConsumeGoroutineNums out of range [1, 1000]") } } + return nil } func (pc *pushConsumer) pullMessage(request *PullRequest) { diff --git a/internal/validators.go b/internal/validators.go index ac51db2c..61bd45ec 100644 --- a/internal/validators.go +++ b/internal/validators.go @@ -18,9 +18,9 @@ limitations under the License. package internal import ( + "errors" + "fmt" "regexp" - - "github.com/apache/rocketmq-client-go/v2/rlog" ) const ( @@ -29,15 +29,18 @@ const ( ) var ( - _Pattern, _ = regexp.Compile(_ValidPattern) + _Pattern = regexp.MustCompile(_ValidPattern) ) -func ValidateGroup(group string) { +func ValidateGroup(group string) error { if group == "" { - rlog.Fatal("consumerGroup is empty", nil) + return errors.New("consumerGroup is empty") } - if len(group) > _CharacterMaxLength { - rlog.Fatal("the specified group is longer than group max length 255.", nil) + return errors.New("the specified group is longer than group max length 255") + } + if !_Pattern.MatchString(group) { + return fmt.Errorf("the specified group[%s] contains illegal characters, allowing only %s", group, _ValidPattern) } + return nil }