Skip to content

Commit

Permalink
fix import cycle
Browse files Browse the repository at this point in the history
  • Loading branch information
180909 committed Aug 7, 2021
1 parent aeac9d3 commit e4a000b
Show file tree
Hide file tree
Showing 18 changed files with 53 additions and 53 deletions.
3 changes: 2 additions & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package rocketmq

import (
"context"
"github.com/apache/rocketmq-client-go/v2/errors"

"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
Expand Down Expand Up @@ -132,5 +133,5 @@ type PullConsumer interface {
//
// The PullConsumer will be supported in next release
func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
return nil, ErrPullConsumer
return nil, errors.ErrPullConsumer
}
16 changes: 8 additions & 8 deletions benchmark/stable.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package main
import (
"flag"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/errors"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -52,23 +52,23 @@ func (st *stableTest) buildFlags(name string) {

func (st *stableTest) checkFlag() error {
if st.topic == "" {
return rocketmq.ErrEmptyTopic
return errors.ErrEmptyTopic
}

if st.nameSrv == "" {
return rocketmq.ErrEmptyNameSrv
return errors.ErrEmptyNameSrv
}

if st.groupID == "" {
return rocketmq.ErrEmptyGroupID
return errors.ErrEmptyGroupID
}

if st.testMin <= 0 {
return rocketmq.ErrTestMin
return errors.ErrTestMin
}

if st.opIntervalSec <= 0 {
return rocketmq.ErrOperationInterval
return errors.ErrOperationInterval
}

return nil
Expand Down Expand Up @@ -114,7 +114,7 @@ func (stp *stableTestProducer) checkFlag() error {
return err
}
if stp.bodySize <= 0 {
return rocketmq.ErrMessageBody
return errors.ErrMessageBody
}

return nil
Expand Down Expand Up @@ -187,7 +187,7 @@ func (stc *stableTestConsumer) checkFlag() error {
}

if stc.expression == "" {
return rocketmq.ErrEmptyExpression
return errors.ErrEmptyExpression
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package consumer
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/errors"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -818,7 +818,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa
rlog.Warning("no broker found for mq", map[string]interface{}{
rlog.LogKeyMessageQueue: queue,
})
return nil, rocketmq.ErrBrokerNotFound
return nil, errors.ErrBrokerNotFound
}

if brokerResult.Slave {
Expand Down
8 changes: 4 additions & 4 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package consumer
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -174,15 +174,15 @@ func (dc *defaultConsumer) checkPull(ctx context.Context, mq *primitive.MessageQ
}

if mq == nil {
return rocketmq.ErrMQEmpty
return errors2.ErrMQEmpty
}

if offset < 0 {
return rocketmq.ErrOffset
return errors2.ErrOffset
}

if numbers <= 0 {
return rocketmq.ErrNumbers
return errors2.ErrNumbers
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package consumer
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"math"
"strconv"
"strings"
Expand Down Expand Up @@ -139,7 +139,7 @@ func (pc *pushConsumer) Start() error {
rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
err = rocketmq.ErrCreated
err = errors2.ErrCreated
return
}

Expand Down Expand Up @@ -225,7 +225,7 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
return rocketmq.ErrStartTopic
return errors2.ErrStartTopic
}

// add retry topic subscription for resubscribe
Expand Down
2 changes: 1 addition & 1 deletion errors.go → errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package rocketmq
package errors

import "errors"

Expand Down
3 changes: 2 additions & 1 deletion examples/consumer/pull/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2/errors"
"time"

"github.com/apache/rocketmq-client-go/v2"
Expand Down Expand Up @@ -52,7 +53,7 @@ func main() {
for {
resp, err := c.PullFrom(ctx, queue, offset, 10)
if err != nil {
if err == rocketmq.ErrRequestTimeout {
if err == errors.ErrRequestTimeout {
fmt.Printf("timeout \n")
time.Sleep(1 * time.Second)
continue
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand Down
4 changes: 2 additions & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"errors"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -56,7 +56,7 @@ const (
)

var (
ErrServiceState = rocketmq.ErrService
ErrServiceState = errors2.ErrService

_VIPChannelEnable = false
)
Expand Down
4 changes: 2 additions & 2 deletions internal/remote/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package remote

import (
"context"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/errors"
"sync"
)

Expand Down Expand Up @@ -61,7 +61,7 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
case <-r.Done:
cmd, err = r.ResponseCommand, r.Err
case <-r.ctx.Done():
err = rocketmq.ErrRequestTimeout
err = errors.ErrRequestTimeout
r.Err = err
}
return cmd, err
Expand Down
10 changes: 5 additions & 5 deletions internal/remote/remote_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package remote
import (
"bytes"
"context"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/errors"
"math/rand"
"net"
"reflect"
Expand Down Expand Up @@ -78,12 +78,12 @@ func TestResponseFutureWaitResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000))
defer cancel()
future := NewResponseFuture(ctx, 10, nil)
if _, err := future.waitResponse(); err != rocketmq.ErrRequestTimeout {
if _, err := future.waitResponse(); err != errors.ErrRequestTimeout {
t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
rocketmq.ErrRequestTimeout, err)
errors.ErrRequestTimeout, err)
}
future = NewResponseFuture(context.Background(), 10, nil)
responseError := rocketmq.ErrResponse
responseError := errors.ErrResponse
go func() {
time.Sleep(100 * time.Millisecond)
future.Err = responseError
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand,
func(r *ResponseFuture) {
assert.NotNil(t, r.Err)
assert.Equal(t, rocketmq.ErrRequestTimeout, r.Err)
assert.Equal(t, errors.ErrRequestTimeout, r.Err)
wg.Done()
})
assert.Nil(t, err, "failed to invokeSync.")
Expand Down
4 changes: 2 additions & 2 deletions internal/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package internal

import (
"context"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/errors"
"math/rand"
"sort"
"strconv"
Expand Down Expand Up @@ -401,7 +401,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
}
return routeData, nil
case ResTopicNotExist:
return nil, rocketmq.ErrTopicNotExist
return nil, errors.ErrTopicNotExist
default:
return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package internal

import (
"context"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/errors"
"sync"
"testing"

Expand Down Expand Up @@ -53,7 +53,7 @@ func TestQueryTopicRouteInfoFromServer(t *testing.T) {
func(ctx context.Context, addr string, request *remote.RemotingCommand) (*remote.RemotingCommand, error) {
count++
if count < 3 {
return nil, rocketmq.ErrNotExisted
return nil, errors.ErrNotExisted
}
return &remote.RemotingCommand{
Code: ResTopicNotExist,
Expand All @@ -62,7 +62,7 @@ func TestQueryTopicRouteInfoFromServer(t *testing.T) {

data, err := namesrv.queryTopicRouteInfoFromServer("notexisted")
assert.Nil(t, data)
assert.Equal(t, rocketmq.ErrTopicNotExist, err)
assert.Equal(t, errors.ErrTopicNotExist, err)
})
})
}
Expand Down
4 changes: 2 additions & 2 deletions internal/utils/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package utils
import (
"bytes"
"compress/zlib"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/errors"
"io/ioutil"
"sync"
)
Expand Down Expand Up @@ -48,7 +48,7 @@ func init() {

func Compress(raw []byte, compressLevel int) ([]byte, error) {
if compressLevel < zlib.BestSpeed || compressLevel > zlib.BestCompression {
return nil, rocketmq.ErrCompressLevel
return nil, errors.ErrCompressLevel
}

buf := bufPool.Get().(*bytes.Buffer)
Expand Down
4 changes: 2 additions & 2 deletions internal/utils/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package utils
import (
"bytes"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/errors"
"net"
"strconv"
"time"
Expand Down Expand Up @@ -56,7 +56,7 @@ func ClientIP4() ([]byte, error) {
}
}
}
return nil, rocketmq.ErrUnknownIP
return nil, errors.ErrUnknownIP
}

func FakeIP() []byte {
Expand Down
10 changes: 5 additions & 5 deletions primitive/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ limitations under the License.
package primitive

import (
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/errors"
"regexp"
"strings"
)
Expand All @@ -32,7 +32,7 @@ type NamesrvAddr []string

func NewNamesrvAddr(s ...string) (NamesrvAddr, error) {
if len(s) == 0 {
return nil, rocketmq.ErrNoNameserver
return nil, errors.ErrNoNameserver
}

ss := s
Expand Down Expand Up @@ -70,17 +70,17 @@ func verifyIP(ip string) error {
return nil
}
if strings.Contains(ip, ";") {
return rocketmq.ErrMultiIP
return errors.ErrMultiIP
}
ipV4s := ipv4Regex.FindAllString(ip, -1)
ipV6s := ipv6Regex.FindAllString(ip, -1)

if len(ipV4s) == 0 && len(ipV6s) == 0 {
return rocketmq.ErrIllegalIP
return errors.ErrIllegalIP
}

if len(ipV4s) > 1 || len(ipV6s) > 1 {
return rocketmq.ErrMultiIP
return errors.ErrMultiIP
}
return nil
}
Expand Down
Loading

0 comments on commit e4a000b

Please sign in to comment.