Skip to content

Commit

Permalink
Merge pull request #246 from flycash/feature/ExecuteLimitFilter
Browse files Browse the repository at this point in the history
Feature: ExecuteLimit Support
  • Loading branch information
AlexStocks authored Oct 29, 2019
2 parents e53f2a2 + 32f8233 commit d39a57d
Show file tree
Hide file tree
Showing 17 changed files with 345 additions and 97 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@ Finished List:
- Dynamic Configure Center & Service Management Configurator: Zookeeper
- Cluster Strategy: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/[Available](https://github.com/apache/dubbo-go/pull/155)/[Broadcast](https://github.com/apache/dubbo-go/pull/158)/[Forking](https://github.com/apache/dubbo-go/pull/161)
- Load Balance: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65)
- Filter: Echo Health Check/[Circuit break and service downgrade](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)
- Filter: Echo Health Check/[Circuit break and service downgrade](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)/[ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246)
- Other feature: [generic invoke](https://github.com/apache/dubbo-go/pull/122)/start check/connecting certain provider/multi-protocols/multi-registries/multi-versions/service group

Working List:

- Load Balance: ConsistentHash
- Filter: CountFilter/ExecuteLimitFilter
- Registry: k8s
- Configure Center: apollo
- Metadata Center (dubbo v2.7.x)
Expand Down
3 changes: 1 addition & 2 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ Apache License, Version 2.0
- 动态配置中心与服务治理配置器(config center): Zookeeper
- 集群策略: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/[Available](https://github.com/apache/dubbo-go/pull/155)/[Broadcast](https://github.com/apache/dubbo-go/pull/158)/[Forking](https://github.com/apache/dubbo-go/pull/161)
- 负载均衡策略: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65)
- 过滤器: Echo Health Check/[服务熔断&降级](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)
- 过滤器: Echo Health Check/[服务熔断&降级](https://github.com/apache/dubbo-go/pull/133)/[TokenFilter](https://github.com/apache/dubbo-go/pull/202)/[AccessLogFilter](https://github.com/apache/dubbo-go/pull/214)/[TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237)[ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246)
- 其他功能支持: [泛化调用](https://github.com/apache/dubbo-go/pull/122)/启动时检查/服务直连/多服务协议/多注册中心/多服务版本/服务分组

开发中列表:

- 集群策略: Forking
- 负载均衡策略: ConsistentHash
- 过滤器: CountFilter/ExecuteLimitFilter
- 注册中心: k8s
- 配置中心: apollo
- 元数据中心 (dubbo v2.7.x)
Expand Down
2 changes: 1 addition & 1 deletion common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
const (
DEFAULT_KEY = "default"
PREFIX_DEFAULT_KEY = "default."
DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps"
DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,execute"
DEFAULT_REFERENCE_FILTERS = ""
GENERIC_REFERENCE_FILTERS = "generic"
GENERIC = "$invoke"
Expand Down
43 changes: 23 additions & 20 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,29 @@ const (
)

const (
TIMESTAMP_KEY = "timestamp"
REMOTE_TIMESTAMP_KEY = "remote.timestamp"
CLUSTER_KEY = "cluster"
LOADBALANCE_KEY = "loadbalance"
WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup"
RETRIES_KEY = "retries"
BEAN_NAME = "bean.name"
FAIL_BACK_TASKS_KEY = "failbacktasks"
FORKS_KEY = "forks"
DEFAULT_FORKS = 2
DEFAULT_TIMEOUT = 1000
ACCESS_LOG_KEY = "accesslog"
TPS_LIMITER_KEY = "tps.limiter"
TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler"
TPS_LIMIT_RATE_KEY = "tps.limit.rate"
DEFAULT_TPS_LIMIT_RATE = "-1"
TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval"
DEFAULT_TPS_LIMIT_INTERVAL = "60000"
TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy"
TIMESTAMP_KEY = "timestamp"
REMOTE_TIMESTAMP_KEY = "remote.timestamp"
CLUSTER_KEY = "cluster"
LOADBALANCE_KEY = "loadbalance"
WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup"
RETRIES_KEY = "retries"
BEAN_NAME = "bean.name"
FAIL_BACK_TASKS_KEY = "failbacktasks"
FORKS_KEY = "forks"
DEFAULT_FORKS = 2
DEFAULT_TIMEOUT = 1000
ACCESS_LOG_KEY = "accesslog"
TPS_LIMITER_KEY = "tps.limiter"
TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler"
TPS_LIMIT_RATE_KEY = "tps.limit.rate"
DEFAULT_TPS_LIMIT_RATE = "-1"
TPS_LIMIT_INTERVAL_KEY = "tps.limit.interval"
DEFAULT_TPS_LIMIT_INTERVAL = "60000"
TPS_LIMIT_STRATEGY_KEY = "tps.limit.strategy"
EXECUTE_LIMIT_KEY = "execute.limit"
DEFAULT_EXECUTE_LIMIT = "-1"
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
)

const (
Expand Down
17 changes: 16 additions & 1 deletion common/extension/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package extension

import (
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/filter/common"
)

var (
filters = make(map[string]func() filter.Filter)
filters = make(map[string]func() filter.Filter)
rejectedExecutionHandler = make(map[string]func() common.RejectedExecutionHandler)
)

func SetFilter(name string, v func() filter.Filter) {
Expand All @@ -35,3 +37,16 @@ func GetFilter(name string) filter.Filter {
}
return filters[name]()
}

func SetRejectedExecutionHandler(name string, creator func() common.RejectedExecutionHandler) {
rejectedExecutionHandler[name] = creator
}

func GetRejectedExecutionHandler(name string) common.RejectedExecutionHandler {
creator, ok := rejectedExecutionHandler[name]
if !ok {
panic("RejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetRejectedExecutionHandler.")
}
return creator()
}
18 changes: 2 additions & 16 deletions common/extension/tps_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
)

var (
tpsLimitStrategy = make(map[string]func(rate int, interval int) tps.TpsLimitStrategy)
tpsLimiter = make(map[string]func() tps.TpsLimiter)
tpsRejectedExecutionHandler = make(map[string]func() tps.RejectedExecutionHandler)
tpsLimitStrategy = make(map[string]func(rate int, interval int) tps.TpsLimitStrategy)
tpsLimiter = make(map[string]func() tps.TpsLimiter)
)

func SetTpsLimiter(name string, creator func() tps.TpsLimiter) {
Expand Down Expand Up @@ -52,16 +51,3 @@ func GetTpsLimitStrategyCreator(name string) func(rate int, interval int) tps.Tp
}
return creator
}

func SetTpsRejectedExecutionHandler(name string, creator func() tps.RejectedExecutionHandler) {
tpsRejectedExecutionHandler[name] = creator
}

func GetTpsRejectedExecutionHandler(name string) tps.RejectedExecutionHandler {
creator, ok := tpsRejectedExecutionHandler[name]
if !ok {
panic("TpsRejectedExecutionHandler for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetTpsRejectedExecutionHandler.")
}
return creator()
}
20 changes: 11 additions & 9 deletions config/method_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ import (
)

type MethodConfig struct {
InterfaceId string
InterfaceName string
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
InterfaceId string
InterfaceName string
Name string `yaml:"name" json:"name,omitempty" property:"name"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Weight int64 `yaml:"weight" json:"weight,omitempty" property:"weight"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
}

func (c *MethodConfig) Prefix() string {
Expand Down
63 changes: 37 additions & 26 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,35 @@ import (
)

type ServiceConfig struct {
context context.Context
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
cacheProtocol protocol.Protocol
cacheMutex sync.Mutex
context context.Context
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `default:"dubbo" required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
TpsLimiter string `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"`
TpsLimitInterval string `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
TpsLimitRate string `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
TpsLimitStrategy string `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`

unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
cacheProtocol protocol.Protocol
cacheMutex sync.Mutex
}

func (c *ServiceConfig) Prefix() string {
Expand Down Expand Up @@ -203,6 +206,10 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.TPS_LIMITER_KEY, srvconfig.TpsLimiter)
urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.TpsLimitRejectedHandler)

// execute limit filter
urlMap.Set(constant.EXECUTE_LIMIT_KEY, srvconfig.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, srvconfig.ExecuteLimitRejectedHandler)

for _, v := range srvconfig.Methods {
prefix := "methods." + v.Name + "."
urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.Loadbalance)
Expand All @@ -212,6 +219,10 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, v.TpsLimitStrategy)
urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, v.TpsLimitInterval)
urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, v.TpsLimitRate)

urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler)

}

return urlMap
Expand Down
10 changes: 10 additions & 0 deletions config/testdata/provider_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ services:
tps.limit.strategy: "slidingWindow"
# the name of RejectedExecutionHandler
tps.limit.rejected.handler: "default"
# the concurrent request limitation of this service
# if the value < 0, it will not be limited.
execute.limit: "200"
# the name of RejectedExecutionHandler
execute.limit.rejected.handler: "default"
protocol : "dubbo"
# equivalent to interface of dubbo.xml
interface : "com.ikurento.user.UserProvider"
Expand All @@ -50,6 +55,11 @@ services:
- name: "GetUser"
retries: 1
loadbalance: "random"
# the concurrent request limitation of this method
# if the value < 0, it will not be limited.
execute.limit: "200"
# the name of RejectedExecutionHandler
execute.limit.rejected.handler: "default"

protocols:
"dubbo":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter/impl/tps"
filterCommon "github.com/apache/dubbo-go/filter/common"
"github.com/apache/dubbo-go/protocol"
)

const HandlerName = "log"

func init() {
extension.SetTpsRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler)
extension.SetTpsRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler)
extension.SetRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler)
extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler)
}

var onlyLogHandlerInstance *OnlyLogRejectedExecutionHandler
Expand All @@ -57,11 +57,11 @@ type OnlyLogRejectedExecutionHandler struct {
}

func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result {
logger.Errorf("The invocation was rejected due to over rate limitation. url: %s", url.String())
logger.Errorf("The invocation was rejected. url: %s", url.String())
return &protocol.RPCResult{}
}

func GetOnlyLogRejectedExecutionHandler() tps.RejectedExecutionHandler {
func GetOnlyLogRejectedExecutionHandler() filterCommon.RejectedExecutionHandler {
onlyLogHandlerOnce.Do(func() {
onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tps
package common

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)

/**
* This implementation only logs the invocation info.
* it always return en error inside the result.
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service" # the name of limiter
* tps.limit.rejected.handler: "name of handler"
* methods:
* - name: "GetUser"
* If the invocation cannot pass any validation in filter, like ExecuteLimitFilter and TpsLimitFilter,
* the implementation will be used.
* The common case is that sometimes you want to return the default value when the request was rejected.
* Or you want to be warned if any request was rejected.
* In such situation, implement this interface and register it by invoking extension.SetRejectedExecutionHandler.
*/
type RejectedExecutionHandler interface {
RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result
Expand Down
Loading

0 comments on commit d39a57d

Please sign in to comment.