Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the client instance struct,converge the namesrv module #788

Merged
merged 1 commit into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package admin

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -61,8 +62,7 @@ func WithResolver(resolver primitive.NsResolver) AdminOption {
}

type admin struct {
cli internal.RMQClient
namesrv internal.Namesrvs
cli internal.RMQClient

opts *adminOptions

Expand All @@ -75,17 +75,21 @@ func NewAdmin(opts ...AdminOption) (Admin, error) {
for _, opt := range opts {
opt(defaultOpts)
}

cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
namesrv, err := internal.NewNamesrv(defaultOpts.Resolver)
defaultOpts.Namesrv = namesrv
if err != nil {
return nil, err
}

cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
if cli == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
}
defaultOpts.Namesrv = cli.GetNameSrv()
//log.Printf("Client: %#v", namesrv.srvs)
return &admin{
cli: cli,
namesrv: namesrv,
opts: defaultOpts,
cli: cli,
opts: defaultOpts,
}, nil
}

Expand Down Expand Up @@ -153,8 +157,8 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
}
//delete topic in broker
if cfg.BrokerAddr == "" {
a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
cfg.BrokerAddr = a.namesrv.FindBrokerAddrByTopic(cfg.Topic)
a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
cfg.BrokerAddr = a.cli.GetNameSrv().FindBrokerAddrByTopic(cfg.Topic)
}

if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
Expand All @@ -168,14 +172,16 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {

//delete topic in nameserver
if len(cfg.NameSrvAddr) == 0 {
_, _, err := a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
cfg.NameSrvAddr = a.cli.GetNameSrv().AddrList()
_, _, err := a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
if err != nil {
rlog.Error("delete topic in nameserver error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyUnderlayError: err,
})
}
cfg.NameSrvAddr = a.namesrv.AddrList()
cfg.NameSrvAddr = a.cli.GetNameSrv().AddrList()
}

for _, nameSrvAddr := range cfg.NameSrvAddr {
Expand Down
36 changes: 17 additions & 19 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@ type defaultConsumer struct {
// chan for push consumer
prCh chan PullRequest

namesrv internal.Namesrvs

pullFromWhichNodeTable sync.Map

stat *StatsManager
Expand All @@ -280,7 +278,7 @@ func (dc *defaultConsumer) start() error {

if dc.model == Clustering {
dc.option.ChangeInstanceNameToPID()
dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.namesrv)
dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.client.GetNameSrv())
} else {
dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, dc.client.ClientID())
}
Expand Down Expand Up @@ -448,7 +446,7 @@ type lockBatchRequestBody struct {
}

func (dc *defaultConsumer) lock(mq *primitive.MessageQueue) bool {
brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)
brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)

if brokerResult == nil {
return false
Expand Down Expand Up @@ -488,7 +486,7 @@ func (dc *defaultConsumer) lock(mq *primitive.MessageQueue) bool {
}

func (dc *defaultConsumer) unlock(mq *primitive.MessageQueue, oneway bool) {
brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)
brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)

