diff --git a/release/conf/polaris-server.yaml b/release/conf/polaris-server.yaml index 7dd623684..20a7a9f8c 100644 --- a/release/conf/polaris-server.yaml +++ b/release/conf/polaris-server.yaml @@ -320,7 +320,7 @@ naming: clientRegister: open: true queueSize: 10240 - waitTime: 32s + waitTime: 32ms maxBatchCount: 1024 concurrency: 64 clientDeregister: diff --git a/release/upgrade/circuitbreaker_rule_transform/README.md b/release/upgrade/circuitbreaker_rule_transform/README.md new file mode 100644 index 000000000..96febb5d3 --- /dev/null +++ b/release/upgrade/circuitbreaker_rule_transform/README.md @@ -0,0 +1,23 @@ +## Circuit Breaker Rule Migration Tool + +### Description + +This tool migrates existing circuit breaker rules to the rule format used by version 1.14.x and above. + +### Usage + +***step1: build*** + +Run ```go build``` in this directory to compile the tool. + +***step2: upgrade the database*** + +Run the database upgrade script to bring the schema up to version 1.14.x or above, and make sure the circuitbreaker_rule_v2 table exists in the database. + +***step3: run the migration*** + +Run the migration tool, passing the database address, user name, and other connection details, as shown below: + +```shell +./circuitbreaker_rule_transform --db_addr=127.0.0.1:3306 --db_name=polaris_server --db_user=root --db_pwd=123456 +``` \ No newline at end of file diff --git a/release/upgrade/circuitbreaker_rule_transform/go.mod b/release/upgrade/circuitbreaker_rule_transform/go.mod new file mode 100644 index 000000000..2377cb106 --- /dev/null +++ b/release/upgrade/circuitbreaker_rule_transform/go.mod @@ -0,0 +1,14 @@ +module github.com/polarismesh/polaris/release/upgrade/circuitbreaker_rule_transform + +go 1.19 + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/polarismesh/specification v1.1.1 ) + +require ( + github.com/go-sql-driver/mysql v1.7.0 + github.com/google/uuid v1.3.0 + google.golang.org/protobuf v1.28.1 ) diff --git a/release/upgrade/circuitbreaker_rule_transform/go.sum b/release/upgrade/circuitbreaker_rule_transform/go.sum new file mode 100644 index 000000000..f566488d5 --- /dev/null +++ b/release/upgrade/circuitbreaker_rule_transform/go.sum @@ -0,0 +1,17 @@ +github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/polarismesh/specification v1.1.1 h1:BxZ5p78g4kaoGZ6EPM1b+QT/QCI/nkR/1qlcAeKdtdA= +github.com/polarismesh/specification v1.1.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/release/upgrade/circuitbreaker_rule_transform/main.go
b/release/upgrade/circuitbreaker_rule_transform/main.go new file mode 100644 index 000000000..56b5717d5 --- /dev/null +++ b/release/upgrade/circuitbreaker_rule_transform/main.go @@ -0,0 +1,399 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package main + +import ( + "database/sql" + "encoding/hex" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/google/uuid" + apifault "github.com/polarismesh/specification/source/go/api/v1/fault_tolerance" + apimodel "github.com/polarismesh/specification/source/go/api/v1/model" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +var dbAddr string + +var dbName string + +var dbUser string + +var dbPwd string + +func init() { + flag.StringVar(&dbAddr, "db_addr", "", "Input database address") + flag.StringVar(&dbName, "db_name", "", "Input database name") + flag.StringVar(&dbUser, "db_user", "", "Input database user") + flag.StringVar(&dbPwd, "db_pwd", "", "Input database password") +} + +const ( + queryCbSql = `select rule.id, rule.version, rule.name, rule.namespace, service.name, IFNULL(rule.business, ""), + IFNULL(rule.comment, ""), IFNULL(rule.department, ""), + rule.inbounds, rule.outbounds, rule.owner, rule.revision, + unix_timestamp(rule.ctime), unix_timestamp(rule.mtime) + from circuitbreaker_rule as rule, circuitbreaker_rule_relation as relation, service + where service.id = relation.service_id and relation.rule_id = rule.id + and relation.rule_version = rule.version + and relation.flag = 0 and service.flag = 0 and rule.flag = 0` + insertCircuitBreakerRuleSql = `insert into circuitbreaker_rule_v2( + id, name, namespace, enable, revision, description, level, src_service, src_namespace, + dst_service, dst_namespace, dst_method, config, ctime, mtime, etime) + values(?,?,?,?,?,?,?,?,?,?,?,?,?, sysdate(),sysdate(), %s)` +) + +type CircuitBreakerData struct { + ID string + Version string + Name string + Namespace string + Service string + Business string + Department string + Comment string + Inbounds string + Outbounds string + Token string + Owner string + Revision string + Valid bool + CreateTime time.Time + ModifyTime time.Time +} + +type NewCircuitBreakerData struct { + ID string + Name string + Namespace string + Description string + Level int + SrcService string + SrcNamespace string + DstService string + DstNamespace string + DstMethod string + Rule string + Revision string + Enable bool + Valid bool + CreateTime time.Time + ModifyTime time.Time + EnableTime time.Time +} + +func queryOldRule(db *sql.DB) ([]*CircuitBreakerData, error) { + rows, err := db.Query(queryCbSql) + if nil != err { + return nil, err + } + defer rows.Close() + var out []*CircuitBreakerData + for rows.Next() { + var breaker CircuitBreakerData + var ctime, mtime int64 + err := rows.Scan(&breaker.ID, &breaker.Version, 
&breaker.Name, &breaker.Namespace, + &breaker.Service, &breaker.Business, &breaker.Comment, &breaker.Department, + &breaker.Inbounds, &breaker.Outbounds, &breaker.Owner, &breaker.Revision, &ctime, &mtime) + if err != nil { + log.Printf("fetch brief circuitbreaker rule scan err: %s", err.Error()) + return nil, err + } + breaker.CreateTime = time.Unix(ctime, 0) + breaker.ModifyTime = time.Unix(mtime, 0) + out = append(out, &breaker) + } + + if err := rows.Err(); err != nil { + log.Printf("get tag circuitbreaker err: %s", err.Error()) + return nil, err + } + return out, nil +} + +// Time2String converts time.Time to a formatted time string +func Time2String(t time.Time) string { + return t.Format("2006-01-02 15:04:05") +} + +func toCircuitBreakerRule(svc string, + oldRuleProto *apifault.CircuitBreaker, subRule *apifault.CbRule, inbound bool) *apifault.CircuitBreakerRule { + cbRule := &apifault.CircuitBreakerRule{} + ruleMatcher := &apifault.RuleMatcher{} + if inbound { + ruleMatcher.Destination = &apifault.RuleMatcher_DestinationService{ + Service: svc, + Namespace: oldRuleProto.GetNamespace().GetValue(), + } + ruleMatcher.Source = &apifault.RuleMatcher_SourceService{} + if len(subRule.Sources) > 0 { + ruleMatcher.Source.Namespace = subRule.Sources[0].GetNamespace().GetValue() + ruleMatcher.Source.Service = subRule.Sources[0].GetService().GetValue() + } + if len(ruleMatcher.Source.Namespace) == 0 { + ruleMatcher.Source.Namespace = "*" + } + if len(ruleMatcher.Source.Service) == 0 { + ruleMatcher.Source.Service = "*" + } + } else { + ruleMatcher.Source = &apifault.RuleMatcher_SourceService{ + Service: svc, + Namespace: oldRuleProto.GetNamespace().GetValue(), + } + ruleMatcher.Destination = &apifault.RuleMatcher_DestinationService{} + if len(subRule.Destinations) > 0 { + ruleMatcher.Destination.Service = subRule.Destinations[0].GetService().GetValue() + ruleMatcher.Destination.Namespace = subRule.Destinations[0].GetNamespace().GetValue() + } + if len(ruleMatcher.Destination.Namespace) == 0 { + ruleMatcher.Destination.Namespace = "*" + } + if len(ruleMatcher.Destination.Service) == 0 { + ruleMatcher.Destination.Service = "*" + } + } + cbRule.RuleMatcher = ruleMatcher + if len(subRule.Destinations) == 0 { + return nil + } + dstSet := subRule.Destinations[0] + if nil != dstSet.GetMethod() { + ruleMatcher.Destination.Method = &apimodel.MatchString{ + Type: apimodel.MatchString_MatchStringType(dstSet.GetMethod().GetType()), + Value: dstSet.GetMethod().GetValue(), + } + } + var triggers []*apifault.TriggerCondition + policy := dstSet.GetPolicy() + cbRule.MaxEjectionPercent = policy.GetMaxEjectionPercent().GetValue() + if policy.GetConsecutive().GetEnable().GetValue() { + triggers = append(triggers, &apifault.TriggerCondition{ + TriggerType: apifault.TriggerCondition_CONSECUTIVE_ERROR, + ErrorCount: policy.GetConsecutive().GetConsecutiveErrorToOpen().GetValue(), + }) + } + if policy.GetErrorRate().GetEnable().GetValue() { + triggers = append(triggers, &apifault.TriggerCondition{ + TriggerType: apifault.TriggerCondition_ERROR_RATE, + ErrorPercent: policy.GetErrorRate().GetErrorRateToOpen().GetValue(), + MinimumRequest: policy.GetErrorRate().GetRequestVolumeThreshold().GetValue(), + Interval: 60, + }) + } + recoverRule := &apifault.RecoverCondition{ + SleepWindow: uint32(dstSet.GetRecover().GetSleepWindow().GetSeconds()), + ConsecutiveSuccess: dstSet.GetRecover().GetMaxRetryAfterHalfOpen().GetValue(), + } + if recoverRule.ConsecutiveSuccess == 0 {
recoverRule.ConsecutiveSuccess = 3 + } + cbRule.TriggerCondition = triggers + cbRule.RecoverCondition = recoverRule + if dstSet.GetRecover().GetOutlierDetectWhen() != apifault.RecoverConfig_NEVER { + cbRule.FaultDetectConfig = &apifault.FaultDetectConfig{Enable: true} + } else { + cbRule.FaultDetectConfig = &apifault.FaultDetectConfig{Enable: false} + } + return cbRule +} + +func oldCbRuleToNewCbRule(cbRule *CircuitBreakerData) ([]*NewCircuitBreakerData, error) { + oldRuleProto := &apifault.CircuitBreaker{ + Id: wrapperspb.String(cbRule.ID), + Version: wrapperspb.String(cbRule.Version), + Name: wrapperspb.String(cbRule.Name), + Namespace: wrapperspb.String(cbRule.Namespace), + Owners: wrapperspb.String(cbRule.Owner), + Comment: wrapperspb.String(cbRule.Comment), + Ctime: wrapperspb.String(Time2String(cbRule.CreateTime)), + Mtime: wrapperspb.String(Time2String(cbRule.ModifyTime)), + Revision: wrapperspb.String(cbRule.Revision), + Business: wrapperspb.String(cbRule.Business), + Department: wrapperspb.String(cbRule.Department), + } + if cbRule.Inbounds != "" { + var inBounds []*apifault.CbRule + if err := json.Unmarshal([]byte(cbRule.Inbounds), &inBounds); err != nil { + return nil, err + } + oldRuleProto.Inbounds = inBounds + } + if cbRule.Outbounds != "" { + var outBounds []*apifault.CbRule + if err := json.Unmarshal([]byte(cbRule.Outbounds), &outBounds); err != nil { + return nil, err + } + oldRuleProto.Outbounds = outBounds + } + var ret []*NewCircuitBreakerData + if len(cbRule.Inbounds) > 0 { + for i, inboundRule := range oldRuleProto.Inbounds { + newRuleProto := toCircuitBreakerRule(cbRule.Service, oldRuleProto, inboundRule, true) + if nil == newRuleProto { + continue + } + newRuleStr, err := json.Marshal(newRuleProto) + if nil != err { + log.Printf("fail to marshal proto %+v, err: %v", newRuleProto, err) + return nil, err + } + newRuleData := &NewCircuitBreakerData{ + ID: fmt.Sprintf("%s-%s-%d", oldRuleProto.GetId().GetValue(), "inbound", i), + Name: fmt.Sprintf("%s-%s-%d", oldRuleProto.GetName().GetValue(), "inbound", i), + Namespace: oldRuleProto.GetNamespace().GetValue(), + Description: oldRuleProto.GetComment().GetValue(), + Level: int(apifault.Level_INSTANCE), + SrcService: newRuleProto.GetRuleMatcher().GetSource().GetService(), + SrcNamespace: newRuleProto.GetRuleMatcher().GetSource().GetNamespace(), + DstService: newRuleProto.GetRuleMatcher().GetDestination().GetService(), + DstNamespace: newRuleProto.GetRuleMatcher().GetDestination().GetNamespace(), + DstMethod: newRuleProto.GetRuleMatcher().GetDestination().GetMethod().GetValue().GetValue(), + Rule: string(newRuleStr), + Revision: NewUUID(), + Enable: true, + } + ret = append(ret, newRuleData) + } + } + if len(cbRule.Outbounds) > 0 { + for i, outboundRule := range oldRuleProto.Outbounds { + newRuleProto := toCircuitBreakerRule(cbRule.Service, oldRuleProto, outboundRule, false) + if nil == newRuleProto { + continue + } + newRuleStr, err := json.Marshal(newRuleProto) + if nil != err { + log.Printf("fail to marshal proto %+v, err: %v", newRuleProto, err) + return nil, err + } + newRuleData := &NewCircuitBreakerData{ + ID: fmt.Sprintf("%s-%s-%d", oldRuleProto.GetId().GetValue(), "outbound", i), + Name: fmt.Sprintf("%s-%s-%d", oldRuleProto.GetName().GetValue(), "outbound", i), + Namespace: oldRuleProto.GetNamespace().GetValue(), + Description: oldRuleProto.GetComment().GetValue(), + Level: int(apifault.Level_INSTANCE), + SrcService: newRuleProto.GetRuleMatcher().GetSource().GetService(), + SrcNamespace: newRuleProto.GetRuleMatcher().GetSource().GetNamespace(), +
DstService: newRuleProto.GetRuleMatcher().GetDestination().GetService(), + DstNamespace: newRuleProto.GetRuleMatcher().GetDestination().GetNamespace(), + DstMethod: newRuleProto.GetRuleMatcher().GetDestination().GetMethod().GetValue().GetValue(), + Rule: string(newRuleStr), + Revision: NewUUID(), + Enable: true, + } + ret = append(ret, newRuleData) + } + } + return ret, nil +} + +// NewUUID returns a random UUID string +func NewUUID() string { + uuidBytes := uuid.New() + return hex.EncodeToString(uuidBytes[:]) +} + +func main() { + flag.Parse() + if len(dbAddr) == 0 || len(dbName) == 0 || len(dbUser) == 0 || len(dbPwd) == 0 { + log.Printf("invalid arguments: dbAddr %s, dbName %s, dbUser %s", dbAddr, dbName, dbUser) + os.Exit(1) + } + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s", dbUser, dbPwd, dbAddr, dbName) + db, err := sql.Open("mysql", dsn) + if err != nil { + log.Printf("sql open err: %s", err.Error()) + os.Exit(1) + } + defer db.Close() + oldRuleDatas, err := queryOldRule(db) + if err != nil { + log.Printf("get old rule err: %s", err.Error()) + os.Exit(1) + } + log.Printf("selected old circuitbreaker rule count %d", len(oldRuleDatas)) + + var newRules []*NewCircuitBreakerData + for _, oldRuleData := range oldRuleDatas { + subNewRules, err := oldCbRuleToNewCbRule(oldRuleData) + if err != nil { + log.Printf("convert old rule to new rule err: %s", err.Error()) + os.Exit(1) + } + newRules = append(newRules, subNewRules...) + } + log.Printf("converted new circuitbreaker rule count %d", len(newRules)) + + for _, newRule := range newRules { + log.Printf("start to process rule %s, name %s", newRule.ID, newRule.Name) + err = createCircuitBreakerRule(db, newRule) + if nil != err { + log.Printf("fail to process rule %s, name %s, err: %v", newRule.ID, newRule.Name, err) + } + } +} + +func processWithTransaction(db *sql.DB, handle func(*sql.Tx) error) error { + tx, err := db.Begin() + if err != nil { + log.Printf("[Store][database] begin tx err: %s", err.Error()) + return err + } + + defer func() { + _ = tx.Rollback() + }() + return handle(tx) +} + +func buildEtimeStr(enable bool) string { + etimeStr := "sysdate()" + if !enable { + etimeStr = "STR_TO_DATE('1980-01-01 00:00:01', '%Y-%m-%d %H:%i:%s')" + } + return etimeStr +} + +func createCircuitBreakerRule(db *sql.DB, cbRule *NewCircuitBreakerData) error { + return processWithTransaction(db, func(tx *sql.Tx) error { + etimeStr := buildEtimeStr(cbRule.Enable) + str := fmt.Sprintf(insertCircuitBreakerRuleSql, etimeStr) + if _, err := tx.Exec(str, cbRule.ID, cbRule.Name, cbRule.Namespace, cbRule.Enable, cbRule.Revision, + cbRule.Description, cbRule.Level, cbRule.SrcService, cbRule.SrcNamespace, cbRule.DstService, + cbRule.DstNamespace, cbRule.DstMethod, cbRule.Rule); err != nil { + log.Printf("[Store][database] fail to insert cbRule exec sql, err: %s", err.Error()) + return err + } + if err := tx.Commit(); err != nil { + log.Printf("[Store][database] fail to insert cbRule commit tx, rule(%+v) commit tx err: %s", + cbRule, err.Error()) + return err + } + return nil + }) +} diff --git a/service/client_check_test.go b/service/client_check_test.go new file mode 100644 index 000000000..5d93ba93d --- /dev/null +++ b/service/client_check_test.go @@ -0,0 +1,80 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
+ * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package service + +import ( + "context" + "fmt" + "testing" + "time" + + apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +func TestClientCheck(t *testing.T) { + discoverSuit := &DiscoverTestSuit{} + if err := discoverSuit.initialize(); err != nil { + t.Fatal(err) + } + defer discoverSuit.Destroy() + + clientId1 := "111" + clientId2 := "222" + + discoverSuit.addInstance(t, &apiservice.Instance{ + Service: wrapperspb.String("polaris.checker"), + Namespace: wrapperspb.String("Polaris"), + Host: wrapperspb.String("127.0.0.1"), + Port: wrapperspb.UInt32(8091), + Protocol: wrapperspb.String("grpc"), + Metadata: map[string]string{"polaris_service": "polaris.checker"}, + }) + time.Sleep(20 * time.Second) + clientIds := map[string]bool{clientId1: true, clientId2: true} + for i := 0; i < 50; i++ { + for clientId := range clientIds { + fmt.Printf("%d report client for %s, round 1\n", i, clientId) + discoverSuit.server.ReportClient(context.Background(), + &apiservice.Client{ + Id: &wrapperspb.StringValue{Value: clientId}, Host: &wrapperspb.StringValue{Value: "127.0.0.1"}}) + } + time.Sleep(1 * time.Second) + } + + client1 := discoverSuit.server.Cache().Client().GetClient(clientId1) + assert.NotNil(t, client1) + client2 := discoverSuit.server.Cache().Client().GetClient(clientId2) + assert.NotNil(t, client2) + + delete(clientIds, clientId2) + for i := 0; i < 50; i++ { + for clientId := range clientIds { + fmt.Printf("%d report client for %s, round 2\n", i, clientId) + discoverSuit.server.ReportClient(context.Background(), + &apiservice.Client{Id: &wrapperspb.StringValue{Value: clientId}, + Host: &wrapperspb.StringValue{Value: "127.0.0.1"}, Type: apiservice.Client_SDK}) + } + time.Sleep(1 * time.Second) + } + client1 = discoverSuit.server.Cache().Client().GetClient(clientId1) + assert.NotNil(t, client1) + client2 = discoverSuit.server.Cache().Client().GetClient(clientId2) + assert.Nil(t, client2) +} diff --git a/service/healthcheck/cache.go b/service/healthcheck/cache.go index 0b468c927..6cc99ee8f 100644 --- a/service/healthcheck/cache.go +++ b/service/healthcheck/cache.go @@ -133,11 +133,11 @@ func storeClient(clientWithChecker *ClientWithChecker, values *shardMap) bool { } func deleteClient(client *apiservice.Client, values *shardMap) bool { - instanceId := client.GetId().GetValue() - ok := values.DeleteIfExist(instanceId) + clientId := client.GetId().GetValue() + ok := values.DeleteIfExist(clientId) if ok { - log.Infof("[Health Check][Cache]delete service instance is %s, id is %s", - client.GetHost().GetValue(), instanceId) + log.Infof("[Health Check][Cache]delete client is %s, id is %s", + client.GetHost().GetValue(), clientId) } return true } diff --git a/service/healthcheck/check.go b/service/healthcheck/check.go index 2c35c38cd..4a6d7ffcc 100644 --- a/service/healthcheck/check.go +++ b/service/healthcheck/check.go 
@@ -19,7 +19,6 @@ package healthcheck import ( "context" - "strconv" "sync" "time" @@ -32,7 +31,6 @@ import ( "github.com/polarismesh/polaris/common/timewheel" "github.com/polarismesh/polaris/common/utils" "github.com/polarismesh/polaris/plugin" - "github.com/polarismesh/polaris/store" ) const ( @@ -43,10 +41,13 @@ const ( type CheckScheduler struct { rwMutex *sync.RWMutex scheduledInstances map[string]*itemValue + scheduledClients map[string]*clientItemValue - timeWheel *timewheel.TimeWheel - minCheckIntervalSec int64 - maxCheckIntervalSec int64 + timeWheel *timewheel.TimeWheel + minCheckIntervalSec int64 + maxCheckIntervalSec int64 + clientCheckIntervalSec int64 + clientCheckTtlSec int64 adoptInstancesChan chan AdoptEvent ctx context.Context @@ -59,69 +60,43 @@ type AdoptEvent struct { Checker plugin.HealthChecker } -//go:generate stringer -type=ItemType -type ItemType int - -const ( - itemTypeInstance ItemType = iota - itemTypeClient -) - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[itemTypeInstance-0] - _ = x[itemTypeClient-1] -} - -const _ItemType_name = "itemTypeInstanceitemTypeClient" - -var _ItemType_index = [...]uint8{0, 16, 30} - -func (i ItemType) String() string { - if i < 0 || i >= ItemType(len(_ItemType_index)-1) { - return "ItemType(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _ItemType_name[_ItemType_index[i]:_ItemType_index[i+1]] +type clientItemValue struct { + itemValue + lastCheckTimeSec int64 } type itemValue struct { - mutex *sync.Mutex - id string - host string - port uint32 - scheduled uint32 - lastSetEventTimeSec int64 - ttlDurationSec uint32 - expireDurationSec uint32 - checker plugin.HealthChecker - ItemType ItemType -} - -func (i *itemValue) eventExpired() (int64, bool) { - curTimeSec := time.Now().Unix() - return curTimeSec, curTimeSec-i.lastSetEventTimeSec >= int64(i.expireDurationSec) -} - -func newCheckScheduler(ctx context.Context, slotNum int, - minCheckInterval time.Duration, maxCheckInterval time.Duration) *CheckScheduler { + mutex *sync.Mutex + id string + host string + port uint32 + scheduled uint32 + ttlDurationSec uint32 + expireDurationSec uint32 + checker plugin.HealthChecker +} + +func newCheckScheduler(ctx context.Context, slotNum int, minCheckInterval time.Duration, + maxCheckInterval time.Duration, clientCheckInterval time.Duration, clientCheckTtl time.Duration) *CheckScheduler { scheduler := &CheckScheduler{ - rwMutex: &sync.RWMutex{}, - scheduledInstances: make(map[string]*itemValue), - timeWheel: timewheel.New(time.Second, slotNum, "health-interval-check"), - minCheckIntervalSec: int64(minCheckInterval.Seconds()), - maxCheckIntervalSec: int64(maxCheckInterval.Seconds()), - adoptInstancesChan: make(chan AdoptEvent, 1024), - ctx: ctx, - } - - go scheduler.doCheck(ctx) + rwMutex: &sync.RWMutex{}, + scheduledInstances: make(map[string]*itemValue), + scheduledClients: make(map[string]*clientItemValue), + timeWheel: timewheel.New(time.Second, slotNum, "health-interval-check"), + minCheckIntervalSec: int64(minCheckInterval.Seconds()), + maxCheckIntervalSec: int64(maxCheckInterval.Seconds()), + clientCheckIntervalSec: int64(clientCheckInterval.Seconds()), + clientCheckTtlSec: int64(clientCheckTtl.Seconds()), + adoptInstancesChan: make(chan AdoptEvent, 1024), + ctx: ctx, + } + go scheduler.doCheckInstances(ctx) + go scheduler.doCheckClient(ctx) go scheduler.doAdopt(ctx) return scheduler } -func (c 
*CheckScheduler) doCheck(ctx context.Context) { +func (c *CheckScheduler) doCheckInstances(ctx context.Context) { c.timeWheel.Start() log.Infof("[Health Check][Check]timeWheel has been started") @@ -242,35 +217,34 @@ func (c *CheckScheduler) putInstanceIfAbsent(instanceWithChecker *InstanceWithCh expireDurationSec: getExpireDurationSec(instance.Proto), checker: instanceWithChecker.checker, ttlDurationSec: instance.HealthCheck().GetHeartbeat().GetTtl().GetValue(), - ItemType: itemTypeInstance, } c.scheduledInstances[instance.ID()] = instValue return false, instValue } -const clientReportTtlSec uint32 = 120 - -func (c *CheckScheduler) putClientIfAbsent(clientWithChecker *ClientWithChecker) (bool, *itemValue) { +func (c *CheckScheduler) putClientIfAbsent(clientWithChecker *ClientWithChecker) (bool, *clientItemValue) { c.rwMutex.Lock() defer c.rwMutex.Unlock() client := clientWithChecker.client - var instValue *itemValue + var instValue *clientItemValue var ok bool clientId := client.Proto().GetId().GetValue() - if instValue, ok = c.scheduledInstances[clientId]; ok { + if instValue, ok = c.scheduledClients[clientId]; ok { return true, instValue } - instValue = &itemValue{ - mutex: &sync.Mutex{}, - host: client.Proto().GetHost().GetValue(), - port: 0, - id: clientId, - expireDurationSec: expireTtlCount * clientReportTtlSec, - checker: clientWithChecker.checker, - ttlDurationSec: clientReportTtlSec, - ItemType: itemTypeClient, - } - c.scheduledInstances[clientId] = instValue + instValue = &clientItemValue{ + itemValue: itemValue{ + mutex: &sync.Mutex{}, + host: client.Proto().GetHost().GetValue(), + port: 0, + id: clientId, + expireDurationSec: uint32(expireTtlCount * c.clientCheckTtlSec), + checker: clientWithChecker.checker, + ttlDurationSec: uint32(c.clientCheckTtlSec), + }, + lastCheckTimeSec: 0, + } + c.scheduledClients[clientId] = instValue return false, instValue } @@ -281,6 +255,13 @@ func (c *CheckScheduler) getInstanceValue(instanceId string) (*itemValue, bool) return value, ok } +func (c *CheckScheduler) getClientValue(clientId string) (*clientItemValue, bool) { + c.rwMutex.RLock() + defer c.rwMutex.RUnlock() + value, ok := c.scheduledClients[clientId] + return value, ok +} + // AddInstance add instance to check func (c *CheckScheduler) AddInstance(instanceWithChecker *InstanceWithChecker) { exists, instValue := c.putInstanceIfAbsent(instanceWithChecker) @@ -294,7 +275,7 @@ func (c *CheckScheduler) AddInstance(instanceWithChecker *InstanceWithChecker) { c.addUnHealthyCallback(instValue) } -// AddInstance add instance to check +// AddClient add client to check func (c *CheckScheduler) AddClient(clientWithChecker *ClientWithChecker) { exists, instValue := c.putClientIfAbsent(clientWithChecker) if exists { @@ -302,9 +283,8 @@ func (c *CheckScheduler) AddClient(clientWithChecker *ClientWithChecker) { } c.addAdopting(instValue.id, instValue.checker) client := clientWithChecker.client - log.Infof("[Health Check][Check]add check instance is %s, host is %s:%d", + log.Infof("[Health Check][Check]add check client is %s, host is %s:%d", client.Proto().GetId().GetValue(), client.Proto().GetHost(), 0) - c.addUnHealthyCallback(instValue) } func getExpireDurationSec(instance *apiservice.Instance) uint32 { @@ -337,13 +317,9 @@ func (c *CheckScheduler) addHealthyCallback(instance *itemValue, lastHeartbeatTi port := instance.port instanceId := instance.id delayMilli := delaySec*1000 + getRandDelayMilli() - log.Debugf("[Health Check][Check]add healthy callback, %s is %s:%d, id is %s, delay is %d(ms)", - 
instance.ItemType.String(), host, port, instanceId, delayMilli) - if instance.ItemType == itemTypeClient { - c.timeWheel.AddTask(delayMilli, instanceId, c.checkCallbackClient) - } else { - c.timeWheel.AddTask(delayMilli, instanceId, c.checkCallbackInstance) - } + log.Debugf("[Health Check][Check]add healthy instance callback, addr is %s:%d, id is %s, delay is %d(ms)", + host, port, instanceId, delayMilli) + c.timeWheel.AddTask(delayMilli, instanceId, c.checkCallbackInstance) } func (c *CheckScheduler) addUnHealthyCallback(instance *itemValue) { @@ -355,66 +331,55 @@ func (c *CheckScheduler) addUnHealthyCallback(instance *itemValue) { port := instance.port instanceId := instance.id delayMilli := delaySec*1000 + getRandDelayMilli() - log.Debugf("[Health Check][Check]add unhealthy callback, %s is %s:%d, id is %s, delay is %d(ms)", - instance.ItemType.String(), host, port, instanceId, delayMilli) - if instance.ItemType == itemTypeClient { - c.timeWheel.AddTask(delayMilli, instanceId, c.checkCallbackClient) - } else { - c.timeWheel.AddTask(delayMilli, instanceId, c.checkCallbackInstance) - } + log.Debugf("[Health Check][Check]add unhealthy instance callback, addr is %s:%d, id is %s, delay is %d(ms)", + host, port, instanceId, delayMilli) + c.timeWheel.AddTask(delayMilli, instanceId, c.checkCallbackInstance) } -func (c *CheckScheduler) checkCallbackClient(value interface{}) { - clientId := value.(string) - instanceValue, ok := c.getInstanceValue(clientId) +func (c *CheckScheduler) checkCallbackClient(clientId string) *clientItemValue { + clientValue, ok := c.getClientValue(clientId) if !ok { log.Infof("[Health Check][Check]client %s has been removed from callback", clientId) - return + return nil } - instanceValue.mutex.Lock() - defer instanceValue.mutex.Unlock() + clientValue.mutex.Lock() + defer clientValue.mutex.Unlock() var checkResp *plugin.CheckResponse var err error - defer func() { - if checkResp != nil && checkResp.Regular && checkResp.Healthy { - c.addHealthyCallback(instanceValue, checkResp.LastHeartbeatTimeSec) - } else { - c.addUnHealthyCallback(instanceValue) - } - }() cachedClient := server.cacheProvider.GetClient(clientId) if cachedClient == nil { - log.Infof("[Health Check][Check]client %s has been deleted", instanceValue.id) - return + log.Infof("[Health Check][Check]client %s has been deleted", clientValue.id) + return clientValue } request := &plugin.CheckRequest{ QueryRequest: plugin.QueryRequest{ - InstanceId: toClientId(instanceValue.id), - Host: instanceValue.host, - Port: instanceValue.port, + InstanceId: toClientId(clientValue.id), + Host: clientValue.host, + Port: clientValue.port, Healthy: true, }, CurTimeSec: currentTimeSec, - ExpireDurationSec: instanceValue.expireDurationSec, + ExpireDurationSec: clientValue.expireDurationSec, } - checkResp, err = instanceValue.checker.Check(request) + checkResp, err = clientValue.checker.Check(request) if err != nil { log.Errorf("[Health Check][Check]fail to check client %s, id is %s, err is %v", - instanceValue.host, instanceValue.id, err) - return + clientValue.host, clientValue.id, err) + return clientValue } if !checkResp.StayUnchanged { if !checkResp.Healthy { log.Infof( "[Health Check][Check]client change from healthy to unhealthy, id is %s, address is %s", - instanceValue.id, instanceValue.host) - code := server.asyncDeleteClient(cachedClient.Proto()) + clientValue.id, clientValue.host) + code := asyncDeleteClient(cachedClient.Proto()) if code != apimodel.Code_ExecuteSuccess { log.Errorf("[Health Check][Check]fail to update 
client, id is %s, address is %s, code is %d", - instanceValue.id, instanceValue.host, code) + clientValue.id, clientValue.host, code) } } } + return clientValue } func (c *CheckScheduler) checkCallbackInstance(value interface{}) { @@ -480,12 +445,12 @@ } } -// DelInstance del instance from check +// DelClient del client from check func (c *CheckScheduler) DelClient(clientWithChecker *ClientWithChecker) { client := clientWithChecker.client clientId := client.Proto().GetId().GetValue() - exists := c.delIfPresent(clientId) - log.Infof("[Health Check][Check]remove check instance is %s:%d, id is %s, exists is %v", + exists := c.delClientIfPresent(clientId) + log.Infof("[Health Check][Check]remove check client is %s:%d, id is %s, exists is %v", client.Proto().GetHost().GetValue(), 0, clientId, exists) if exists { c.removeAdopting(clientId, clientWithChecker.checker) @@ -496,7 +461,7 @@ func (c *CheckScheduler) DelInstance(instanceWithChecker *InstanceWithChecker) { instance := instanceWithChecker.instance instanceId := instance.ID() - exists := c.delIfPresent(instanceId) + exists := c.delInstanceIfPresent(instanceId) log.Infof("[Health Check][Check]remove check instance is %s:%d, id is %s, exists is %v", instance.Host(), instance.Port(), instanceId, exists) if exists { @@ -504,7 +469,7 @@ } } -func (c *CheckScheduler) delIfPresent(instanceId string) bool { +func (c *CheckScheduler) delInstanceIfPresent(instanceId string) bool { c.rwMutex.Lock() defer c.rwMutex.Unlock() _, ok := c.scheduledInstances[instanceId] @@ -512,6 +477,57 @@ return ok } +func (c *CheckScheduler) delClientIfPresent(clientId string) bool { + c.rwMutex.Lock() + defer c.rwMutex.Unlock() + _, ok := c.scheduledClients[clientId] + delete(c.scheduledClients, clientId) + return ok +} + +func (c *CheckScheduler) doCheckClient(ctx context.Context) { + log.Infof("[Health Check][Check]client check worker has been started, tick seconds is %d", + c.clientCheckIntervalSec) + tick := time.NewTicker(time.Duration(c.clientCheckIntervalSec*1000+int64(getRandDelayMilli())) * time.Millisecond) + defer tick.Stop() + for { + select { + case <-tick.C: + var itemsToCheck []string + if len(c.scheduledClients) == 0 { + continue + } + curTimeSec := currentTimeSec() + c.rwMutex.RLock() + for id, value := range c.scheduledClients { + if value.lastCheckTimeSec == 0 { + itemsToCheck = append(itemsToCheck, id) + continue + } + diff := curTimeSec - value.lastCheckTimeSec + if diff < 0 || diff >= int64(value.expireDurationSec) { + itemsToCheck = append(itemsToCheck, id) + } + } + c.rwMutex.RUnlock() + if len(itemsToCheck) == 0 { + continue + } + for _, id := range itemsToCheck { + item := c.checkCallbackClient(id) + if nil != item { + item.lastCheckTimeSec = currentTimeSec() + } + } + timeCost := currentTimeSec() - curTimeSec + log.Infof("[Health Check][Check]client check finished, time cost %d, client count %d", + timeCost, len(itemsToCheck)) + case <-ctx.Done(): + log.Infof("[Health Check][Check]client check worker has been stopped") + return + } + } +} + // setInsDbStatus updates the instance status; the operation record needs to be logged func setInsDbStatus(instance *model.Instance, healthStatus bool)
apimodel.Code { var code apimodel.Code if server.bc.HeartbeatOpen() { - code = server.asyncSetInsDbStatus(instance.Proto, healthStatus) + code = asyncSetInsDbStatus(instance.Proto, healthStatus) } else { - code = server.serialSetInsDbStatus(instance.Proto, healthStatus) + code = serialSetInsDbStatus(instance.Proto, healthStatus) } if code != apimodel.Code_ExecuteSuccess { return code @@ -556,8 +572,8 @@ func setInsDbStatus(instance *model.Instance, healthStatus bool) apimodel.Code { // The underlying function merges delete requests to increase the throughput of concurrent creation // req is the original request // ins carries the data of req plus the instanceID and serviceToken -func (s *Server) asyncSetInsDbStatus(ins *apiservice.Instance, healthStatus bool) apimodel.Code { - future := s.bc.AsyncHeartbeat(ins, healthStatus) +func asyncSetInsDbStatus(ins *apiservice.Instance, healthStatus bool) apimodel.Code { + future := server.bc.AsyncHeartbeat(ins, healthStatus) if err := future.Wait(); err != nil { log.Error(err.Error()) } @@ -568,8 +584,8 @@ // The underlying function merges delete requests to increase the throughput of concurrent creation // req is the original request // ins carries the data of req plus the instanceID and serviceToken -func (s *Server) asyncDeleteClient(client *apiservice.Client) apimodel.Code { - future := s.bc.AsyncDeregisterClient(client) +func asyncDeleteClient(client *apiservice.Client) apimodel.Code { + future := server.bc.AsyncDeregisterClient(client) if err := future.Wait(); err != nil { log.Error("[Health Check][Check] async delete client", zap.String("client-id", client.GetId().GetValue()), zap.Error(err)) @@ -580,7 +596,7 @@ // serialSetInsDbStatus sets the instance status in the DB synchronously, in serial // req is the original request body // ins includes the contents of req, with instanceID and serviceToken filled in -func (s *Server) serialSetInsDbStatus(ins *apiservice.Instance, healthStatus bool) apimodel.Code { +func serialSetInsDbStatus(ins *apiservice.Instance, healthStatus bool) apimodel.Code { id := ins.GetId().GetValue() err := server.storage.SetInstanceHealthStatus(id, model.StatusBoolToInt(healthStatus), utils.NewUUID()) if err != nil { @@ -589,120 +605,3 @@ } return apimodel.Code_ExecuteSuccess } - -type leaderChangeEventHandler struct { - cacheProvider *CacheProvider - ctx context.Context - cancel context.CancelFunc - minCheckInterval time.Duration -} - -// newLeaderChangeEventHandler -func newLeaderChangeEventHandler(cacheProvider *CacheProvider, - minCheckInterval time.Duration) *leaderChangeEventHandler { - - return &leaderChangeEventHandler{ - cacheProvider: cacheProvider, - minCheckInterval: minCheckInterval, - } -} - -// checkSelfServiceInstances -func (handler *leaderChangeEventHandler) handle(ctx context.Context, i interface{}) error { - e := i.(store.LeaderChangeEvent) - if e.Key != store.ELECTION_KEY_SELF_SERVICE_CHECKER { - return nil - } - - if e.Leader { - handler.startCheckSelfServiceInstances() - } else { - handler.stopCheckSelfServiceInstances() - } - return nil -} - -// startCheckSelfServiceInstances -func (handler *leaderChangeEventHandler) startCheckSelfServiceInstances() { - if handler.ctx != nil { - log.Warn("[healthcheck] receive unexpected leader state event") - return - } - - ctx, cancel := context.WithCancel(context.Background()) - handler.ctx = ctx - handler.cancel = cancel - go func() { - log.Info("[healthcheck] i am leader, start check health of selfService instances") - ticker := time.NewTicker(handler.minCheckInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C:
handler.cacheProvider.selfServiceInstances.Range(func(instanceId string, value ItemWithChecker) { - handler.doCheckSelfServiceInstance(value.GetInstance()) - }) - case <-ctx.Done(): - log.Info("[healthcheck] stop check health of selfService instances") - return - } - } - }() -} - -// startCheckSelfServiceInstances -func (handler *leaderChangeEventHandler) stopCheckSelfServiceInstances() { - if handler.ctx == nil { - log.Warn("[healthcheck] receive unexpected follower state event") - return - } - handler.cancel() - handler.ctx = nil - handler.cancel = nil -} - -// startCheckSelfServiceInstances -func (handler *leaderChangeEventHandler) doCheckSelfServiceInstance(cachedInstance *model.Instance) { - hcEnable, checker := handler.cacheProvider.isHealthCheckEnable(cachedInstance.Proto) - if !hcEnable { - log.Warnf("[Health Check][Check] selfService instance %s:%d not enable healthcheck", - cachedInstance.Host(), cachedInstance.Port()) - return - } - - request := &plugin.CheckRequest{ - QueryRequest: plugin.QueryRequest{ - InstanceId: cachedInstance.ID(), - Host: cachedInstance.Host(), - Port: cachedInstance.Port(), - Healthy: cachedInstance.Healthy(), - }, - CurTimeSec: currentTimeSec, - ExpireDurationSec: getExpireDurationSec(cachedInstance.Proto), - } - checkResp, err := checker.Check(request) - if err != nil { - log.Errorf("[Health Check][Check]fail to check selfService instance %s:%d, id is %s, err is %v", - cachedInstance.Host(), cachedInstance.Port(), cachedInstance.ID(), err) - return - } - if !checkResp.StayUnchanged { - code := setInsDbStatus(cachedInstance, checkResp.Healthy) - if checkResp.Healthy { - // from unhealthy to healthy - log.Infof( - "[Health Check][Check]selfService instance change from unhealthy to healthy, id is %s, address is %s:%d", - cachedInstance.ID(), cachedInstance.Host(), cachedInstance.Port()) - } else { - // from healthy to unhealthy - log.Infof( - "[Health Check][Check]selfService instance change from healthy to unhealthy, id is %s, address is %s:%d", - cachedInstance.ID(), cachedInstance.Host(), cachedInstance.Port()) - } - if code != apimodel.Code_ExecuteSuccess { - log.Errorf( - "[Health Check][Check]fail to update selfService instance, id is %s, address is %s:%d, code is %d", - cachedInstance.ID(), cachedInstance.Host(), cachedInstance.Port(), code) - } - } -} diff --git a/service/healthcheck/config.go b/service/healthcheck/config.go index bd525ef10..6dc6c59d9 100644 --- a/service/healthcheck/config.go +++ b/service/healthcheck/config.go @@ -25,21 +25,24 @@ import ( // Config is the health check configuration type Config struct { - Open bool `yaml:"open"` - Service string `yaml:"service"` - SlotNum int `yaml:"slotNum"` - LocalHost string `yaml:"localHost"` - MinCheckInterval time.Duration `yaml:"minCheckInterval"` - MaxCheckInterval time.Duration `yaml:"maxCheckInterval"` - ClientReportInterval time.Duration `yaml:"clientReportInterval"` - Checkers []plugin.ConfigEntry `yaml:"checkers"` - Batch map[string]interface{} `yaml:"batch"` + Open bool `yaml:"open"` + Service string `yaml:"service"` + SlotNum int `yaml:"slotNum"` + LocalHost string `yaml:"localHost"` + MinCheckInterval time.Duration `yaml:"minCheckInterval"` + MaxCheckInterval time.Duration `yaml:"maxCheckInterval"` + ClientCheckInterval time.Duration `yaml:"clientCheckInterval"` + ClientCheckTtl time.Duration `yaml:"clientCheckTtl"` + Checkers []plugin.ConfigEntry `yaml:"checkers"` + Batch map[string]interface{} `yaml:"batch"` } const ( - minCheckInterval = 1 * time.Second - maxCheckInterval = 30 * time.Second -
defaultClientReportInterval = 120 * time.Second + defaultMinCheckInterval = 1 * time.Second + defaultMaxCheckInterval = 30 * time.Second + defaultSlotNum = 30 + defaultClientReportTtl = 120 * time.Second + defaultClientCheckInterval = 120 * time.Second ) // SetDefault fills in default values @@ -48,19 +51,22 @@ func (c *Config) SetDefault() { c.Service = "polaris.checker" } if c.SlotNum == 0 { - c.SlotNum = 30 + c.SlotNum = defaultSlotNum } if c.MinCheckInterval == 0 { - c.MinCheckInterval = minCheckInterval + c.MinCheckInterval = defaultMinCheckInterval } if c.MaxCheckInterval == 0 { - c.MaxCheckInterval = maxCheckInterval + c.MaxCheckInterval = defaultMaxCheckInterval } if c.MinCheckInterval > c.MaxCheckInterval { - c.MinCheckInterval = minCheckInterval - c.MaxCheckInterval = maxCheckInterval + c.MinCheckInterval = defaultMinCheckInterval + c.MaxCheckInterval = defaultMaxCheckInterval } - if c.ClientReportInterval == 0 { - c.ClientReportInterval = defaultClientReportInterval + if c.ClientCheckInterval == 0 { + c.ClientCheckInterval = defaultClientCheckInterval + } + if c.ClientCheckTtl == 0 { + c.ClientCheckTtl = defaultClientReportTtl } } diff --git a/service/healthcheck/leader.go b/service/healthcheck/leader.go new file mode 100644 index 000000000..fa71d843d --- /dev/null +++ b/service/healthcheck/leader.go @@ -0,0 +1,147 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License.
+ */ + +package healthcheck + +import ( + "context" + "time" + + apimodel "github.com/polarismesh/specification/source/go/api/v1/model" + + "github.com/polarismesh/polaris/common/model" + "github.com/polarismesh/polaris/plugin" + "github.com/polarismesh/polaris/store" +) + +// LeaderChangeEventHandler processes the event when the server acts as leader +type LeaderChangeEventHandler struct { + cacheProvider *CacheProvider + ctx context.Context + cancel context.CancelFunc + minCheckInterval time.Duration +} + +// newLeaderChangeEventHandler +func newLeaderChangeEventHandler(cacheProvider *CacheProvider, + minCheckInterval time.Duration) *LeaderChangeEventHandler { + + return &LeaderChangeEventHandler{ + cacheProvider: cacheProvider, + minCheckInterval: minCheckInterval, + } +} + +// handle starts or stops checking selfService instances when leadership changes +func (handler *LeaderChangeEventHandler) handle(ctx context.Context, i interface{}) error { + e := i.(store.LeaderChangeEvent) + if e.Key != store.ELECTION_KEY_SELF_SERVICE_CHECKER { + return nil + } + + if e.Leader { + handler.startCheckSelfServiceInstances() + } else { + handler.stopCheckSelfServiceInstances() + } + return nil +} + +// startCheckSelfServiceInstances +func (handler *LeaderChangeEventHandler) startCheckSelfServiceInstances() { + if handler.ctx != nil { + log.Warn("[healthcheck] receive unexpected leader state event") + return + } + + ctx, cancel := context.WithCancel(context.Background()) + handler.ctx = ctx + handler.cancel = cancel + go func() { + log.Info("[healthcheck] i am leader, start check health of selfService instances") + ticker := time.NewTicker(handler.minCheckInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + handler.cacheProvider.selfServiceInstances.Range(func(instanceId string, value ItemWithChecker) { + handler.doCheckSelfServiceInstance(value.GetInstance()) + }) + case <-ctx.Done(): + log.Info("[healthcheck] stop check health of selfService instances") + return + } + } + }() +} + +// stopCheckSelfServiceInstances +func (handler *LeaderChangeEventHandler) stopCheckSelfServiceInstances() { + if handler.ctx == nil { + log.Warn("[healthcheck] receive unexpected follower state event") + return + } + handler.cancel() + handler.ctx = nil + handler.cancel = nil +} + +// doCheckSelfServiceInstance +func (handler *LeaderChangeEventHandler) doCheckSelfServiceInstance(cachedInstance *model.Instance) { + hcEnable, checker := handler.cacheProvider.isHealthCheckEnable(cachedInstance.Proto) + if !hcEnable { + log.Warnf("[Health Check][Check] selfService instance %s:%d not enable healthcheck", + cachedInstance.Host(), cachedInstance.Port()) + return + } + + request := &plugin.CheckRequest{ + QueryRequest: plugin.QueryRequest{ + InstanceId: cachedInstance.ID(), + Host: cachedInstance.Host(), + Port: cachedInstance.Port(), + Healthy: cachedInstance.Healthy(), + }, + CurTimeSec: currentTimeSec, + ExpireDurationSec: getExpireDurationSec(cachedInstance.Proto), + } + checkResp, err := checker.Check(request) + if err != nil { + log.Errorf("[Health Check][Check]fail to check selfService instance %s:%d, id is %s, err is %v", + cachedInstance.Host(), cachedInstance.Port(), cachedInstance.ID(), err) + return + } + if !checkResp.StayUnchanged { + code := setInsDbStatus(cachedInstance, checkResp.Healthy) + if checkResp.Healthy { + // from unhealthy to healthy + log.Infof( + "[Health Check][Check]selfService instance change from unhealthy to healthy, id is %s, address is %s:%d", + cachedInstance.ID(), cachedInstance.Host(), cachedInstance.Port()) + } else { + // 
from healthy to unhealthy + log.Infof( + "[Health Check][Check]selfService instance change from healthy to unhealthy, id is %s, address is %s:%d", + cachedInstance.ID(), cachedInstance.Host(), cachedInstance.Port()) + } + if code != apimodel.Code_ExecuteSuccess { + log.Errorf( + "[Health Check][Check]fail to update selfService instance, id is %s, address is %s:%d, code is %d", + cachedInstance.ID(), cachedInstance.Host(), cachedInstance.Port(), code) + } + } +} diff --git a/service/healthcheck/server.go b/service/healthcheck/server.go index 745a68e4e..54b7b5186 100644 --- a/service/healthcheck/server.go +++ b/service/healthcheck/server.go @@ -114,7 +114,8 @@ func initialize(ctx context.Context, hcOpt *Config, cacheOpen bool, bc *batch.Co server.cacheProvider = newCacheProvider(hcOpt.Service, server) server.timeAdjuster = newTimeAdjuster(ctx, server.storage) - server.checkScheduler = newCheckScheduler(ctx, hcOpt.SlotNum, hcOpt.MinCheckInterval, hcOpt.MaxCheckInterval) + server.checkScheduler = newCheckScheduler(ctx, hcOpt.SlotNum, hcOpt.MinCheckInterval, + hcOpt.MaxCheckInterval, hcOpt.ClientCheckInterval, hcOpt.ClientCheckTtl) server.dispatcher = newDispatcher(ctx, server) server.discoverCh = make(chan eventWrapper, 32) @@ -151,6 +152,11 @@ func GetServer() (*Server, error) { return server, nil } +// SetServer for test only +func SetServer(srv *Server) { + server = srv +} + // SetServiceCache sets the service cache func (s *Server) SetServiceCache(serviceCache cache.ServiceCache) { s.serviceCache = serviceCache diff --git a/service/healthcheck/test_export.go b/service/healthcheck/test_export.go index 4c7f9a277..04806e1a4 100644 --- a/service/healthcheck/test_export.go +++ b/service/healthcheck/test_export.go @@ -65,7 +65,8 @@ func TestInitialize(ctx context.Context, hcOpt *Config, cacheOpen bool, bc *batc testServer.cacheProvider = newCacheProvider(hcOpt.Service, testServer) testServer.timeAdjuster = newTimeAdjuster(ctx, storage) - testServer.checkScheduler = newCheckScheduler(ctx, hcOpt.SlotNum, hcOpt.MinCheckInterval, hcOpt.MaxCheckInterval) + testServer.checkScheduler = newCheckScheduler(ctx, hcOpt.SlotNum, hcOpt.MinCheckInterval, + hcOpt.MaxCheckInterval, hcOpt.ClientCheckInterval, hcOpt.ClientCheckTtl) testServer.dispatcher = newDispatcher(ctx, testServer) testServer.discoverCh = make(chan eventWrapper, 32) diff --git a/service/healthcheck/time_adjust.go b/service/healthcheck/time_adjust.go index 178fa2540..a64c01a18 100644 --- a/service/healthcheck/time_adjust.go +++ b/service/healthcheck/time_adjust.go @@ -73,5 +73,8 @@ func (t *TimeAdjuster) calcDiff() { // GetDiff get diff time between store and current PC func (t *TimeAdjuster) GetDiff() int64 { + if nil == t { + return 0 + } return atomic.LoadInt64(&t.diff) } diff --git a/service/main_test.go b/service/main_test.go index 828469746..366e28303 100644 --- a/service/main_test.go +++ b/service/main_test.go @@ -242,6 +242,7 @@ func (d *DiscoverTestSuit) initialize(opts ...options) error { if err != nil { panic(err) } + healthcheck.SetServer(healthCheckServer) cacheProvider, err := healthCheckServer.CacheProvider() if err != nil { panic(err) } diff --git a/store/boltdb/client.go b/store/boltdb/client.go index 23aa38230..2234cb94a 100644 --- a/store/boltdb/client.go +++ b/store/boltdb/client.go @@ -141,15 +141,15 @@ func convertToClientObject(client *model.Client) (*clientObject, error) { } tn := time.Now() return &clientObject{ - Host: client.Proto().Host.Value, - Type: client.Proto().Type.String(), - Version: client.Proto().Version.Value, + Host:
client.Proto().GetHost().GetValue(), + Type: client.Proto().GetType().String(), + Version: client.Proto().GetVersion().GetValue(), Location: map[string]string{ "region": client.Proto().GetLocation().GetRegion().GetValue(), "zone": client.Proto().GetLocation().GetZone().GetValue(), "campus": client.Proto().GetLocation().GetCampus().GetValue(), }, - Id: client.Proto().Id.Value, + Id: client.Proto().GetId().GetValue(), Ctime: tn, Mtime: tn, StatArrStr: string(data), diff --git a/testdata/service_test.yaml b/testdata/service_test.yaml index d9abec26f..727d82804 100644 --- a/testdata/service_test.yaml +++ b/testdata/service_test.yaml @@ -185,11 +185,14 @@ bootstrap: # - stdout # errorOutputPaths: # - stderr -# - name: l5pbserver -# option: -# listenIP: 0.0.0.0 -# listenPort: 7779 -# clusterName: cl5.discover + polaris_service: + # probe_address: ##DB_ADDR## + enable_register: true + isolated: false + services: + - name: polaris.checker + protocols: + - service-grpc # Core logic configuration auth: # Auth plugins @@ -225,7 +228,7 @@ naming: clientRegister: open: true queueSize: 10240 - waitTime: 32s + waitTime: 32ms maxBatchCount: 1024 concurrency: 64 clientDeregister: @@ -248,6 +251,8 @@ healthcheck: slotNum: 30 minCheckInterval: 1s maxCheckInterval: 30s + clientCheckInterval: 1s + clientCheckTtl: 2s batch: heartbeat: open: true diff --git a/testdata/service_test_sqldb.yaml b/testdata/service_test_sqldb.yaml index bcdb496b3..9f7afdc1f 100644 --- a/testdata/service_test_sqldb.yaml +++ b/testdata/service_test_sqldb.yaml @@ -225,7 +225,7 @@ naming: clientRegister: open: true queueSize: 10240 - waitTime: 32s + waitTime: 32ms maxBatchCount: 1024 concurrency: 64 clientDeregister: @@ -245,6 +245,8 @@ healthcheck: slotNum: 30 minCheckInterval: 1s maxCheckInterval: 30s + clientCheckInterval: 1s + clientCheckTtl: 2s batch: heartbeat: open: true
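The two testdata files above deliberately use aggressive timings (clientCheckInterval: 1s, clientCheckTtl: 2s) so that TestClientCheck can observe a client that stops reporting being evicted within a couple of seconds. For an actual deployment, a minimal sketch of the corresponding healthcheck section in polaris-server.yaml follows; the clientCheckInterval and clientCheckTtl fields are the ones introduced by this change in service/healthcheck/config.go and both fall back to 120s in SetDefault when omitted, while the checker name shown is illustrative rather than prescriptive:

```yaml
healthcheck:
  open: true
  service: polaris.checker
  slotNum: 30
  minCheckInterval: 1s
  maxCheckInterval: 30s
  # New in this change: how often the background worker sweeps reported
  # clients, and how long a single client report stays fresh. A client is
  # deregistered once expireTtlCount * clientCheckTtl elapses with no report.
  clientCheckInterval: 120s
  clientCheckTtl: 120s
  checkers:
    - name: heartbeatMemory # illustrative; use the checker plugin your deployment configures
```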