Skip to content

Commit

Permalink
Merge pull request #65 from zonghaishang/loadbalance_leastactive
Browse files Browse the repository at this point in the history
NewLeastActiveLoadBalance
  • Loading branch information
zonghaishang authored Jun 13, 2019
2 parents 17d73c5 + b2585fc commit 2b5f5e4
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 0 deletions.
101 changes: 101 additions & 0 deletions cluster/loadbalance/least_active.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// @author [email protected]
package loadbalance

import (
"math/rand"
)

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)

const (
LeastActive = "leastactive"
)

func init() {
extension.SetLoadbalance(LeastActive, NewLeastActiveLoadBalance)
}

type leastActiveLoadBalance struct {
}

func NewLeastActiveLoadBalance() cluster.LoadBalance {
return &leastActiveLoadBalance{}
}

func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
count := len(invokers)
if count == 0 {
return nil
}
if count == 1 {
return invokers[0]
}

var (
leastActive int32 = -1 // The least active value of all invokers
totalWeight int64 = 0 // The number of invokers having the same least active value (LEAST_ACTIVE)
firstWeight int64 = 0 // Initial value, used for comparision
leastIndexes = make([]int, count) // The index of invokers having the same least active value (LEAST_ACTIVE)
leastCount = 0 // The number of invokers having the same least active value (LEAST_ACTIVE)
sameWeight = true // Every invoker has the same weight value?
)

for i := 0; i < count; i++ {
invoker := invokers[i]
// Active number
active := protocol.GetStatus(invoker.GetUrl(), invocation.MethodName()).GetActive()
// current weight (maybe in warmUp)
weight := GetWeight(invoker, invocation)
// There are smaller active services
if leastActive == -1 || active < leastActive {
leastActive = active
leastIndexes[0] = i
leastCount = 1 // next available leastIndex offset
totalWeight = weight
firstWeight = weight
sameWeight = true
} else if active == leastActive {
leastIndexes[leastCount] = i
totalWeight += weight
leastCount++

if sameWeight && (i > 0) && weight != firstWeight {
sameWeight = false
}
}
}

if leastCount == 1 {
return invokers[0]
}

if !sameWeight && totalWeight > 0 {
offsetWeight := rand.Int63n(totalWeight) + 1
for i := 0; i < leastCount; i++ {
leastIndex := leastIndexes[i]
offsetWeight -= GetWeight(invokers[i], invocation)
if offsetWeight <= 0 {
return invokers[leastIndex]
}
}
}

index := leastIndexes[rand.Intn(leastCount)]
return invokers[index]
}
67 changes: 67 additions & 0 deletions cluster/loadbalance/least_active_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package loadbalance

import (
"context"
"fmt"
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)

func TestLeastActiveSelect(t *testing.T) {
loadBalance := NewLeastActiveLoadBalance()

var invokers []protocol.Invoker

url, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:20000/org.apache.demo.HelloService")
invokers = append(invokers, protocol.NewBaseInvoker(url))
i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
assert.True(t, i.GetUrl().URLEqual(url))

for i := 1; i < 10; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
loadBalance.Select(invokers, &invocation.RPCInvocation{})
}

func TestLeastActiveByWeight(t *testing.T) {
loadBalance := NewLeastActiveLoadBalance()

var invokers []protocol.Invoker
loop := 3
for i := 1; i <= loop; i++ {
url, _ := common.NewURL(context.TODO(), fmt.Sprintf("test%v://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}

inv := new(invocation.RPCInvocation)
inv.SetMethod("test")
protocol.BeginCount(invokers[2].GetUrl(), inv.MethodName())

loop = 10000

var (
firstCount int
secondCount int
)

for i := 1; i <= loop; i++ {
invoker := loadBalance.Select(invokers, inv)
if invoker.GetUrl().Protocol == "test1" {
firstCount++
} else if invoker.GetUrl().Protocol == "test2" {
secondCount++
}
}

assert.Equal(t, firstCount+secondCount, loop)
}
47 changes: 47 additions & 0 deletions filter/impl/active_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// @author [email protected]
package impl

import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
)

const active = "active"

func init() {
extension.SetFilter(active, GetActiveFilter)
}

type ActiveFilter struct {
}

func (ef *ActiveFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments()))

protocol.BeginCount(invoker.GetUrl(), invocation.MethodName())
return invoker.Invoke(invocation)
}

func (ef *ActiveFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {

protocol.EndCount(invoker.GetUrl(), invocation.MethodName())
return result
}

func GetActiveFilter() filter.Filter {
return &ActiveFilter{}
}
71 changes: 71 additions & 0 deletions protocol/RpcStatus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// @author [email protected]
package protocol

import (
"sync"
"sync/atomic"
)

import (
"github.com/apache/dubbo-go/common"
)

var (
methodStatistics sync.Map // url -> { methodName : RpcStatus}
)

type RpcStatus struct {
active int32
}

func (rpc *RpcStatus) GetActive() int32 {
return atomic.LoadInt32(&rpc.active)
}

func GetStatus(url common.URL, methodName string) *RpcStatus {
identifier := url.Key()
methodMap, found := methodStatistics.Load(identifier)
if !found {
methodMap = &sync.Map{}
methodStatistics.Store(identifier, methodMap)
}

methodActive := methodMap.(*sync.Map)
rpcStatus, found := methodActive.Load(methodName)
if !found {
rpcStatus = &RpcStatus{}
methodActive.Store(methodName, rpcStatus)
}

status := rpcStatus.(*RpcStatus)
return status
}

func BeginCount(url common.URL, methodName string) {
beginCount0(GetStatus(url, methodName))
}

func EndCount(url common.URL, methodName string) {
endCount0(GetStatus(url, methodName))
}

// private methods
func beginCount0(rpcStatus *RpcStatus) {
atomic.AddInt32(&rpcStatus.active, 1)
}

func endCount0(rpcStatus *RpcStatus) {
atomic.AddInt32(&rpcStatus.active, -1)
}

0 comments on commit 2b5f5e4

Please sign in to comment.