From 1c8f4b2696ca94696499e02f6ae0e4cfa1fcd085 Mon Sep 17 00:00:00 2001 From: 180909 <734461790@qq.com> Date: Sat, 7 Aug 2021 09:38:51 +0800 Subject: [PATCH 1/5] centralized management errors --- api.go | 4 +--- benchmark/stable.go | 16 +++++++-------- consumer/consumer.go | 8 ++------ consumer/push_consumer.go | 5 +++-- errors.go | 29 +++++++++++++++++++++++---- internal/client.go | 3 ++- internal/remote/remote_client_test.go | 4 ++-- internal/route.go | 8 ++------ internal/route_test.go | 6 +++--- internal/utils/compression.go | 4 ++-- internal/utils/errors.go | 10 +-------- internal/utils/net.go | 4 ++-- primitive/errors.go | 7 ------- producer/producer.go | 12 ++++------- producer/producer_test.go | 7 ++++--- 15 files changed, 61 insertions(+), 66 deletions(-) diff --git a/api.go b/api.go index 0e149e9a..46bb9586 100644 --- a/api.go +++ b/api.go @@ -20,8 +20,6 @@ package rocketmq import ( "context" - "github.com/pkg/errors" - "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" @@ -134,5 +132,5 @@ type PullConsumer interface { // // The PullConsumer will be supported in next release func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) { - return nil, errors.New("pull consumer has not supported") + return nil, ErrPullConsumer } diff --git a/benchmark/stable.go b/benchmark/stable.go index 2c3dc1b8..ea3fe3b8 100644 --- a/benchmark/stable.go +++ b/benchmark/stable.go @@ -18,9 +18,9 @@ package main import ( - "errors" "flag" "fmt" + "github.com/apache/rocketmq-client-go/v2" "os" "os/signal" "syscall" @@ -52,23 +52,23 @@ func (st *stableTest) buildFlags(name string) { func (st *stableTest) checkFlag() error { if st.topic == "" { - return errors.New("empty topic") + return rocketmq.ErrEmptyTopic } if st.nameSrv == "" { - return errors.New("empty namesrv") + return rocketmq.ErrEmptyNameSrv } if st.groupID == "" { - return errors.New("empty group id") + return rocketmq.ErrEmptyGroupID } if st.testMin <= 0 { - return errors.New("test miniutes must be positive integer") + return rocketmq.ErrTestMin } if st.opIntervalSec <= 0 { - return errors.New("operation interval must be positive integer") + return rocketmq.ErrOperationInterval } return nil @@ -114,7 +114,7 @@ func (stp *stableTestProducer) checkFlag() error { return err } if stp.bodySize <= 0 { - return errors.New("message body size must be positive integer") + return rocketmq.ErrMessageBody } return nil @@ -187,7 +187,7 @@ func (stc *stableTestConsumer) checkFlag() error { } if stc.expression == "" { - return errors.New("empty expression") + return rocketmq.ErrEmptyExpression } return nil } diff --git a/consumer/consumer.go b/consumer/consumer.go index 42bc973a..dc617d8f 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -20,6 +20,7 @@ package consumer import ( "context" "fmt" + "github.com/apache/rocketmq-client-go/v2" "sort" "strconv" "strings" @@ -29,7 +30,6 @@ import ( jsoniter "github.com/json-iterator/go" - "github.com/pkg/errors" "github.com/tidwall/gjson" "github.com/apache/rocketmq-client-go/v2/internal" @@ -68,10 +68,6 @@ const ( _SubAll = "*" ) -var ( - ErrCreated = errors.New("consumer group has been created") - ErrBrokerNotFound = errors.New("broker can not found") -) // Message model defines the way how messages are delivered to each consumer clients. //

@@ -822,7 +818,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa rlog.Warning("no broker found for mq", map[string]interface{}{ rlog.LogKeyMessageQueue: queue, }) - return nil, ErrBrokerNotFound + return nil, rocketmq.ErrBrokerNotFound } if brokerResult.Slave { diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index c84ce84c..7b06bd62 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -20,6 +20,7 @@ package consumer import ( "context" "fmt" + "github.com/apache/rocketmq-client-go/v2" "math" "strconv" "strings" @@ -138,7 +139,7 @@ func (pc *pushConsumer) Start() error { rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) - err = ErrCreated + err = rocketmq.ErrCreated return } @@ -224,7 +225,7 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector, f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error { if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) || atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) { - return errors.New("cannot subscribe topic since client either failed to start or has been shutdown.") + return rocketmq.ErrStartTopic } // add retry topic subscription for resubscribe diff --git a/errors.go b/errors.go index fe9ba33f..1ea0c370 100644 --- a/errors.go +++ b/errors.go @@ -17,13 +17,34 @@ limitations under the License. package rocketmq -import ( - "github.com/pkg/errors" -) +import "errors" var ( - ErrRequestTimeout = errors.New("request timeout") + ErrRequestTimeout = errors.New("equest timeout") ErrMQEmpty = errors.New("MessageQueue is nil") ErrOffset = errors.New("offset < 0") ErrNumbers = errors.New("numbers < 0") + ErrEmptyTopic = errors.New("empty topic") + ErrEmptyNameSrv = errors.New("empty namesrv") + ErrEmptyGroupID = errors.New("empty group id") + ErrTestMin = errors.New("test minutes must be positive integer") + ErrOperationInterval = errors.New("operation interval must be positive integer") + ErrMessageBody = errors.New("message body size must be positive integer") + ErrEmptyExpression = errors.New("empty expression") + ErrCreated = errors.New("consumer group has been created") + ErrBrokerNotFound = errors.New("broker can not found") + ErrStartTopic = errors.New("cannot subscribe topic since client either failed to start or has been shutdown.") + ErrResponse = errors.New("response error") + ErrCompressLevel = errors.New("unsupported compress level") + ErrUnknownIP = errors.New("unknown IP address") + ErrService = errors.New("service close is not running, please check") + ErrTopicNotExist = errors.New("topic not exist") + ErrNotExisted = errors.New("not existed") + ErrNoNameserver = errors.New("nameServerAddrs can't be empty.") + ErrMultiIP = errors.New("multiple IP addr does not support") + ErrIllegalIP = errors.New("IP addr error") + ErrTopicEmpty = errors.New("topic is nil") + ErrMessageEmpty = errors.New("message is nil") + ErrNotRunning = errors.New("producer not started") + ErrPullConsumer = errors.New("pull consumer has not supported") ) diff --git a/internal/client.go b/internal/client.go index 3a09ea88..9cb7dff4 100644 --- a/internal/client.go +++ b/internal/client.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "github.com/apache/rocketmq-client-go/v2" "net" "os" "strconv" @@ -55,7 +56,7 @@ const ( ) var ( - ErrServiceState = errors.New("service close is not running, please check") + ErrServiceState = rocketmq.ErrService _VIPChannelEnable = false ) diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go index f1606742..942425f6 100644 --- a/internal/remote/remote_client_test.go +++ b/internal/remote/remote_client_test.go @@ -19,7 +19,7 @@ package remote import ( "bytes" "context" - "errors" + "github.com/apache/rocketmq-client-go/v2" "math/rand" "net" "reflect" @@ -85,7 +85,7 @@ func TestResponseFutureWaitResponse(t *testing.T) { utils.ErrRequestTimeout, err) } future = NewResponseFuture(context.Background(), 10, nil) - responseError := errors.New("response error") + responseError := rocketmq.ErrResponse go func() { time.Sleep(100 * time.Millisecond) future.Err = responseError diff --git a/internal/route.go b/internal/route.go index 66be96dc..e2bc7619 100644 --- a/internal/route.go +++ b/internal/route.go @@ -19,7 +19,7 @@ package internal import ( "context" - "errors" + "github.com/apache/rocketmq-client-go/v2" "math/rand" "sort" "strconv" @@ -46,10 +46,6 @@ const ( MasterId = int64(0) ) -var ( - ErrTopicNotExist = errors.New("topic not exist") -) - func (s *namesrvs) cleanOfflineBroker() { // TODO optimize s.lockNamesrv.Lock() @@ -405,7 +401,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, } return routeData, nil case ResTopicNotExist: - return nil, ErrTopicNotExist + return nil, rocketmq.ErrTopicNotExist default: return nil, primitive.NewMQClientErr(response.Code, response.Remark) } diff --git a/internal/route_test.go b/internal/route_test.go index ded77800..9cbeb3e8 100644 --- a/internal/route_test.go +++ b/internal/route_test.go @@ -19,11 +19,11 @@ package internal import ( "context" + "github.com/apache/rocketmq-client-go/v2" "sync" "testing" "github.com/golang/mock/gomock" - "github.com/pkg/errors" . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/assert" @@ -53,7 +53,7 @@ func TestQueryTopicRouteInfoFromServer(t *testing.T) { func(ctx context.Context, addr string, request *remote.RemotingCommand) (*remote.RemotingCommand, error) { count++ if count < 3 { - return nil, errors.New("not existed") + return nil, rocketmq.ErrNotExisted } return &remote.RemotingCommand{ Code: ResTopicNotExist, @@ -62,7 +62,7 @@ func TestQueryTopicRouteInfoFromServer(t *testing.T) { data, err := namesrv.queryTopicRouteInfoFromServer("notexisted") assert.Nil(t, data) - assert.Equal(t, ErrTopicNotExist, err) + assert.Equal(t, rocketmq.ErrTopicNotExist, err) }) }) } diff --git a/internal/utils/compression.go b/internal/utils/compression.go index 379cdf2f..9adc58a3 100644 --- a/internal/utils/compression.go +++ b/internal/utils/compression.go @@ -20,7 +20,7 @@ package utils import ( "bytes" "compress/zlib" - "errors" + "github.com/apache/rocketmq-client-go/v2" "io/ioutil" "sync" ) @@ -48,7 +48,7 @@ func init() { func Compress(raw []byte, compressLevel int) ([]byte, error) { if compressLevel < zlib.BestSpeed || compressLevel > zlib.BestCompression { - return nil, errors.New("unsupported compress level") + return nil, rocketmq.ErrCompressLevel } buf := bufPool.Get().(*bytes.Buffer) diff --git a/internal/utils/errors.go b/internal/utils/errors.go index 0b7ffc25..c2f26948 100644 --- a/internal/utils/errors.go +++ b/internal/utils/errors.go @@ -18,18 +18,10 @@ limitations under the License. package utils import ( - "errors" - "github.com/apache/rocketmq-client-go/v2/rlog" ) -var ( - // ErrRequestTimeout for request timeout error - ErrRequestTimeout = errors.New("request timeout") - ErrMQEmpty = errors.New("MessageQueue is nil") - ErrOffset = errors.New("offset < 0") - ErrNumbers = errors.New("numbers < 0") -) + func CheckError(action string, err error) { if err != nil { diff --git a/internal/utils/net.go b/internal/utils/net.go index cf35594b..e7fee9a6 100644 --- a/internal/utils/net.go +++ b/internal/utils/net.go @@ -19,8 +19,8 @@ package utils import ( "bytes" - "errors" "fmt" + "github.com/apache/rocketmq-client-go/v2" "net" "strconv" "time" @@ -56,7 +56,7 @@ func ClientIP4() ([]byte, error) { } } } - return nil, errors.New("unknown IP address") + return nil, rocketmq.ErrUnknownIP } func FakeIP() []byte { diff --git a/primitive/errors.go b/primitive/errors.go index 9b1a88e6..0fc8a7fa 100644 --- a/primitive/errors.go +++ b/primitive/errors.go @@ -18,16 +18,9 @@ limitations under the License. package primitive import ( - "errors" "strconv" ) -var ( - ErrNoNameserver = errors.New("nameServerAddrs can't be empty.") - ErrMultiIP = errors.New("multiple IP addr does not support") - ErrIllegalIP = errors.New("IP addr error") -) - type MQBrokerErr struct { ResponseCode int16 ErrorMessage string diff --git a/producer/producer.go b/producer/producer.go index 8ebb660f..4ef768b5 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -21,6 +21,7 @@ import ( "bytes" "context" "fmt" + "github.com/apache/rocketmq-client-go/v2" "strconv" "sync" "sync/atomic" @@ -35,11 +36,6 @@ import ( "github.com/apache/rocketmq-client-go/v2/rlog" ) -var ( - ErrTopicEmpty = errors.New("topic is nil") - ErrMessageEmpty = errors.New("message is nil") - ErrNotRunning = errors.New("producer not started") -) type defaultProducer struct { group string @@ -95,15 +91,15 @@ func (p *defaultProducer) Shutdown() error { func (p *defaultProducer) checkMsg(msgs ...*primitive.Message) error { if atomic.LoadInt32(&p.state) != int32(internal.StateRunning) { - return ErrNotRunning + return rocketmq.ErrNotRunning } if len(msgs) == 0 { - return errors.New("message is nil") + return rocketmq.ErrMessageEmpty } if len(msgs[0].Topic) == 0 { - return errors.New("topic is nil") + return rocketmq.ErrTopicEmpty } return nil } diff --git a/producer/producer_test.go b/producer/producer_test.go index 12cefb85..b2e8d953 100644 --- a/producer/producer_test.go +++ b/producer/producer_test.go @@ -19,6 +19,7 @@ package producer import ( "context" + "github.com/apache/rocketmq-client-go/v2" "testing" "github.com/golang/mock/gomock" @@ -60,17 +61,17 @@ func TestShutdown(t *testing.T) { msg := new(primitive.Message) r, err := p.SendSync(ctx, msg) - assert.Equal(t, ErrNotRunning, err) + assert.Equal(t, rocketmq.ErrNotRunning, err) assert.Nil(t, r) err = p.SendOneWay(ctx, msg) - assert.Equal(t, ErrNotRunning, err) + assert.Equal(t, rocketmq.ErrNotRunning, err) f := func(context.Context, *primitive.SendResult, error) { assert.False(t, true, "should not come in") } err = p.SendAsync(ctx, f, msg) - assert.Equal(t, ErrNotRunning, err) + assert.Equal(t, rocketmq.ErrNotRunning, err) } func mockB4Send(p *defaultProducer) { From d42fe61f3e11f9d433f554011c4bdca1eb1fd96b Mon Sep 17 00:00:00 2001 From: 180909 <734461790@qq.com> Date: Sat, 7 Aug 2021 09:48:37 +0800 Subject: [PATCH 2/5] update missed --- consumer/pull_consumer.go | 8 ++++---- primitive/base.go | 9 +++++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index 0b0ef564..81eefc03 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -20,13 +20,13 @@ package consumer import ( "context" "fmt" + "github.com/apache/rocketmq-client-go/v2" "sync" "sync/atomic" "github.com/pkg/errors" "github.com/apache/rocketmq-client-go/v2/internal" - "github.com/apache/rocketmq-client-go/v2/internal/utils" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/rlog" ) @@ -174,15 +174,15 @@ func (dc *defaultConsumer) checkPull(ctx context.Context, mq *primitive.MessageQ } if mq == nil { - return utils.ErrMQEmpty + return rocketmq.ErrMQEmpty } if offset < 0 { - return utils.ErrOffset + return rocketmq.ErrOffset } if numbers <= 0 { - return utils.ErrNumbers + return rocketmq.ErrNumbers } return nil } diff --git a/primitive/base.go b/primitive/base.go index fca34a2d..a14ae586 100644 --- a/primitive/base.go +++ b/primitive/base.go @@ -18,6 +18,7 @@ limitations under the License. package primitive import ( + "github.com/apache/rocketmq-client-go/v2" "regexp" "strings" ) @@ -31,7 +32,7 @@ type NamesrvAddr []string func NewNamesrvAddr(s ...string) (NamesrvAddr, error) { if len(s) == 0 { - return nil, ErrNoNameserver + return nil, rocketmq.ErrNoNameserver } ss := s @@ -69,17 +70,17 @@ func verifyIP(ip string) error { return nil } if strings.Contains(ip, ";") { - return ErrMultiIP + return rocketmq.ErrMultiIP } ipV4s := ipv4Regex.FindAllString(ip, -1) ipV6s := ipv6Regex.FindAllString(ip, -1) if len(ipV4s) == 0 && len(ipV6s) == 0 { - return ErrIllegalIP + return rocketmq.ErrIllegalIP } if len(ipV4s) > 1 || len(ipV6s) > 1 { - return ErrMultiIP + return rocketmq.ErrMultiIP } return nil } From aeac9d30032a89617fc19068b0ea1a963c7e1b03 Mon Sep 17 00:00:00 2001 From: 180909 <734461790@qq.com> Date: Sat, 7 Aug 2021 09:55:33 +0800 Subject: [PATCH 3/5] update missed --- internal/remote/future.go | 5 ++--- internal/remote/remote_client_test.go | 8 +++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/internal/remote/future.go b/internal/remote/future.go index ffbf781f..7d3d92c5 100644 --- a/internal/remote/future.go +++ b/internal/remote/future.go @@ -19,9 +19,8 @@ package remote import ( "context" + "github.com/apache/rocketmq-client-go/v2" "sync" - - "github.com/apache/rocketmq-client-go/v2/internal/utils" ) // ResponseFuture @@ -62,7 +61,7 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) { case <-r.Done: cmd, err = r.ResponseCommand, r.Err case <-r.ctx.Done(): - err = utils.ErrRequestTimeout + err = rocketmq.ErrRequestTimeout r.Err = err } return cmd, err diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go index 942425f6..2cb9355b 100644 --- a/internal/remote/remote_client_test.go +++ b/internal/remote/remote_client_test.go @@ -27,8 +27,6 @@ import ( "testing" "time" - "github.com/apache/rocketmq-client-go/v2/internal/utils" - "github.com/stretchr/testify/assert" ) @@ -80,9 +78,9 @@ func TestResponseFutureWaitResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000)) defer cancel() future := NewResponseFuture(ctx, 10, nil) - if _, err := future.waitResponse(); err != utils.ErrRequestTimeout { + if _, err := future.waitResponse(); err != rocketmq.ErrRequestTimeout { t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v", - utils.ErrRequestTimeout, err) + rocketmq.ErrRequestTimeout, err) } future = NewResponseFuture(context.Background(), 10, nil) responseError := rocketmq.ErrResponse @@ -295,7 +293,7 @@ func TestInvokeAsyncTimeout(t *testing.T) { err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand, func(r *ResponseFuture) { assert.NotNil(t, r.Err) - assert.Equal(t, utils.ErrRequestTimeout, r.Err) + assert.Equal(t, rocketmq.ErrRequestTimeout, r.Err) wg.Done() }) assert.Nil(t, err, "failed to invokeSync.") From e4a000b2c41021e717cb7643ae321f09a1ac999e Mon Sep 17 00:00:00 2001 From: 180909 <734461790@qq.com> Date: Sat, 7 Aug 2021 13:05:55 +0800 Subject: [PATCH 4/5] fix import cycle --- api.go | 3 ++- benchmark/stable.go | 16 ++++++++-------- consumer/consumer.go | 4 ++-- consumer/pull_consumer.go | 8 ++++---- consumer/push_consumer.go | 6 +++--- errors.go => errors/errors.go | 2 +- examples/consumer/pull/main.go | 3 ++- go.sum | 2 -- internal/client.go | 4 ++-- internal/remote/future.go | 4 ++-- internal/remote/remote_client_test.go | 10 +++++----- internal/route.go | 4 ++-- internal/route_test.go | 6 +++--- internal/utils/compression.go | 4 ++-- internal/utils/net.go | 4 ++-- primitive/base.go | 10 +++++----- producer/producer.go | 8 ++++---- producer/producer_test.go | 8 ++++---- 18 files changed, 53 insertions(+), 53 deletions(-) rename errors.go => errors/errors.go (99%) diff --git a/api.go b/api.go index 46bb9586..31f58d50 100644 --- a/api.go +++ b/api.go @@ -19,6 +19,7 @@ package rocketmq import ( "context" + "github.com/apache/rocketmq-client-go/v2/errors" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" @@ -132,5 +133,5 @@ type PullConsumer interface { // // The PullConsumer will be supported in next release func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) { - return nil, ErrPullConsumer + return nil, errors.ErrPullConsumer } diff --git a/benchmark/stable.go b/benchmark/stable.go index ea3fe3b8..cd5fb9b0 100644 --- a/benchmark/stable.go +++ b/benchmark/stable.go @@ -20,7 +20,7 @@ package main import ( "flag" "fmt" - "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/errors" "os" "os/signal" "syscall" @@ -52,23 +52,23 @@ func (st *stableTest) buildFlags(name string) { func (st *stableTest) checkFlag() error { if st.topic == "" { - return rocketmq.ErrEmptyTopic + return errors.ErrEmptyTopic } if st.nameSrv == "" { - return rocketmq.ErrEmptyNameSrv + return errors.ErrEmptyNameSrv } if st.groupID == "" { - return rocketmq.ErrEmptyGroupID + return errors.ErrEmptyGroupID } if st.testMin <= 0 { - return rocketmq.ErrTestMin + return errors.ErrTestMin } if st.opIntervalSec <= 0 { - return rocketmq.ErrOperationInterval + return errors.ErrOperationInterval } return nil @@ -114,7 +114,7 @@ func (stp *stableTestProducer) checkFlag() error { return err } if stp.bodySize <= 0 { - return rocketmq.ErrMessageBody + return errors.ErrMessageBody } return nil @@ -187,7 +187,7 @@ func (stc *stableTestConsumer) checkFlag() error { } if stc.expression == "" { - return rocketmq.ErrEmptyExpression + return errors.ErrEmptyExpression } return nil } diff --git a/consumer/consumer.go b/consumer/consumer.go index dc617d8f..a8be3106 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -20,7 +20,7 @@ package consumer import ( "context" "fmt" - "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/errors" "sort" "strconv" "strings" @@ -818,7 +818,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa rlog.Warning("no broker found for mq", map[string]interface{}{ rlog.LogKeyMessageQueue: queue, }) - return nil, rocketmq.ErrBrokerNotFound + return nil, errors.ErrBrokerNotFound } if brokerResult.Slave { diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index 81eefc03..4625565d 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -20,7 +20,7 @@ package consumer import ( "context" "fmt" - "github.com/apache/rocketmq-client-go/v2" + errors2 "github.com/apache/rocketmq-client-go/v2/errors" "sync" "sync/atomic" @@ -174,15 +174,15 @@ func (dc *defaultConsumer) checkPull(ctx context.Context, mq *primitive.MessageQ } if mq == nil { - return rocketmq.ErrMQEmpty + return errors2.ErrMQEmpty } if offset < 0 { - return rocketmq.ErrOffset + return errors2.ErrOffset } if numbers <= 0 { - return rocketmq.ErrNumbers + return errors2.ErrNumbers } return nil } diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 7b06bd62..26d41dcd 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -20,7 +20,7 @@ package consumer import ( "context" "fmt" - "github.com/apache/rocketmq-client-go/v2" + errors2 "github.com/apache/rocketmq-client-go/v2/errors" "math" "strconv" "strings" @@ -139,7 +139,7 @@ func (pc *pushConsumer) Start() error { rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) - err = rocketmq.ErrCreated + err = errors2.ErrCreated return } @@ -225,7 +225,7 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector, f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error { if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) || atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) { - return rocketmq.ErrStartTopic + return errors2.ErrStartTopic } // add retry topic subscription for resubscribe diff --git a/errors.go b/errors/errors.go similarity index 99% rename from errors.go rename to errors/errors.go index 1ea0c370..a9150549 100644 --- a/errors.go +++ b/errors/errors.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package rocketmq +package errors import "errors" diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go index 3d49e7e5..5b5819e9 100644 --- a/examples/consumer/pull/main.go +++ b/examples/consumer/pull/main.go @@ -20,6 +20,7 @@ package main import ( "context" "fmt" + "github.com/apache/rocketmq-client-go/v2/errors" "time" "github.com/apache/rocketmq-client-go/v2" @@ -52,7 +53,7 @@ func main() { for { resp, err := c.PullFrom(ctx, queue, offset, 10) if err != nil { - if err == rocketmq.ErrRequestTimeout { + if err == errors.ErrRequestTimeout { fmt.Printf("timeout \n") time.Sleep(1 * time.Second) continue diff --git a/go.sum b/go.sum index 3b724dbd..b2965216 100644 --- a/go.sum +++ b/go.sum @@ -49,14 +49,12 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/internal/client.go b/internal/client.go index 9cb7dff4..053b0ac7 100644 --- a/internal/client.go +++ b/internal/client.go @@ -21,7 +21,7 @@ import ( "context" "errors" "fmt" - "github.com/apache/rocketmq-client-go/v2" + errors2 "github.com/apache/rocketmq-client-go/v2/errors" "net" "os" "strconv" @@ -56,7 +56,7 @@ const ( ) var ( - ErrServiceState = rocketmq.ErrService + ErrServiceState = errors2.ErrService _VIPChannelEnable = false ) diff --git a/internal/remote/future.go b/internal/remote/future.go index 7d3d92c5..a7d32684 100644 --- a/internal/remote/future.go +++ b/internal/remote/future.go @@ -19,7 +19,7 @@ package remote import ( "context" - "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/errors" "sync" ) @@ -61,7 +61,7 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) { case <-r.Done: cmd, err = r.ResponseCommand, r.Err case <-r.ctx.Done(): - err = rocketmq.ErrRequestTimeout + err = errors.ErrRequestTimeout r.Err = err } return cmd, err diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go index 2cb9355b..fa33f4f9 100644 --- a/internal/remote/remote_client_test.go +++ b/internal/remote/remote_client_test.go @@ -19,7 +19,7 @@ package remote import ( "bytes" "context" - "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/errors" "math/rand" "net" "reflect" @@ -78,12 +78,12 @@ func TestResponseFutureWaitResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000)) defer cancel() future := NewResponseFuture(ctx, 10, nil) - if _, err := future.waitResponse(); err != rocketmq.ErrRequestTimeout { + if _, err := future.waitResponse(); err != errors.ErrRequestTimeout { t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v", - rocketmq.ErrRequestTimeout, err) + errors.ErrRequestTimeout, err) } future = NewResponseFuture(context.Background(), 10, nil) - responseError := rocketmq.ErrResponse + responseError := errors.ErrResponse go func() { time.Sleep(100 * time.Millisecond) future.Err = responseError @@ -293,7 +293,7 @@ func TestInvokeAsyncTimeout(t *testing.T) { err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand, func(r *ResponseFuture) { assert.NotNil(t, r.Err) - assert.Equal(t, rocketmq.ErrRequestTimeout, r.Err) + assert.Equal(t, errors.ErrRequestTimeout, r.Err) wg.Done() }) assert.Nil(t, err, "failed to invokeSync.") diff --git a/internal/route.go b/internal/route.go index e2bc7619..deca69ff 100644 --- a/internal/route.go +++ b/internal/route.go @@ -19,7 +19,7 @@ package internal import ( "context" - "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/errors" "math/rand" "sort" "strconv" @@ -401,7 +401,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, } return routeData, nil case ResTopicNotExist: - return nil, rocketmq.ErrTopicNotExist + return nil, errors.ErrTopicNotExist default: return nil, primitive.NewMQClientErr(response.Code, response.Remark) } diff --git a/internal/route_test.go b/internal/route_test.go index 9cbeb3e8..a1ebec48 100644 --- a/internal/route_test.go +++ b/internal/route_test.go @@ -19,7 +19,7 @@ package internal import ( "context" - "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/errors" "sync" "testing" @@ -53,7 +53,7 @@ func TestQueryTopicRouteInfoFromServer(t *testing.T) { func(ctx context.Context, addr string, request *remote.RemotingCommand) (*remote.RemotingCommand, error) { count++ if count < 3 { - return nil, rocketmq.ErrNotExisted + return nil, errors.ErrNotExisted } return &remote.RemotingCommand{ Code: ResTopicNotExist, @@ -62,7 +62,7 @@ func TestQueryTopicRouteInfoFromServer(t *testing.T) { data, err := namesrv.queryTopicRouteInfoFromServer("notexisted") assert.Nil(t, data) - assert.Equal(t, rocketmq.ErrTopicNotExist, err) + assert.Equal(t, errors.ErrTopicNotExist, err) }) }) } diff --git a/internal/utils/compression.go b/internal/utils/compression.go index 9adc58a3..162864fe 100644 --- a/internal/utils/compression.go +++ b/internal/utils/compression.go @@ -20,7 +20,7 @@ package utils import ( "bytes" "compress/zlib" - "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/errors" "io/ioutil" "sync" ) @@ -48,7 +48,7 @@ func init() { func Compress(raw []byte, compressLevel int) ([]byte, error) { if compressLevel < zlib.BestSpeed || compressLevel > zlib.BestCompression { - return nil, rocketmq.ErrCompressLevel + return nil, errors.ErrCompressLevel } buf := bufPool.Get().(*bytes.Buffer) diff --git a/internal/utils/net.go b/internal/utils/net.go index e7fee9a6..a4eeb56d 100644 --- a/internal/utils/net.go +++ b/internal/utils/net.go @@ -20,7 +20,7 @@ package utils import ( "bytes" "fmt" - "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/errors" "net" "strconv" "time" @@ -56,7 +56,7 @@ func ClientIP4() ([]byte, error) { } } } - return nil, rocketmq.ErrUnknownIP + return nil, errors.ErrUnknownIP } func FakeIP() []byte { diff --git a/primitive/base.go b/primitive/base.go index a14ae586..a45fbb96 100644 --- a/primitive/base.go +++ b/primitive/base.go @@ -18,7 +18,7 @@ limitations under the License. package primitive import ( - "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/errors" "regexp" "strings" ) @@ -32,7 +32,7 @@ type NamesrvAddr []string func NewNamesrvAddr(s ...string) (NamesrvAddr, error) { if len(s) == 0 { - return nil, rocketmq.ErrNoNameserver + return nil, errors.ErrNoNameserver } ss := s @@ -70,17 +70,17 @@ func verifyIP(ip string) error { return nil } if strings.Contains(ip, ";") { - return rocketmq.ErrMultiIP + return errors.ErrMultiIP } ipV4s := ipv4Regex.FindAllString(ip, -1) ipV6s := ipv6Regex.FindAllString(ip, -1) if len(ipV4s) == 0 && len(ipV6s) == 0 { - return rocketmq.ErrIllegalIP + return errors.ErrIllegalIP } if len(ipV4s) > 1 || len(ipV6s) > 1 { - return rocketmq.ErrMultiIP + return errors.ErrMultiIP } return nil } diff --git a/producer/producer.go b/producer/producer.go index 4ef768b5..57bb357b 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -21,7 +21,7 @@ import ( "bytes" "context" "fmt" - "github.com/apache/rocketmq-client-go/v2" + errors2 "github.com/apache/rocketmq-client-go/v2/errors" "strconv" "sync" "sync/atomic" @@ -91,15 +91,15 @@ func (p *defaultProducer) Shutdown() error { func (p *defaultProducer) checkMsg(msgs ...*primitive.Message) error { if atomic.LoadInt32(&p.state) != int32(internal.StateRunning) { - return rocketmq.ErrNotRunning + return errors2.ErrNotRunning } if len(msgs) == 0 { - return rocketmq.ErrMessageEmpty + return errors2.ErrMessageEmpty } if len(msgs[0].Topic) == 0 { - return rocketmq.ErrTopicEmpty + return errors2.ErrTopicEmpty } return nil } diff --git a/producer/producer_test.go b/producer/producer_test.go index b2e8d953..b6ec84d1 100644 --- a/producer/producer_test.go +++ b/producer/producer_test.go @@ -19,7 +19,7 @@ package producer import ( "context" - "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/errors" "testing" "github.com/golang/mock/gomock" @@ -61,17 +61,17 @@ func TestShutdown(t *testing.T) { msg := new(primitive.Message) r, err := p.SendSync(ctx, msg) - assert.Equal(t, rocketmq.ErrNotRunning, err) + assert.Equal(t, errors.ErrNotRunning, err) assert.Nil(t, r) err = p.SendOneWay(ctx, msg) - assert.Equal(t, rocketmq.ErrNotRunning, err) + assert.Equal(t, errors.ErrNotRunning, err) f := func(context.Context, *primitive.SendResult, error) { assert.False(t, true, "should not come in") } err = p.SendAsync(ctx, f, msg) - assert.Equal(t, rocketmq.ErrNotRunning, err) + assert.Equal(t, errors.ErrNotRunning, err) } func mockB4Send(p *defaultProducer) { From 98eaad39e120d64abcec074e7230ec7650a98054 Mon Sep 17 00:00:00 2001 From: 180909 <734461790@qq.com> Date: Sat, 7 Aug 2021 13:16:56 +0800 Subject: [PATCH 5/5] go fmt --- consumer/consumer.go | 1 - errors/errors.go | 52 ++++++++++++++++++++-------------------- internal/utils/errors.go | 2 -- producer/producer.go | 1 - 4 files changed, 26 insertions(+), 30 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index a8be3106..4c8f1f14 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -68,7 +68,6 @@ const ( _SubAll = "*" ) - // Message model defines the way how messages are delivered to each consumer clients. //

// diff --git a/errors/errors.go b/errors/errors.go index a9150549..793fcda1 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -20,31 +20,31 @@ package errors import "errors" var ( - ErrRequestTimeout = errors.New("equest timeout") - ErrMQEmpty = errors.New("MessageQueue is nil") - ErrOffset = errors.New("offset < 0") - ErrNumbers = errors.New("numbers < 0") - ErrEmptyTopic = errors.New("empty topic") - ErrEmptyNameSrv = errors.New("empty namesrv") - ErrEmptyGroupID = errors.New("empty group id") - ErrTestMin = errors.New("test minutes must be positive integer") + ErrRequestTimeout = errors.New("equest timeout") + ErrMQEmpty = errors.New("MessageQueue is nil") + ErrOffset = errors.New("offset < 0") + ErrNumbers = errors.New("numbers < 0") + ErrEmptyTopic = errors.New("empty topic") + ErrEmptyNameSrv = errors.New("empty namesrv") + ErrEmptyGroupID = errors.New("empty group id") + ErrTestMin = errors.New("test minutes must be positive integer") ErrOperationInterval = errors.New("operation interval must be positive integer") - ErrMessageBody = errors.New("message body size must be positive integer") - ErrEmptyExpression = errors.New("empty expression") - ErrCreated = errors.New("consumer group has been created") - ErrBrokerNotFound = errors.New("broker can not found") - ErrStartTopic = errors.New("cannot subscribe topic since client either failed to start or has been shutdown.") - ErrResponse = errors.New("response error") - ErrCompressLevel = errors.New("unsupported compress level") - ErrUnknownIP = errors.New("unknown IP address") - ErrService = errors.New("service close is not running, please check") - ErrTopicNotExist = errors.New("topic not exist") - ErrNotExisted = errors.New("not existed") - ErrNoNameserver = errors.New("nameServerAddrs can't be empty.") - ErrMultiIP = errors.New("multiple IP addr does not support") - ErrIllegalIP = errors.New("IP addr error") - ErrTopicEmpty = errors.New("topic is nil") - ErrMessageEmpty = errors.New("message is nil") - ErrNotRunning = errors.New("producer not started") - ErrPullConsumer = errors.New("pull consumer has not supported") + ErrMessageBody = errors.New("message body size must be positive integer") + ErrEmptyExpression = errors.New("empty expression") + ErrCreated = errors.New("consumer group has been created") + ErrBrokerNotFound = errors.New("broker can not found") + ErrStartTopic = errors.New("cannot subscribe topic since client either failed to start or has been shutdown.") + ErrResponse = errors.New("response error") + ErrCompressLevel = errors.New("unsupported compress level") + ErrUnknownIP = errors.New("unknown IP address") + ErrService = errors.New("service close is not running, please check") + ErrTopicNotExist = errors.New("topic not exist") + ErrNotExisted = errors.New("not existed") + ErrNoNameserver = errors.New("nameServerAddrs can't be empty.") + ErrMultiIP = errors.New("multiple IP addr does not support") + ErrIllegalIP = errors.New("IP addr error") + ErrTopicEmpty = errors.New("topic is nil") + ErrMessageEmpty = errors.New("message is nil") + ErrNotRunning = errors.New("producer not started") + ErrPullConsumer = errors.New("pull consumer has not supported") ) diff --git a/internal/utils/errors.go b/internal/utils/errors.go index c2f26948..0887a371 100644 --- a/internal/utils/errors.go +++ b/internal/utils/errors.go @@ -21,8 +21,6 @@ import ( "github.com/apache/rocketmq-client-go/v2/rlog" ) - - func CheckError(action string, err error) { if err != nil { rlog.Error(action, map[string]interface{}{ diff --git a/producer/producer.go b/producer/producer.go index 57bb357b..278e57ae 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -36,7 +36,6 @@ import ( "github.com/apache/rocketmq-client-go/v2/rlog" ) - type defaultProducer struct { group string client internal.RMQClient