if brokerResult == nil {
return
Expand All @@ -513,7 +511,7 @@ func (dc *defaultConsumer) lockAll() {
if len(mqs) == 0 {
continue
}
brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
if brokerResult == nil {
continue
}
Expand Down Expand Up @@ -559,7 +557,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
if len(mqs) == 0 {
continue
}
brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
if brokerResult == nil {
continue
}
Expand Down Expand Up @@ -892,10 +890,10 @@ func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result
}

func (dc *defaultConsumer) findConsumerList(topic string) []string {
brokerAddr := dc.namesrv.FindBrokerAddrByTopic(topic)
brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByTopic(topic)
if brokerAddr == "" {
dc.namesrv.UpdateTopicRouteInfo(topic)
brokerAddr = dc.namesrv.FindBrokerAddrByTopic(topic)
dc.client.GetNameSrv().UpdateTopicRouteInfo(topic)
brokerAddr = dc.client.GetNameSrv().FindBrokerAddrByTopic(topic)
}

if brokerAddr != "" {
Expand Down Expand Up @@ -929,10 +927,10 @@ func (dc *defaultConsumer) sendBack(msg *primitive.MessageExt, level int) error

// QueryMaxOffset with specific queueId and topic
func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, error) {
brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if brokerAddr == "" {
dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
dc.client.GetNameSrv().UpdateTopicRouteInfo(mq.Topic)
brokerAddr = dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
}
if brokerAddr == "" {
return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
Expand All @@ -958,10 +956,10 @@ func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) int64 {

// SearchOffsetByTimestamp with specific queueId and topic
func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) {
brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if brokerAddr == "" {
dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
dc.client.GetNameSrv().UpdateTopicRouteInfo(mq.Topic)
brokerAddr = dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
}
if brokerAddr == "" {
return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
Expand Down Expand Up @@ -1044,12 +1042,12 @@ func clearCommitOffsetFlag(sysFlag int32) int32 {
}

func (dc *defaultConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
result := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
if result != nil {
return result
}
dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
dc.client.GetNameSrv().UpdateTopicRouteInfo(mq.Topic)
return dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
}

func (dc *defaultConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
Expand Down
5 changes: 3 additions & 2 deletions consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ func TestDoRebalance(t *testing.T) {
defer ctrl.Finish()
namesrvCli := internal.NewMockNamesrvs(ctrl)
namesrvCli.EXPECT().FindBrokerAddrByTopic(gomock.Any()).Return(broker)
dc.namesrv = namesrvCli

rmqCli := internal.NewMockRMQClient(ctrl)
rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&remote.RemotingCommand{
Body: []byte("{\"consumerIdList\": [\"a1\", \"a2\", \"a3\"] }"),
}, nil)
rmqCli.EXPECT().ClientID().Return(clientID)
rmqCli.SetNameSrv(namesrvCli)

dc.client = rmqCli

var wg sync.WaitGroup
Expand Down Expand Up @@ -109,10 +110,10 @@ func TestComputePullFromWhere(t *testing.T) {
}

namesrvCli := internal.NewMockNamesrvs(ctrl)
dc.namesrv = namesrvCli

rmqCli := internal.NewMockRMQClient(ctrl)
dc.client = rmqCli
rmqCli.SetNameSrv(namesrvCli)

Convey("get effective offset", func() {
offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(10))
Expand Down
8 changes: 7 additions & 1 deletion consumer/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package consumer

import (
"context"
"fmt"
"time"

"github.com/apache/rocketmq-client-go/v2/internal"
Expand All @@ -39,9 +40,14 @@ func WithTrace(traceCfg *primitive.TraceConfig) Option {

func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
dispatcher := internal.NewTraceDispatcher(traceCfg)
dispatcher.Start()
if dispatcher != nil {
dispatcher.Start()
}

return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
if dispatcher == nil {
return fmt.Errorf("GetOrNewRocketMQClient faild")
}
consumerCtx, exist := primitive.GetConsumerCtx(ctx)
if !exist || len(consumerCtx.Msgs) == 0 {
return next(ctx, req, reply)
Expand Down
10 changes: 6 additions & 4 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
prCh: make(chan PullRequest, 4),
model: defaultOpts.ConsumerModel,
option: defaultOpts,

namesrv: srvs,
}
dc.option.ClientOptions.Namesrv, err = internal.GetNamesrv(dc.client.ClientID())
if dc.client == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
}
defaultOpts.Namesrv = dc.client.GetNameSrv()

c := &defaultPullConsumer{
defaultConsumer: dc,
}
Expand Down Expand Up @@ -132,7 +134,7 @@ func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector M
}

func (c *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue {
queues, err := c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)
queues, err := c.defaultConsumer.client.GetNameSrv().FetchSubscribeMessageQueues(topic)
if err != nil && len(queues) > 0 {
rlog.Error("get next mq error", map[string]interface{}{
rlog.LogKeyTopic: topic,
Expand Down
18 changes: 6 additions & 12 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,13 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
consumeOrderly: defaultOpts.ConsumeOrderly,
fromWhere: defaultOpts.FromWhere,
allocate: defaultOpts.Strategy,
namesrv: srvs,
option: defaultOpts,
}
dc.option.ClientOptions.Namesrv, err = internal.GetNamesrv(dc.client.ClientID())
if err != nil {
return nil, err
if dc.client == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
}
dc.namesrv = dc.option.ClientOptions.Namesrv
defaultOpts.Namesrv = dc.client.GetNameSrv()

p := &pushConsumer{
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
Expand All @@ -124,11 +123,6 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {

p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)

if p.model == Clustering {
retryTopic := internal.GetRetryTopic(p.consumerGroup)
sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll})
p.subscriptionDataTable.Store(retryTopic, sub)
}
return p, nil
}

Expand Down Expand Up @@ -386,7 +380,7 @@ func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
})

nsAddr := ""
for _, value := range pc.namesrv.AddrList() {
for _, value := range pc.client.GetNameSrv().AddrList() {
nsAddr += fmt.Sprintf("%s;", value)
}
info.Properties[internal.PropNameServerAddr] = nsAddr
Expand Down Expand Up @@ -795,7 +789,7 @@ func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.MessageExt, delayLevel int) bool {
var brokerAddr string
if len(brokerName) != 0 {
brokerAddr = pc.defaultConsumer.namesrv.FindBrokerAddrByName(brokerName)
brokerAddr = pc.defaultConsumer.client.GetNameSrv().FindBrokerAddrByName(brokerName)
} else {
brokerAddr = msg.StoreHost
}
Expand Down
Loading