From 38ec9fbb8fa604dcf07d7027720cf8ad8e3c8c56 Mon Sep 17 00:00:00 2001 From: zhangxu16 Date: Wed, 24 Mar 2021 20:08:41 +0800 Subject: [PATCH] [Route] Consumer can't consume message When cluster deployed on DLedger mode and enable slave read --- internal/route.go | 48 +++++++++++++++++++++++++++----------- internal/route_test.go | 53 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 14 deletions(-) diff --git a/internal/route.go b/internal/route.go index 09b6e53a..7b27c9d0 100644 --- a/internal/route.go +++ b/internal/route.go @@ -219,10 +219,12 @@ func (s *namesrvs) FindBrokerAddrByName(brokerName string) string { func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult { var ( brokerAddr = "" - //slave = false - //found = false + slave = false + found = false ) + rlog.Debug("broker id "+strconv.FormatInt(brokerId, 10), nil) + v, exist := s.brokerAddressesMap.Load(brokerName) if !exist { @@ -234,22 +236,40 @@ func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int6 } brokerAddr = data.BrokerAddresses[brokerId] - //for k, v := range data.BrokerAddresses { - // if v != "" { - // found = true - // if k != MasterId { - // slave = true - // } - // brokerAddr = v - // break - // } - //} + slave = brokerId != MasterId + if brokerAddr != "" { + found = true + } + + // not found && read from slave, try again use next brokerId + if !found && slave { + rlog.Debug("Not found broker addr and slave "+strconv.FormatBool(slave), nil) + brokerAddr = data.BrokerAddresses[brokerId+1] + found = brokerAddr != "" + } + + // still not found && cloud use other broker addr, find anyone in BrokerAddresses + if !found && !onlyThisBroker { + rlog.Debug("STILL Not found broker addr", nil) + for k, v := range data.BrokerAddresses { + if v != "" { + brokerAddr = v + found = true + slave = k != MasterId + break + } + } + } + + if found { + rlog.Debug("Find broker addr "+brokerAddr, nil) + } var result *FindBrokerResult - if brokerAddr != "" { + if found { result = &FindBrokerResult{ BrokerAddr: brokerAddr, - Slave: brokerId != 0, + Slave: slave, BrokerVersion: s.findBrokerVersion(brokerName, brokerAddr), } } diff --git a/internal/route_test.go b/internal/route_test.go index c9b65f0c..ded77800 100644 --- a/internal/route_test.go +++ b/internal/route_test.go @@ -82,3 +82,56 @@ func TestAddBrokerVersion(t *testing.T) { v = s.findBrokerVersion("b1", "addr2") assert.Equal(t, v, int32(0)) } + +func TestFindBrokerAddressInSubscribe(t *testing.T) { + s := &namesrvs{} + s.brokerVersionMap = make(map[string]map[string]int32, 0) + s.brokerLock = new(sync.RWMutex) + + brokerDataRaft1 := &BrokerData{ + Cluster: "cluster", + BrokerName: "raft01", + BrokerAddresses: map[int64]string{ + 0: "127.0.0.1:10911", + 1: "127.0.0.1:10912", + 2: "127.0.0.1:10913", + }, + } + s.brokerAddressesMap.Store(brokerDataRaft1.BrokerName, brokerDataRaft1) + brokerDataRaft2 := &BrokerData{ + Cluster: "cluster", + BrokerName: "raft02", + BrokerAddresses: map[int64]string{ + 0: "127.0.0.1:10911", + 2: "127.0.0.1:10912", + 3: "127.0.0.1:10913", + }, + } + s.brokerAddressesMap.Store(brokerDataRaft2.BrokerName, brokerDataRaft2) + + Convey("Request master broker", t, func() { + result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 0, false) + assert.NotNil(t, result) + assert.Equal(t, result.BrokerAddr, brokerDataRaft1.BrokerAddresses[0]) + assert.Equal(t, result.Slave, false) + }) + + Convey("Request slave broker from normal broker group", t, func() { + result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 1, false) + assert.NotNil(t, result) + assert.Equal(t, result.BrokerAddr, brokerDataRaft1.BrokerAddresses[1]) + assert.Equal(t, result.Slave, true) + }) + + Convey("Request slave broker from non normal broker group", t, func() { + result := s.FindBrokerAddressInSubscribe(brokerDataRaft2.BrokerName, 1, false) + assert.NotNil(t, result) + assert.Equal(t, result.BrokerAddr, brokerDataRaft2.BrokerAddresses[2]) + assert.Equal(t, result.Slave, true) + }) + + Convey("Request not exist broker", t, func() { + result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 4, false) + assert.NotNil(t, result) + }) +}