Skip to content

Commit

Permalink
[ISSUE #631] Support Consuming from Slave
Browse files Browse the repository at this point in the history
Co-authored-by: zhangxu16 <[email protected]>
  • Loading branch information
maixiaohai and zhangxu16 authored Jul 1, 2021
1 parent e69e907 commit 76dd050
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 14 deletions.
48 changes: 34 additions & 14 deletions internal/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,12 @@ func (s *namesrvs) FindBrokerAddrByName(brokerName string) string {
func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
var (
brokerAddr = ""
//slave = false
//found = false
slave = false
found = false
)

rlog.Debug("broker id "+strconv.FormatInt(brokerId, 10), nil)

v, exist := s.brokerAddressesMap.Load(brokerName)

if !exist {
Expand All @@ -234,22 +236,40 @@ func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int6
}

brokerAddr = data.BrokerAddresses[brokerId]
//for k, v := range data.BrokerAddresses {
// if v != "" {
// found = true
// if k != MasterId {
// slave = true
// }
// brokerAddr = v
// break
// }
//}
slave = brokerId != MasterId
if brokerAddr != "" {
found = true
}

// not found && read from slave, try again use next brokerId
if !found && slave {
rlog.Debug("Not found broker addr and slave "+strconv.FormatBool(slave), nil)
brokerAddr = data.BrokerAddresses[brokerId+1]
found = brokerAddr != ""
}

// still not found && cloud use other broker addr, find anyone in BrokerAddresses
if !found && !onlyThisBroker {
rlog.Debug("STILL Not found broker addr", nil)
for k, v := range data.BrokerAddresses {
if v != "" {
brokerAddr = v
found = true
slave = k != MasterId
break
}
}
}

if found {
rlog.Debug("Find broker addr "+brokerAddr, nil)
}

var result *FindBrokerResult
if brokerAddr != "" {
if found {
result = &FindBrokerResult{
BrokerAddr: brokerAddr,
Slave: brokerId != 0,
Slave: slave,
BrokerVersion: s.findBrokerVersion(brokerName, brokerAddr),
}
}
Expand Down
53 changes: 53 additions & 0 deletions internal/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,56 @@ func TestAddBrokerVersion(t *testing.T) {
v = s.findBrokerVersion("b1", "addr2")
assert.Equal(t, v, int32(0))
}

func TestFindBrokerAddressInSubscribe(t *testing.T) {
s := &namesrvs{}
s.brokerVersionMap = make(map[string]map[string]int32, 0)
s.brokerLock = new(sync.RWMutex)

brokerDataRaft1 := &BrokerData{
Cluster: "cluster",
BrokerName: "raft01",
BrokerAddresses: map[int64]string{
0: "127.0.0.1:10911",
1: "127.0.0.1:10912",
2: "127.0.0.1:10913",
},
}
s.brokerAddressesMap.Store(brokerDataRaft1.BrokerName, brokerDataRaft1)
brokerDataRaft2 := &BrokerData{
Cluster: "cluster",
BrokerName: "raft02",
BrokerAddresses: map[int64]string{
0: "127.0.0.1:10911",
2: "127.0.0.1:10912",
3: "127.0.0.1:10913",
},
}
s.brokerAddressesMap.Store(brokerDataRaft2.BrokerName, brokerDataRaft2)

Convey("Request master broker", t, func() {
result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 0, false)
assert.NotNil(t, result)
assert.Equal(t, result.BrokerAddr, brokerDataRaft1.BrokerAddresses[0])
assert.Equal(t, result.Slave, false)
})

Convey("Request slave broker from normal broker group", t, func() {
result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 1, false)
assert.NotNil(t, result)
assert.Equal(t, result.BrokerAddr, brokerDataRaft1.BrokerAddresses[1])
assert.Equal(t, result.Slave, true)
})

Convey("Request slave broker from non normal broker group", t, func() {
result := s.FindBrokerAddressInSubscribe(brokerDataRaft2.BrokerName, 1, false)
assert.NotNil(t, result)
assert.Equal(t, result.BrokerAddr, brokerDataRaft2.BrokerAddresses[2])
assert.Equal(t, result.Slave, true)
})

Convey("Request not exist broker", t, func() {
result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 4, false)
assert.NotNil(t, result)
})
}

0 comments on commit 76dd050

Please sign in to comment.