Skip to content

Commit

Permalink
fix:修复熔断规则atomic.Value使用存在panic问题 (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Dec 18, 2023
1 parent ca98b72 commit 0bcd204
Show file tree
Hide file tree
Showing 16 changed files with 528 additions and 70 deletions.
7 changes: 7 additions & 0 deletions api/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@ package api

import "github.com/polarismesh/polaris-go/pkg/model"

type GetConfigFileRequest struct {
*model.GetConfigFileRequest
}

// ConfigFileAPI 配置文件的 API
type ConfigFileAPI interface {
SDKOwner
// Deprecated: please use FetchConfigFile
// GetConfigFile 获取配置文件
GetConfigFile(namespace, fileGroup, fileName string) (model.ConfigFile, error)
// FetchConfigFile 获取配置文件
FetchConfigFile(*GetConfigFileRequest) (model.ConfigFile, error)
// CreateConfigFile 创建配置文件
CreateConfigFile(namespace, fileGroup, fileName, content string) error
// UpdateConfigFile 更新配置文件
Expand Down
12 changes: 11 additions & 1 deletion api/config_file_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,17 @@ func newConfigFileAPIBySDKContext(context SDKContext) ConfigFileAPI {

// GetConfigFile 获取配置文件
func (c *configFileAPI) GetConfigFile(namespace, fileGroup, fileName string) (model.ConfigFile, error) {
return c.context.GetEngine().SyncGetConfigFile(namespace, fileGroup, fileName)
return c.context.GetEngine().SyncGetConfigFile(&model.GetConfigFileRequest{
Namespace: namespace,
FileGroup: fileGroup,
FileName: fileName,
Subscribe: true,
})
}

// FetchConfigFile 获取配置文件
func (c *configFileAPI) FetchConfigFile(req *GetConfigFileRequest) (model.ConfigFile, error) {
return c.context.GetEngine().SyncGetConfigFile(req.GetConfigFileRequest)
}

// CreateConfigFile 创建配置文件
Expand Down
18 changes: 10 additions & 8 deletions pkg/flow/configuration/config_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ func (c *ConfigFileFlow) Destroy() {
}

// GetConfigFile 获取配置文件
func (c *ConfigFileFlow) GetConfigFile(namespace, fileGroup, fileName string) (model.ConfigFile, error) {
func (c *ConfigFileFlow) GetConfigFile(req *model.GetConfigFileRequest) (model.ConfigFile, error) {
configFileMetadata := &model.DefaultConfigFileMetadata{
Namespace: namespace,
FileGroup: fileGroup,
FileName: fileName,
Namespace: req.Namespace,
FileGroup: req.FileGroup,
FileName: req.FileName,
}

cacheKey := genCacheKeyByMetadata(configFileMetadata)
Expand All @@ -116,11 +116,13 @@ func (c *ConfigFileFlow) GetConfigFile(namespace, fileGroup, fileName string) (m
if err != nil {
return nil, err
}
c.addConfigFileToLongPollingPool(fileRepo)
c.repos = append(c.repos, fileRepo)

configFile = newDefaultConfigFile(configFileMetadata, fileRepo)
c.configFileCache[cacheKey] = configFile

if req.Subscribe {
c.addConfigFileToLongPollingPool(fileRepo)
c.repos = append(c.repos, fileRepo)
c.configFileCache[cacheKey] = configFile
}
return configFile, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/flow/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,8 @@ func (e *Engine) realInitCalleeService(req *model.InitCalleeServiceRequest,
}

// SyncGetConfigFile 同步获取配置文件
func (e *Engine) SyncGetConfigFile(namespace, fileGroup, fileName string) (model.ConfigFile, error) {
return e.configFlow.GetConfigFile(namespace, fileGroup, fileName)
func (e *Engine) SyncGetConfigFile(req *model.GetConfigFileRequest) (model.ConfigFile, error) {
return e.configFlow.GetConfigFile(req)
}

// SyncGetConfigGroup 同步获取配置文件
Expand Down
7 changes: 7 additions & 0 deletions pkg/model/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,10 @@ type ConfigFileGroup interface {
// AddChangeListener 增加配置文件变更监听器
AddChangeListener(cb OnConfigGroupChange)
}

type GetConfigFileRequest struct {
Namespace string
FileGroup string
FileName string
Subscribe bool
}
2 changes: 1 addition & 1 deletion pkg/model/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type Engine interface {
// InitCalleeService 所需的被调初始化
InitCalleeService(req *InitCalleeServiceRequest) error
// SyncGetConfigFile 同步获取配置文件
SyncGetConfigFile(namespace, fileGroup, fileName string) (ConfigFile, error)
SyncGetConfigFile(req *GetConfigFileRequest) (ConfigFile, error)
// SyncGetConfigGroup 同步获取配置文件
SyncGetConfigGroup(namespace, fileGroup string) (ConfigFileGroup, error)
// SyncCreateConfigFile 同步创建配置文件
Expand Down
15 changes: 8 additions & 7 deletions pkg/plugin/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
type HealthChecker interface {
plugin.Plugin
// DetectInstance 对单个实例进行探测,返回探测结果, 每个探测方法自己去判断当前周期是否需要探测,如果无需探测,则返回nil
DetectInstance(model.Instance) (DetectResult, error)
DetectInstance(model.Instance, *fault_tolerance.FaultDetectRule) (DetectResult, error)
// Protocol .
Protocol() fault_tolerance.FaultDetectRule_Protocol
}
Expand All @@ -57,9 +57,7 @@ type DetectResultImp struct {
Success bool
DetectTime time.Time // 探测时间
DetectInstance model.Instance // 探测的实例
delay time.Duration
code string
status model.RetStatus
Code string
}

// IsSuccess 探测类型,与探测插件名相同
Expand All @@ -79,16 +77,19 @@ func (r *DetectResultImp) GetDetectInstance() model.Instance {

// GetCode() return code
func (r *DetectResultImp) GetCode() string {
return r.code
return r.Code
}

// GetDelay
func (r *DetectResultImp) GetDelay() time.Duration {
return r.delay
return time.Since(r.GetDetectTime())
}

func (r *DetectResultImp) GetRetStatus() model.RetStatus {
return r.status
if r.IsSuccess() {
return model.RetSuccess
}
return model.RetFail
}

// init 初始化
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugin/healthcheck/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func (p *Proxy) SetRealPlugin(plug plugin.Plugin, engine model.Engine) {
}

// DetectInstance proxy HealthChecker DetectInstance
func (p *Proxy) DetectInstance(inst model.Instance) (DetectResult, error) {
result, err := p.HealthChecker.DetectInstance(inst)
func (p *Proxy) DetectInstance(inst model.Instance, rule *fault_tolerance.FaultDetectRule) (DetectResult, error) {
result, err := p.HealthChecker.DetectInstance(inst, rule)
return result, err
}

Expand Down
1 change: 1 addition & 0 deletions pkg/plugin/register/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
_ "github.com/polarismesh/polaris-go/plugin/configfilter/crypto/aes"
_ "github.com/polarismesh/polaris-go/plugin/healthcheck/http"
_ "github.com/polarismesh/polaris-go/plugin/healthcheck/tcp"
_ "github.com/polarismesh/polaris-go/plugin/healthcheck/udp"
_ "github.com/polarismesh/polaris-go/plugin/loadbalancer/hash"
_ "github.com/polarismesh/polaris-go/plugin/loadbalancer/maglev"
_ "github.com/polarismesh/polaris-go/plugin/loadbalancer/ringhash"
Expand Down
2 changes: 1 addition & 1 deletion plugin/circuitbreaker/composite/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *ResourceHealthChecker) doCheck(ins model.Instance, protocol fault_toler
ins.GetHost(), ins.GetPort(), c.resource.String(), protocol.String())
return false
}
ret, err := checker.DetectInstance(ins)
ret, err := checker.DetectInstance(ins, rule)
if err != nil {
return false
}
Expand Down
11 changes: 9 additions & 2 deletions plugin/circuitbreaker/composite/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,18 @@ func (rc *ResourceCounters) CurrentActiveRule() *fault_tolerance.CircuitBreakerR
}

func (rc *ResourceCounters) updateCircuitBreakerStatus(status model.CircuitBreakerStatus) {
rc.statusRef.Store(status)
rc.statusRef.Store(&circuitBreakerStatusWrapper{
val: status,
})
}

func (rc *ResourceCounters) CurrentCircuitBreakerStatus() model.CircuitBreakerStatus {
val := rc.statusRef.Load()
if val == nil {
return nil
}
return val.(model.CircuitBreakerStatus)
wrapper := val.(circuitBreakerStatusWrapper)
return wrapper.val
}

func (rc *ResourceCounters) CloseToOpen(breaker string) {
Expand Down Expand Up @@ -301,3 +304,7 @@ func buildFallbackInfo(rule *fault_tolerance.CircuitBreakerRule) *model.Fallback
}
return ret
}

type circuitBreakerStatusWrapper struct {
val model.CircuitBreakerStatus
}
112 changes: 74 additions & 38 deletions plugin/healthcheck/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
package http

import (
"bytes"
"context"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/polarismesh/specification/source/go/api/v1/fault_tolerance"
Expand All @@ -30,14 +34,18 @@ import (
"github.com/polarismesh/polaris-go/pkg/plugin"
"github.com/polarismesh/polaris-go/pkg/plugin/common"
"github.com/polarismesh/polaris-go/pkg/plugin/healthcheck"
"github.com/polarismesh/polaris-go/plugin/healthcheck/utils"
)

type HttpSender interface {
Do(req *http.Request) (*http.Response, error)
}

// Detector TCP协议的实例健康探测器
type Detector struct {
*plugin.PluginBase
cfg *Config
timeout time.Duration
client HttpSender
}

// Type 插件类型
Expand All @@ -57,6 +65,7 @@ func (g *Detector) Init(ctx *plugin.InitContext) (err error) {
if cfgValue != nil {
g.cfg = cfgValue.(*Config)
}
g.client = &http.Client{}
g.timeout = ctx.Config.GetConsumer().GetHealthCheck().GetTimeout()
return nil
}
Expand All @@ -67,15 +76,26 @@ func (g *Detector) Destroy() error {
}

// DetectInstance 探测服务实例健康
func (g *Detector) DetectInstance(ins model.Instance) (result healthcheck.DetectResult, err error) {
func (g *Detector) DetectInstance(ins model.Instance, rule *fault_tolerance.FaultDetectRule) (result healthcheck.DetectResult, err error) {
start := time.Now()
timeout := g.timeout
if rule != nil && rule.Protocol == fault_tolerance.FaultDetectRule_HTTP {
timeout = time.Duration(rule.GetTimeout()) * time.Millisecond
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// 得到Http address
address := utils.GetAddressByInstance(ins)
success := g.doHttpDetect(address)
detReq, err := g.generateHttpRequest(ctx, ins, rule)
if err != nil {
return nil, err
}
code, success := g.doHttpDetect(detReq, rule)
result = &healthcheck.DetectResultImp{
Success: success,
DetectTime: start,
DetectInstance: ins,
Code: code,
}
return result, nil
}
Expand All @@ -86,50 +106,66 @@ func (g *Detector) IsEnable(cfg config.Configuration) bool {
}

// doHttpDetect 执行一次健康探测逻辑
func (g *Detector) doHttpDetect(address string) bool {
c := &http.Client{
Timeout: g.timeout,
}
request := &http.Request{
Method: http.MethodGet,
URL: &url.URL{
Scheme: "http",
Host: address,
Path: g.cfg.Path,
},
}
header := http.Header{}
if len(g.cfg.Host) > 0 {
header.Add("Host", g.cfg.Host)
}
if len(g.cfg.RequestHeadersToAdd) > 0 {
for _, requestHeader := range g.cfg.RequestHeadersToAdd {
header.Add(requestHeader.Key, requestHeader.Value)
}
}
if len(header) > 0 {
request.Header = header
}
resp, err := c.Do(request)
func (g *Detector) doHttpDetect(detReq *http.Request, rule *fault_tolerance.FaultDetectRule) (string, bool) {
resp, err := g.client.Do(detReq)
if err != nil {
log.GetDetectLogger().Errorf("[HealthCheck][http] fail to check %s, err is %v", address, err)
return false
log.GetDetectLogger().Errorf("[HealthCheck][http] fail to check %+v, err is %v", detReq.URL, err)
return "", false
}
defer resp.Body.Close()
code := resp.StatusCode
for _, statusCodeRange := range g.cfg.ExpectedStatuses {
if code >= statusCodeRange.Start && code < statusCodeRange.End {
return true
}
if code := resp.StatusCode; code >= 200 && code < 500 {
return strconv.Itoa(resp.StatusCode), true
}
return false
return strconv.Itoa(resp.StatusCode), false
}

// Protocol .
func (g *Detector) Protocol() fault_tolerance.FaultDetectRule_Protocol {
return fault_tolerance.FaultDetectRule_HTTP
}

func (g *Detector) generateHttpRequest(ctx context.Context, ins model.Instance, rule *fault_tolerance.FaultDetectRule) (*http.Request, error) {
var (
address string
customUrl = g.cfg.Path
port = ins.GetPort()
)
header := http.Header{}
if rule == nil {
customUrl = strings.TrimPrefix(customUrl, "/")
if len(g.cfg.Host) > 0 {
header.Add("Host", g.cfg.Host)
}
if len(g.cfg.RequestHeadersToAdd) > 0 {
for _, requestHeader := range g.cfg.RequestHeadersToAdd {
header.Add(requestHeader.Key, requestHeader.Value)
}
}
} else {
if rule.GetPort() > 0 {
port = rule.Port
}
customUrl = rule.GetHttpConfig().GetUrl()
customUrl = strings.TrimPrefix(customUrl, "/")
ruleHeaders := rule.GetHttpConfig().GetHeaders()
for i := range ruleHeaders {
header.Add(ruleHeaders[i].Key, ruleHeaders[i].Value)
}
}
address = fmt.Sprintf("http://%s:%d/%s", ins.GetHost(), port, customUrl)

request, err := http.NewRequestWithContext(ctx, rule.GetHttpConfig().Method, address, bytes.NewBufferString(rule.HttpConfig.GetBody()))
if err != nil {
log.GetDetectLogger().Errorf("[HealthCheck][http] fail to build request %+v, err is %v", address, err)
return nil, err
}

if len(header) > 0 {
request.Header = header
}
return request, nil
}

// init 注册插件信息
func init() {
plugin.RegisterConfigurablePlugin(&Detector{}, &Config{})
Expand Down
Loading

0 comments on commit 0bcd204

Please sign in to comment.