Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #735] centralized management errors #708

Merged
merged 5 commits into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package rocketmq

import (
"context"

"github.com/pkg/errors"
"github.com/apache/rocketmq-client-go/v2/errors"

"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
Expand Down Expand Up @@ -134,5 +133,5 @@ type PullConsumer interface {
//
// The PullConsumer will be supported in next release
func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
return nil, errors.New("pull consumer has not supported")
return nil, errors.ErrPullConsumer
}
16 changes: 8 additions & 8 deletions benchmark/stable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package main

import (
"errors"
"flag"
"fmt"
"github.com/apache/rocketmq-client-go/v2/errors"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -52,23 +52,23 @@ func (st *stableTest) buildFlags(name string) {

func (st *stableTest) checkFlag() error {
if st.topic == "" {
return errors.New("empty topic")
return errors.ErrEmptyTopic
}

if st.nameSrv == "" {
return errors.New("empty namesrv")
return errors.ErrEmptyNameSrv
}

if st.groupID == "" {
return errors.New("empty group id")
return errors.ErrEmptyGroupID
}

if st.testMin <= 0 {
return errors.New("test miniutes must be positive integer")
return errors.ErrTestMin
}

if st.opIntervalSec <= 0 {
return errors.New("operation interval must be positive integer")
return errors.ErrOperationInterval
}

return nil
Expand Down Expand Up @@ -114,7 +114,7 @@ func (stp *stableTestProducer) checkFlag() error {
return err
}
if stp.bodySize <= 0 {
return errors.New("message body size must be positive integer")
return errors.ErrMessageBody
}

return nil
Expand Down Expand Up @@ -187,7 +187,7 @@ func (stc *stableTestConsumer) checkFlag() error {
}

if stc.expression == "" {
return errors.New("empty expression")
return errors.ErrEmptyExpression
}
return nil
}
Expand Down
9 changes: 2 additions & 7 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package consumer
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2/errors"
"sort"
"strconv"
"strings"
Expand All @@ -29,7 +30,6 @@ import (

jsoniter "github.com/json-iterator/go"

"github.com/pkg/errors"
"github.com/tidwall/gjson"

"github.com/apache/rocketmq-client-go/v2/internal"
Expand Down Expand Up @@ -68,11 +68,6 @@ const (
_SubAll = "*"
)

var (
ErrCreated = errors.New("consumer group has been created")
ErrBrokerNotFound = errors.New("broker can not found")
)

// Message model defines the way how messages are delivered to each consumer clients.
// </p>
//
Expand Down Expand Up @@ -822,7 +817,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa
rlog.Warning("no broker found for mq", map[string]interface{}{
rlog.LogKeyMessageQueue: queue,
})
return nil, ErrBrokerNotFound
return nil, errors.ErrBrokerNotFound
}

if brokerResult.Slave {
Expand Down
8 changes: 4 additions & 4 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ package consumer
import (
"context"
"fmt"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"sync"
"sync/atomic"

"github.com/pkg/errors"

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
Expand Down Expand Up @@ -174,15 +174,15 @@ func (dc *defaultConsumer) checkPull(ctx context.Context, mq *primitive.MessageQ
}

if mq == nil {
return utils.ErrMQEmpty
return errors2.ErrMQEmpty
}

if offset < 0 {
return utils.ErrOffset
return errors2.ErrOffset
}

if numbers <= 0 {
return utils.ErrNumbers
return errors2.ErrNumbers
}
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package consumer
import (
"context"
"fmt"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"math"
"strconv"
"strings"
Expand Down Expand Up @@ -138,7 +139,7 @@ func (pc *pushConsumer) Start() error {
rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
err = ErrCreated
err = errors2.ErrCreated
return
}

Expand Down Expand Up @@ -224,7 +225,7 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
return errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
return errors2.ErrStartTopic
}

// add retry topic subscription for resubscribe
Expand Down
29 changes: 0 additions & 29 deletions errors.go

This file was deleted.

50 changes: 50 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package errors

import "errors"

var (
ErrRequestTimeout = errors.New("equest timeout")
ErrMQEmpty = errors.New("MessageQueue is nil")
ErrOffset = errors.New("offset < 0")
ErrNumbers = errors.New("numbers < 0")
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyNameSrv = errors.New("empty namesrv")
ErrEmptyGroupID = errors.New("empty group id")
ErrTestMin = errors.New("test minutes must be positive integer")
ErrOperationInterval = errors.New("operation interval must be positive integer")
ErrMessageBody = errors.New("message body size must be positive integer")
ErrEmptyExpression = errors.New("empty expression")
ErrCreated = errors.New("consumer group has been created")
ErrBrokerNotFound = errors.New("broker can not found")
ErrStartTopic = errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
ErrResponse = errors.New("response error")
ErrCompressLevel = errors.New("unsupported compress level")
ErrUnknownIP = errors.New("unknown IP address")
ErrService = errors.New("service close is not running, please check")
ErrTopicNotExist = errors.New("topic not exist")
ErrNotExisted = errors.New("not existed")
ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
ErrMultiIP = errors.New("multiple IP addr does not support")
ErrIllegalIP = errors.New("IP addr error")
ErrTopicEmpty = errors.New("topic is nil")
ErrMessageEmpty = errors.New("message is nil")
ErrNotRunning = errors.New("producer not started")
ErrPullConsumer = errors.New("pull consumer has not supported")
)
3 changes: 2 additions & 1 deletion examples/consumer/pull/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2/errors"
"time"

"github.com/apache/rocketmq-client-go/v2"
Expand Down Expand Up @@ -52,7 +53,7 @@ func main() {
for {
resp, err := c.PullFrom(ctx, queue, offset, 10)
if err != nil {
if err == rocketmq.ErrRequestTimeout {
if err == errors.ErrRequestTimeout {
fmt.Printf("timeout \n")
time.Sleep(1 * time.Second)
continue
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand Down
3 changes: 2 additions & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -55,7 +56,7 @@ const (
)

var (
ErrServiceState = errors.New("service close is not running, please check")
ErrServiceState = errors2.ErrService

_VIPChannelEnable = false
)
Expand Down
5 changes: 2 additions & 3 deletions internal/remote/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package remote

import (
"context"
"github.com/apache/rocketmq-client-go/v2/errors"
"sync"

"github.com/apache/rocketmq-client-go/v2/internal/utils"
)

// ResponseFuture
Expand Down Expand Up @@ -62,7 +61,7 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
case <-r.Done:
cmd, err = r.ResponseCommand, r.Err
case <-r.ctx.Done():
err = utils.ErrRequestTimeout
err = errors.ErrRequestTimeout
r.Err = err
}
return cmd, err
Expand Down
12 changes: 5 additions & 7 deletions internal/remote/remote_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ package remote
import (
"bytes"
"context"
"errors"
"github.com/apache/rocketmq-client-go/v2/errors"
"math/rand"
"net"
"reflect"
"sync"
"testing"
"time"

"github.com/apache/rocketmq-client-go/v2/internal/utils"

"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -80,12 +78,12 @@ func TestResponseFutureWaitResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000))
defer cancel()
future := NewResponseFuture(ctx, 10, nil)
if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
if _, err := future.waitResponse(); err != errors.ErrRequestTimeout {
t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
utils.ErrRequestTimeout, err)
errors.ErrRequestTimeout, err)
}
future = NewResponseFuture(context.Background(), 10, nil)
responseError := errors.New("response error")
responseError := errors.ErrResponse
go func() {
time.Sleep(100 * time.Millisecond)
future.Err = responseError
Expand Down Expand Up @@ -295,7 +293,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand,
func(r *ResponseFuture) {
assert.NotNil(t, r.Err)
assert.Equal(t, utils.ErrRequestTimeout, r.Err)
assert.Equal(t, errors.ErrRequestTimeout, r.Err)
wg.Done()
})
assert.Nil(t, err, "failed to invokeSync.")
Expand Down
8 changes: 2 additions & 6 deletions internal/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package internal

import (
"context"
"errors"
"github.com/apache/rocketmq-client-go/v2/errors"
"math/rand"
"sort"
"strconv"
Expand All @@ -46,10 +46,6 @@ const (
MasterId = int64(0)
)

var (
ErrTopicNotExist = errors.New("topic not exist")
)

func (s *namesrvs) cleanOfflineBroker() {
// TODO optimize
s.lockNamesrv.Lock()
Expand Down Expand Up @@ -405,7 +401,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
}
return routeData, nil
case ResTopicNotExist:
return nil, ErrTopicNotExist
return nil, errors.ErrTopicNotExist
default:
return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
Expand Down
Loading