diff --git a/apiserver/grpcserver/base_test.go b/apiserver/grpcserver/base_test.go index cb4446ccd..d73d0b1a7 100644 --- a/apiserver/grpcserver/base_test.go +++ b/apiserver/grpcserver/base_test.go @@ -24,8 +24,6 @@ import ( "google.golang.org/grpc/metadata" - "github.com/polarismesh/polaris/apiserver" - grpchelp "github.com/polarismesh/polaris/apiserver/grpcserver/utils" "github.com/polarismesh/polaris/common/utils" ) @@ -122,58 +120,3 @@ func TestConvertContext(t *testing.T) { }) } } - -func TestGetClientOpenMethod(t *testing.T) { - type args struct { - include []string - protocol string - } - tests := []struct { - name string - args args - want map[string]bool - wantErr bool - }{ - { - name: "case=1", - args: args{ - include: []string{ - apiserver.RegisterAccess, - }, - protocol: "grpc", - }, - want: map[string]bool{ - "/v1.PolarisGRPC/RegisterInstance": true, - "/v1.PolarisGRPC/DeregisterInstance": true, - }, - wantErr: false, - }, - { - name: "case=2", - args: args{ - include: []string{ - apiserver.DiscoverAccess, - }, - protocol: "grpc", - }, - want: map[string]bool{ - "/v1.PolarisGRPC/Discover": true, - "/v1.PolarisGRPC/ReportClient": true, - "/v1.PolarisServiceContractGRPC/ReportServiceContract": true, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := grpchelp.GetClientOpenMethod(tt.args.include, tt.args.protocol) - if (err != nil) != tt.wantErr { - t.Errorf("GetClientOpenMethod() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("GetClientOpenMethod() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/apiserver/grpcserver/config/client_access.go b/apiserver/grpcserver/config/client_access.go index 86922c882..414f7eca7 100644 --- a/apiserver/grpcserver/config/client_access.go +++ b/apiserver/grpcserver/config/client_access.go @@ -88,6 +88,21 @@ func (g *ConfigGRPCServer) PublishConfigFile(ctx context.Context, return response, nil } +func (g *ConfigGRPCServer) UpsertAndPublishConfigFile(ctx context.Context, + req *apiconfig.ConfigFilePublishInfo) (*apiconfig.ConfigClientResponse, error) { + ctx = utils.ConvertGRPCContext(ctx) + response := g.configServer.CasUpsertAndReleaseConfigFileFromClient(ctx, req) + return &apiconfig.ConfigClientResponse{ + Code: response.Code, + Info: response.Info, + ConfigFile: &apiconfig.ClientConfigFileInfo{ + Namespace: req.Namespace, + Group: req.Group, + FileName: req.FileName, + }, + }, nil +} + // WatchConfigFiles 订阅配置变更 func (g *ConfigGRPCServer) WatchConfigFiles(ctx context.Context, request *apiconfig.ClientWatchConfigFileRequest) (*apiconfig.ConfigClientResponse, error) { diff --git a/apiserver/grpcserver/config/server.go b/apiserver/grpcserver/config/server.go index b64266618..f6b169020 100644 --- a/apiserver/grpcserver/config/server.go +++ b/apiserver/grpcserver/config/server.go @@ -128,8 +128,16 @@ func (g *ConfigGRPCServer) allowAccess(method string) bool { // GetClientOpenMethod . func GetClientOpenMethod(protocol string) (map[string]bool, error) { - openMethods := []string{"GetConfigFile", "CreateConfigFile", - "UpdateConfigFile", "PublishConfigFile", "WatchConfigFiles", "GetConfigFileMetadataList", "Discover"} + openMethods := []string{ + "GetConfigFile", + "CreateConfigFile", + "UpdateConfigFile", + "PublishConfigFile", + "WatchConfigFiles", + "GetConfigFileMetadataList", + "UpsertAndPublishConfigFile", + "Discover", + } openMethod := make(map[string]bool) diff --git a/apiserver/grpcserver/discover/v1/client_access.go b/apiserver/grpcserver/discover/v1/client_access.go index 30b967c73..89a3d5d19 100644 --- a/apiserver/grpcserver/discover/v1/client_access.go +++ b/apiserver/grpcserver/discover/v1/client_access.go @@ -163,9 +163,6 @@ func (g *DiscoverServer) Discover(server apiservice.PolarisGRPC_DiscoverServer) case apiservice.DiscoverRequest_FAULT_DETECTOR: action = metrics.ActionDiscoverFaultDetect out = g.namingServer.GetFaultDetectWithCache(ctx, in.Service) - case apiservice.DiscoverRequest_SERVICE_CONTRACT: - action = metrics.ActionDiscoverServiceContract - out = g.namingServer.GetServiceContractWithCache(ctx, in.ServiceContract) default: out = api.NewDiscoverRoutingResponse(apimodel.Code_InvalidDiscoverResource, in.Service) } @@ -186,6 +183,16 @@ func (g *DiscoverServer) ReportServiceContract(ctx context.Context, in *apiservi return out, nil } +// 查询服务契约 +func (g *DiscoverServer) GetServiceContract(ctx context.Context, req *apiservice.ServiceContract) (*apiservice.Response, error) { + // 需要记录操作来源,提高效率,只针对特殊接口添加operator + rCtx := utils.ConvertGRPCContext(ctx) + rCtx = context.WithValue(rCtx, utils.StringContext("operator"), ParseGrpcOperator(ctx)) + + out := g.namingServer.GetServiceContractWithCache(rCtx, req) + return out, nil +} + // ParseGrpcOperator 构造请求源 func ParseGrpcOperator(ctx context.Context) string { // 获取请求源 diff --git a/apiserver/grpcserver/utils/help.go b/apiserver/grpcserver/utils/help.go index 28245434e..1d9a70a00 100644 --- a/apiserver/grpcserver/utils/help.go +++ b/apiserver/grpcserver/utils/help.go @@ -30,7 +30,7 @@ import ( // GetClientOpenMethod 获取客户端openMethod func GetClientOpenMethod(include []string, protocol string) (map[string]bool, error) { clientAccess := make(map[string][]string) - clientAccess[apiserver.DiscoverAccess] = []string{"Discover", "ReportClient", "ReportServiceContract"} + clientAccess[apiserver.DiscoverAccess] = []string{"Discover", "ReportClient", "ReportServiceContract", "GetServiceContract"} clientAccess[apiserver.RegisterAccess] = []string{"RegisterInstance", "DeregisterInstance"} clientAccess[apiserver.HealthcheckAccess] = []string{"Heartbeat", "BatchHeartbeat", "BatchGetHeartbeat", "BatchDelHeartbeat"} @@ -49,7 +49,7 @@ func GetClientOpenMethod(include []string, protocol string) (map[string]bool, er if item == apiserver.HealthcheckAccess && method != "Heartbeat" { recordMethod = "/v1.PolarisHeartbeat" + strings.ToUpper(protocol) + "/" + method } - if method == "ReportServiceContract" { + if method == "ReportServiceContract" || method == "GetServiceContract" { recordMethod = "/v1.PolarisServiceContract" + strings.ToUpper(protocol) + "/" + method } openMethod[recordMethod] = true diff --git a/apiserver/grpcserver/utils/help_test.go b/apiserver/grpcserver/utils/help_test.go index e9c42abb8..f499c11f1 100644 --- a/apiserver/grpcserver/utils/help_test.go +++ b/apiserver/grpcserver/utils/help_test.go @@ -61,6 +61,7 @@ func TestGetClientOpenMethod(t *testing.T) { "/v1.PolarisGRPC/Discover": true, "/v1.PolarisGRPC/ReportClient": true, "/v1.PolarisServiceContractGRPC/ReportServiceContract": true, + "/v1.PolarisServiceContractGRPC/GetServiceContract": true, }, wantErr: false, }, diff --git a/apiserver/nacosserver/core/storage.go b/apiserver/nacosserver/core/storage.go index 3347a23df..969b58ac9 100644 --- a/apiserver/nacosserver/core/storage.go +++ b/apiserver/nacosserver/core/storage.go @@ -282,6 +282,9 @@ type ServiceData struct { } func (s *ServiceData) loadInstances(svcIns *model.ServiceInstances) { + if svcIns == nil { + return + } var ( finalInstances = map[string]*nacosmodel.Instance{} ) diff --git a/apiserver/xdsserverv3/cds.go b/apiserver/xdsserverv3/cds.go index 91d774504..a4b9f0718 100644 --- a/apiserver/xdsserverv3/cds.go +++ b/apiserver/xdsserverv3/cds.go @@ -148,8 +148,7 @@ func (cds *CDSBuilder) makeCluster(svcInfo *resource.ServiceInfo, trafficDirection corev3.TrafficDirection, opt *resource.BuildOption) *cluster.Cluster { name := resource.MakeServiceName(svcInfo.ServiceKey, trafficDirection, opt) - - return &cluster.Cluster{ + c := &cluster.Cluster{ Name: name, ConnectTimeout: durationpb.New(5 * time.Second), ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS}, @@ -161,8 +160,12 @@ func (cds *CDSBuilder) makeCluster(svcInfo *resource.ServiceInfo, }, }, }, - LbSubsetConfig: resource.MakeLbSubsetConfig(svcInfo), - OutlierDetection: resource.MakeOutlierDetection(svcInfo), - HealthChecks: resource.MakeHealthCheck(svcInfo), } + // 只有针对出流量场景才能设置 Cluster 的相关信息 + if opt.TrafficDirection == corev3.TrafficDirection_OUTBOUND { + c.LbSubsetConfig = resource.MakeLbSubsetConfig(svcInfo) + c.OutlierDetection = resource.MakeOutlierDetection(svcInfo) + c.HealthChecks = resource.MakeHealthCheck(svcInfo) + } + return c } diff --git a/apiserver/xdsserverv3/rds.go b/apiserver/xdsserverv3/rds.go index 67649a20c..69cffb3ff 100644 --- a/apiserver/xdsserverv3/rds.go +++ b/apiserver/xdsserverv3/rds.go @@ -156,15 +156,8 @@ func (rds *RDSBuilder) makeSidecarInBoundRoutes(selfService model.ServiceKey, }, Action: &route.Route_Route{ Route: &route.RouteAction{ - ClusterSpecifier: &route.RouteAction_WeightedClusters{ - WeightedClusters: &route.WeightedCluster{ - Clusters: []*route.WeightedCluster_ClusterWeight{ - { - Name: resource.MakeServiceName(selfService, trafficDirection, opt), - Weight: wrapperspb.UInt32(100), - }, - }, - }, + ClusterSpecifier: &route.RouteAction_Cluster{ + Cluster: resource.MakeServiceName(selfService, trafficDirection, opt), }, }, }, diff --git a/apiserver/xdsserverv3/resource/help.go b/apiserver/xdsserverv3/resource/help.go index 699ab05fb..9d78bc498 100644 --- a/apiserver/xdsserverv3/resource/help.go +++ b/apiserver/xdsserverv3/resource/help.go @@ -234,7 +234,6 @@ func BuildWeightClustersV2(trafficDirection corev3.TrafficDirection, weightedClusters = append(weightedClusters, weightCluster) totalWeight += destination.Weight } - return &route.WeightedCluster{ TotalWeight: &wrappers.UInt32Value{Value: totalWeight}, Clusters: weightedClusters, @@ -1045,10 +1044,7 @@ func MakeLbSubsetConfig(serviceInfo *ServiceInfo) *cluster.Cluster_LbSubsetConfi return nil } - lbSubsetConfig := &cluster.Cluster_LbSubsetConfig{} var subsetSelectors []*cluster.Cluster_LbSubsetConfig_LbSubsetSelector - lbSubsetConfig.FallbackPolicy = cluster.Cluster_LbSubsetConfig_ANY_ENDPOINT - for _, rule := range rules { // 对每一个 destination 产生一个 subset for _, destination := range rule.GetDestinations() { @@ -1058,13 +1054,15 @@ func MakeLbSubsetConfig(serviceInfo *ServiceInfo) *cluster.Cluster_LbSubsetConfi } subsetSelectors = append(subsetSelectors, &cluster.Cluster_LbSubsetConfig_LbSubsetSelector{ Keys: keys, - FallbackPolicy: cluster.Cluster_LbSubsetConfig_LbSubsetSelector_NO_FALLBACK, + FallbackPolicy: cluster.Cluster_LbSubsetConfig_LbSubsetSelector_ANY_ENDPOINT, }) } } - lbSubsetConfig.SubsetSelectors = subsetSelectors - return lbSubsetConfig + return &cluster.Cluster_LbSubsetConfig{ + SubsetSelectors: subsetSelectors, + FallbackPolicy: cluster.Cluster_LbSubsetConfig_ANY_ENDPOINT, + } } func GenEndpointMetaFromPolarisIns(ins *apiservice.Instance) *core.Metadata { @@ -1082,7 +1080,7 @@ func GenEndpointMetaFromPolarisIns(ins *apiservice.Instance) *core.Metadata { meta.FilterMetadata["envoy.lb"] = &_struct.Struct{ Fields: fields, } - if ins.Metadata != nil && ins.Metadata[TLSModeTag] != "" { + if ins.Metadata != nil && EnableTLS(TLSMode(ins.Metadata[TLSModeTag])) { meta.FilterMetadata["envoy.transport_socket_match"] = MTLSTransportSocketMatch } return meta diff --git a/apiserver/xdsserverv3/server.go b/apiserver/xdsserverv3/server.go index e4b13167e..b647ca3e4 100644 --- a/apiserver/xdsserverv3/server.go +++ b/apiserver/xdsserverv3/server.go @@ -242,11 +242,8 @@ func (x *XDSServer) activeUpdateTask() { func (x *XDSServer) startSynTask(ctx context.Context) { // 读取 polaris 缓存数据 synXdsConfFunc := func() { - - registryInfo := make(map[string]map[model.ServiceKey]*resource.ServiceInfo) - - err := x.getRegistryInfoWithCache(ctx, registryInfo) - if err != nil { + curRegistryInfo := make(map[string]map[model.ServiceKey]*resource.ServiceInfo) + if err := x.getRegistryInfoWithCache(ctx, curRegistryInfo); err != nil { log.Error("get registry info from cache", zap.Error(err)) return } @@ -254,24 +251,19 @@ func (x *XDSServer) startSynTask(ctx context.Context) { needPush := make(map[string]map[model.ServiceKey]*resource.ServiceInfo) needRemove := make(map[string]map[model.ServiceKey]*resource.ServiceInfo) - // 处理删除 ns 中最后一个 service - for ns, infos := range x.registryInfo { - _, ok := registryInfo[ns] - if !ok && len(infos) > 0 { - // 这一次轮询时,该命名空间下的最后一个服务已经被删除了,此时,当前的命名空间需要处理 - needPush[ns] = map[model.ServiceKey]*resource.ServiceInfo{} - x.registryInfo[ns] = map[model.ServiceKey]*resource.ServiceInfo{} - } - } + // 与本地缓存对比,是否发生了变化,对发生变化的命名空间,推送配置 + // step 1: 这里先生成需要删除 XDS 资源数据的资源信息 for ns, infos := range x.registryInfo { - if _, exist := registryInfo[ns]; !exist { + // 如果当前整个命名空间都不存在,直接按照整个 namespace 级别进行数据删除 + if _, exist := curRegistryInfo[ns]; !exist { needRemove[ns] = infos continue } + // 命名空间存在,但是命名空间下的服务有删除情况,需要找出来 for _, info := range infos { - cacheServiceInfos := registryInfo[ns] + cacheServiceInfos := curRegistryInfo[ns] if _, ok := cacheServiceInfos[info.ServiceKey]; !ok { if _, ok := needRemove[ns]; !ok { needRemove[ns] = make(map[model.ServiceKey]*resource.ServiceInfo) @@ -282,21 +274,17 @@ func (x *XDSServer) startSynTask(ctx context.Context) { } } - // 与本地缓存对比,是否发生了变化,对发生变化的命名空间,推送配置 - for ns, infos := range registryInfo { + for ns, infos := range curRegistryInfo { cacheServiceInfos, ok := x.registryInfo[ns] if !ok { // 新命名空间,需要处理 needPush[ns] = infos - x.registryInfo[ns] = infos continue } - // todo 不考虑命名空间删除的情况 // 判断当前这个空间,是否需要更新配置 if x.checkUpdate(infos, cacheServiceInfos) { needPush[ns] = infos - x.registryInfo[ns] = infos } } @@ -305,6 +293,7 @@ func (x *XDSServer) startSynTask(ctx context.Context) { zap.Int("need-remove", len(needRemove))) x.Generate(needPush, needRemove) } + x.registryInfo = curRegistryInfo } ticker := time.NewTicker(5 * cache.UpdateCacheInterval) @@ -357,7 +346,7 @@ func (x *XDSServer) getRegistryInfoWithCache(ctx context.Context, } if err := x.namingServer.Cache().Service().IteratorServices(serviceIterProc); err != nil { - log.Errorf("syn polaris services error %v", err) + log.Errorf("sync polaris services error %v", err) return err } diff --git a/auth/defaultauth/utils.go b/auth/defaultauth/utils.go index 09aeb0668..f8ddcfde8 100644 --- a/auth/defaultauth/utils.go +++ b/auth/defaultauth/utils.go @@ -108,40 +108,6 @@ func checkOwner(owner *wrappers.StringValue) error { return nil } -// checkMobile 检查用户的 mobile 信息 -func checkMobile(mobile *wrappers.StringValue) error { - if mobile == nil { - return nil - } - - if mobile.GetValue() == "" { - return nil - } - - if utf8.RuneCountInString(mobile.GetValue()) != 11 { - return errors.New("invalid mobile") - } - - return nil -} - -// checkEmail 检查用户的 email 信息 -func checkEmail(email *wrappers.StringValue) error { - if email == nil { - return nil - } - - if email.GetValue() == "" { - return nil - } - - if ok := regEmail.MatchString(email.GetValue()); !ok { - return errors.New("invalid email") - } - - return nil -} - // verifyAuth 用于 user、group 以及 strategy 模块的鉴权工作检查 func verifyAuth(ctx context.Context, isWrite bool, needOwner bool, authMgn *DefaultAuthChecker) (context.Context, *apiservice.Response) { diff --git a/cache/service/instance.go b/cache/service/instance.go index 913c8f34b..491b833b3 100644 --- a/cache/service/instance.go +++ b/cache/service/instance.go @@ -616,7 +616,6 @@ func (b *instancePorts) listPort(serviceID string) []*model.ServicePort { ret := make([]*model.ServicePort, 0, 4) val, ok := b.ports[serviceID] - if !ok { return ret } diff --git a/common/eventhub/types.go b/common/eventhub/types.go index 2f215ec8b..53cf6ac80 100644 --- a/common/eventhub/types.go +++ b/common/eventhub/types.go @@ -33,6 +33,8 @@ const ( CacheClientEventTopic = "cache_client_event" // CacheNamespaceEventTopic record cache occur namespace add/update/del event CacheNamespaceEventTopic = "cache_namespace_event" + // ClientEventTopic . + ClientEventTopic = "client_event" ) // PublishConfigFileEvent 事件对象,包含类型和事件消息 diff --git a/common/model/naming.go b/common/model/naming.go index 9f5ae4a40..f4a234c7d 100644 --- a/common/model/naming.go +++ b/common/model/naming.go @@ -859,6 +859,8 @@ const ( EventInstanceSendHeartbeat InstanceEventType = "InstanceSendHeartbeat" // EventInstanceUpdate Instance metadata and info update event EventInstanceUpdate InstanceEventType = "InstanceUpdate" + // EventClientOffline . + EventClientOffline InstanceEventType = "ClientOffline" ) // CtxEventKeyMetadata 用于将metadata从Context中传入并取出 @@ -893,3 +895,8 @@ func (i *InstanceEvent) String() string { return fmt.Sprintf("InstanceEvent(id=%s, namespace=%s, svcId=%s, service=%s, type=%v, instance=%s, healthy=%v)", i.Id, i.Namespace, i.SvcId, i.Service, i.EType, hostPortStr, i.Instance.GetHealthy().GetValue()) } + +type ClientEvent struct { + EType InstanceEventType + Id string +} diff --git a/common/redispool/pool.go b/common/redispool/pool.go index 83c3f55da..c4e46128e 100644 --- a/common/redispool/pool.go +++ b/common/redispool/pool.go @@ -20,6 +20,7 @@ package redispool // Resp ckv任务结果 type Resp struct { Value string + Values []interface{} Err error Exists bool Compatible bool @@ -37,22 +38,18 @@ type RedisObject interface { type Pool interface { // Start 启动ckv连接池工作 Start() - // Sdd 使用连接池,向redis发起Sdd请求 Sdd(id string, members []string) *Resp - // Srem 使用连接池,向redis发起Srem请求 Srem(id string, members []string) *Resp - // Get 使用连接池,向redis发起Get请求 Get(id string) *Resp - + // MGet 使用连接池,向redis发起MGet请求 + MGet(keys []string) *Resp // Set 使用连接池,向redis发起Set请求 Set(id string, redisObj RedisObject) *Resp - // Del 使用连接池,向redis发起Del请求 Del(id string) *Resp - // RecoverTimeSec the time second record when recover RecoverTimeSec() int64 } diff --git a/common/redispool/redis_pool.go b/common/redispool/redis_pool.go index bbb99e161..69c6689ca 100644 --- a/common/redispool/redis_pool.go +++ b/common/redispool/redis_pool.go @@ -43,6 +43,8 @@ const ( Sadd // Srem del method define Srem + // MGet multi get keys method define + MGet ) var ( @@ -52,6 +54,7 @@ var ( Del: "DEL", Sadd: "SADD", Srem: "SREM", + MGet: "MGET", } ) @@ -67,10 +70,23 @@ func toRedisKey(instanceID string, compatible bool) string { return fmt.Sprintf("%s%s", keyPrefix, instanceID) } +func toRedisKeys(instanceID []string, compatible bool) []string { + ret := make([]string, 0, len(instanceID)) + for i := range instanceID { + if compatible { + ret = append(ret, instanceID[i]) + } else { + ret = append(ret, fmt.Sprintf("%s%s", keyPrefix, instanceID[i])) + } + } + return ret +} + // Task ckv任务请求结构体 type Task struct { taskType int id string + ids []string value string members []string respChan chan *Resp @@ -152,6 +168,18 @@ func (p *redisPool) Get(id string) *Resp { return p.handleTask(task) } +// MGet 使用连接池,向redis发起 MGet 请求 +func (p *redisPool) MGet(keys []string) *Resp { + if err := p.checkRedisDead(); err != nil { + return &Resp{Err: err} + } + task := &Task{ + taskType: MGet, + ids: keys, + } + return p.handleTaskWithRetries(task) +} + // Sdd 使用连接池,向redis发起Sdd请求 func (p *redisPool) Sdd(id string, members []string) *Resp { if err := p.checkRedisDead(); err != nil { @@ -443,6 +471,8 @@ func (p *redisPool) doHandleTask(task *Task, piper redis.Pipeliner) redis.Cmder return piper.SAdd(context.Background(), task.id, task.members) case Srem: return piper.SRem(context.Background(), task.id, task.members) + case MGet: + return piper.MGet(context.Background(), toRedisKeys(task.ids, p.config.Compatible)...) default: return piper.Get(context.Background(), toRedisKey(task.id, p.config.Compatible)) } diff --git a/common/utils/common.go b/common/utils/common.go index 75b86ccaf..78bcea4d2 100644 --- a/common/utils/common.go +++ b/common/utils/common.go @@ -566,26 +566,6 @@ func CalculateContractID(namespace, service, name, protocol, version string) (st // CheckContractTetrad 根据服务实例四元组计算ID func CheckContractTetrad(req *apiservice.ServiceContract) (string, *apiservice.Response) { - if err := CheckResourceName(NewStringValue(req.GetService())); err != nil { - return "", api.NewResponse(apimodel.Code_InvalidServiceName) - } - - if err := CheckResourceName(NewStringValue(req.GetNamespace())); err != nil { - return "", api.NewResponse(apimodel.Code_InvalidNamespaceName) - } - - if err := CheckResourceName(NewStringValue(req.GetName())); err != nil { - return "", api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract name") - } - - if req.GetProtocol() == "" { - return "", api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract protocol") - } - - if req.GetVersion() == "" { - return "", api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract version") - } - id, err := CalculateContractID( req.GetNamespace(), req.GetService(), @@ -608,9 +588,6 @@ func CheckContractInterfaceTetrad(contractId string, source apiservice.Interface if req.GetId() != "" { return req.GetId(), nil } - if req.GetMethod() == "" { - return "", api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract interface method") - } if req.GetPath() == "" { return "", api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract interface path") } diff --git a/config/client.go b/config/client.go index d54736490..5f4076ddf 100644 --- a/config/client.go +++ b/config/client.go @@ -64,8 +64,8 @@ func (s *Server) GetConfigFileWithCache(ctx context.Context, return api.NewConfigClientResponse(apimodel.Code_NotFoundResource, req) } } - // 客户端版本号大于等于服务端版本号,服务端不返回变更 - if req.GetVersion().GetValue() >= release.Version { + // 客户端版本号大于服务端版本号,服务端不返回变更 + if req.GetVersion().GetValue() > release.Version { return api.NewConfigClientResponse(apimodel.Code_DataNoChange, req) } configFile, err := toClientInfo(req, release) diff --git a/config/config_file_release.go b/config/config_file_release.go index d2d07a5ff..cb79a7348 100644 --- a/config/config_file_release.go +++ b/config/config_file_release.go @@ -546,16 +546,15 @@ func (s *Server) handleRollbackConfigFileRelease(ctx context.Context, tx store.T // CasUpsertAndReleaseConfigFile 根据版本比对决定是否允许进行配置修改发布 func (s *Server) CasUpsertAndReleaseConfigFile(ctx context.Context, req *apiconfig.ConfigFilePublishInfo) *apiconfig.ConfigResponse { - + if err := CheckFileName(req.GetFileName()); err != nil { + return api.NewConfigResponse(apimodel.Code_InvalidConfigFileName) + } if err := utils.CheckResourceName(req.GetNamespace()); err != nil { return api.NewConfigResponseWithInfo(apimodel.Code_BadRequest, "invalid config namespace") } if err := utils.CheckResourceName(req.GetGroup()); err != nil { return api.NewConfigResponseWithInfo(apimodel.Code_BadRequest, "invalid config group") } - if err := CheckFileName(req.GetFileName()); err != nil { - return api.NewConfigResponseWithInfo(apimodel.Code_BadRequest, "invalid config file_name") - } upsertFileReq := &apiconfig.ConfigFile{ Name: req.GetFileName(), @@ -575,7 +574,9 @@ func (s *Server) CasUpsertAndReleaseConfigFile(ctx context.Context, tx, err := s.storage.StartTx() if err != nil { - log.Error("[Config][File] upsert config file when begin tx.", utils.RequestID(ctx), zap.Error(err)) + log.Error("[Config][File] upsert config file when begin tx.", utils.RequestID(ctx), + zap.String("namespace", req.GetNamespace().GetValue()), zap.String("group", req.GetGroup().GetValue()), + zap.String("fileName", req.GetFileName().GetValue()), zap.Error(err)) return api.NewConfigResponse(commonstore.StoreCode2APICode(err)) } @@ -588,7 +589,9 @@ func (s *Server) CasUpsertAndReleaseConfigFile(ctx context.Context, Name: req.GetFileName().GetValue(), }) if err != nil { - log.Error("[Config][File] lock config file when begin tx.", utils.RequestID(ctx), zap.Error(err)) + log.Error("[Config][File] lock config file when begin tx.", utils.RequestID(ctx), + zap.String("namespace", req.GetNamespace().GetValue()), zap.String("group", req.GetGroup().GetValue()), + zap.String("fileName", req.GetFileName().GetValue()), zap.Error(err)) return api.NewConfigResponse(commonstore.StoreCode2APICode(err)) } @@ -596,15 +599,17 @@ func (s *Server) CasUpsertAndReleaseConfigFile(ctx context.Context, var upsertResp *apiconfig.ConfigResponse if saveFile == nil { - if req.GetMd5().GetValue() != "" { - return api.NewConfigResponse(apimodel.Code_DataConflict) - } upsertResp = s.handleCreateConfigFile(ctx, tx, upsertFileReq) historyRecords = append(historyRecords, func() { s.RecordHistory(ctx, configFileRecordEntry(ctx, upsertFileReq, model.OCreate)) }) } else { - if req.GetMd5().GetValue() != CalMd5(saveFile.Content) { + actualMd5 := CalMd5(saveFile.Content) + if req.GetMd5().GetValue() != actualMd5 { + log.Error("[Config][File] cas compare config file.", utils.RequestID(ctx), + zap.String("namespace", req.GetNamespace().GetValue()), zap.String("group", req.GetGroup().GetValue()), + zap.String("fileName", req.GetFileName().GetValue()), + zap.String("expect", req.GetMd5().GetValue()), zap.String("actual", actualMd5)) return api.NewConfigResponse(apimodel.Code_DataConflict) } upsertResp = s.handleUpdateConfigFile(ctx, tx, upsertFileReq) diff --git a/config/config_file_release_test.go b/config/config_file_release_test.go index 90d4d3987..67e9ce356 100644 --- a/config/config_file_release_test.go +++ b/config/config_file_release_test.go @@ -19,6 +19,7 @@ package config_test import ( "testing" + "time" "github.com/polarismesh/specification/source/go/api/v1/config_manage" apimodel "github.com/polarismesh/specification/source/go/api/v1/model" @@ -673,7 +674,7 @@ func TestServer_CasUpsertAndReleaseConfigFile(t *testing.T) { }) t.Run("publish_cas", func(t *testing.T) { - // 发布灰度配置 + // 第一次配置发布,就算带了 MD5,也是可以发布成功 pubResp := testSuit.ConfigServer().CasUpsertAndReleaseConfigFileFromClient(testSuit.DefaultCtx, &config_manage.ConfigFilePublishInfo{ Namespace: utils.NewStringValue(mockNamespace), Group: utils.NewStringValue(mockGroup), @@ -683,19 +684,19 @@ func TestServer_CasUpsertAndReleaseConfigFile(t *testing.T) { Md5: wrapperspb.String(config.CalMd5(mockContent)), }) // 正常发布失败,数据冲突无法处理 - assert.Equal(t, uint32(apimodel.Code_DataConflict), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) - // 正常发布一个配置 - pubResp = testSuit.ConfigServer().UpsertAndReleaseConfigFileFromClient(testSuit.DefaultCtx, &config_manage.ConfigFilePublishInfo{ + // MD5 不一致,直接发布失败 + pubResp = testSuit.ConfigServer().CasUpsertAndReleaseConfigFileFromClient(testSuit.DefaultCtx, &config_manage.ConfigFilePublishInfo{ Namespace: utils.NewStringValue(mockNamespace), Group: utils.NewStringValue(mockGroup), FileName: utils.NewStringValue(mockFileName), ReleaseName: utils.NewStringValue(mockReleaseName), Content: utils.NewStringValue(mockContent), - Md5: wrapperspb.String(config.CalMd5(mockContent)), + Md5: wrapperspb.String(config.CalMd5(time.Now().UTC().GoString())), }) - // 正常发布成功 - assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + // 正常发布失败,数据冲突无法处理 + assert.Equal(t, uint32(apimodel.Code_DataConflict), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) // 获取下当前配置的 Release queryRsp := testSuit.ConfigServer().GetConfigFileRelease(testSuit.DefaultCtx, &config_manage.ConfigFileRelease{ diff --git a/config/utils.go b/config/utils.go index ac82e9592..12a5f6454 100644 --- a/config/utils.go +++ b/config/utils.go @@ -48,11 +48,6 @@ func CheckFileName(name *wrappers.StringValue) error { if name.GetValue() == "" { return errors.New(utils.EmptyErrString) } - - if ok := regFileName.MatchString(name.GetValue()); !ok { - return errors.New("name contains invalid character") - } - return nil } diff --git a/go.mod b/go.mod index bdfc7b8a5..ccdc5ea1b 100644 --- a/go.mod +++ b/go.mod @@ -80,7 +80,7 @@ require ( require ( github.com/DATA-DOG/go-sqlmock v1.5.0 - github.com/polarismesh/specification v1.4.2-alpha.7 + github.com/polarismesh/specification v1.4.2-alpha.9 ) require github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index a0be8da0a..1074118c7 100644 --- a/go.sum +++ b/go.sum @@ -296,8 +296,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/polarismesh/go-restful-openapi/v2 v2.0.0-20220928152401-083908d10219 h1:XnFyNUWnciM6zgXaz6tm+Egs35rhoD0KGMmKh4gCdi0= github.com/polarismesh/go-restful-openapi/v2 v2.0.0-20220928152401-083908d10219/go.mod h1:4WhwBysTom9Eoy0hQ4W69I0FmO+T0EpjEW9/5sgHoUk= -github.com/polarismesh/specification v1.4.2-alpha.7 h1:4BhlGD/xJ/092cuu/T5BgwAMwlPFg8vIMfBNMRyEtak= -github.com/polarismesh/specification v1.4.2-alpha.7/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= +github.com/polarismesh/specification v1.4.2-alpha.9 h1:5mJB0KcFWaNZFUpR0EtB3N8XbixXFCKQ3KegsEfAQ38= +github.com/polarismesh/specification v1.4.2-alpha.9/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= diff --git a/plugin/healthchecker.go b/plugin/healthchecker.go index 218c082b7..c999a755c 100644 --- a/plugin/healthchecker.go +++ b/plugin/healthchecker.go @@ -56,6 +56,11 @@ type QueryRequest struct { Healthy bool } +// BatchQueryRequest batch query heartbeat request +type BatchQueryRequest struct { + Requests []*QueryRequest +} + // QueryResponse query heartbeat response type QueryResponse struct { Server string @@ -64,6 +69,11 @@ type QueryResponse struct { Count int64 } +// BatchQueryResponse batch query heartbeat response +type BatchQueryResponse struct { + Responses []*QueryResponse +} + // AddCheckRequest add check request type AddCheckRequest struct { Instances []string @@ -92,6 +102,8 @@ type HealthChecker interface { Check(request *CheckRequest) (*CheckResponse, error) // Query queries the heartbeat time Query(ctx context.Context, request *QueryRequest) (*QueryResponse, error) + // BatchQuery batch queries the heartbeat time + BatchQuery(ctx context.Context, request *BatchQueryRequest) (*BatchQueryResponse, error) // Suspend health checker for entire expired duration manually Suspend() // SuspendTimeSec get the suspend time in seconds diff --git a/plugin/healthchecker/leader/beat_cache.go b/plugin/healthchecker/leader/beat_cache.go index 5027f9dd3..71f5510cd 100644 --- a/plugin/healthchecker/leader/beat_cache.go +++ b/plugin/healthchecker/leader/beat_cache.go @@ -56,23 +56,25 @@ type ( // HashFunction hash function to caul record id need locate in SegmentMap HashFunction func(string) int // RecordSaver beat record saver - RecordSaver func(req *apiservice.HeartbeatsRequest) + RecordSaver func(req *apiservice.HeartbeatsRequest) error // RecordDelter beat record delter - RecordDelter func(req *apiservice.DelHeartbeatsRequest) + RecordDelter func(req *apiservice.DelHeartbeatsRequest) error // RecordGetter beat record getter - RecordGetter func(req *apiservice.GetHeartbeatsRequest) *apiservice.GetHeartbeatsResponse + RecordGetter func(req *apiservice.GetHeartbeatsRequest) (*apiservice.GetHeartbeatsResponse, error) // BeatRecordCache Heartbeat data cache BeatRecordCache interface { // Get get records - Get(keys ...string) map[string]*ReadBeatRecord + Get(keys ...string) (map[string]*ReadBeatRecord, error) // Put put records - Put(records ...WriteBeatRecord) + Put(records ...WriteBeatRecord) error // Del del records - Del(keys ...string) + Del(keys ...string) error // Clean . Clean() // Snapshot Snapshot() map[string]*ReadBeatRecord + // Ping + Ping() error } ) @@ -98,7 +100,11 @@ type LocalBeatRecordCache struct { beatCache *utils.SegmentMap[string, RecordValue] } -func (lc *LocalBeatRecordCache) Get(keys ...string) map[string]*ReadBeatRecord { +func (lc *LocalBeatRecordCache) Ping() error { + return nil +} + +func (lc *LocalBeatRecordCache) Get(keys ...string) (map[string]*ReadBeatRecord, error) { lc.lock.RLock() defer lc.lock.RUnlock() ret := make(map[string]*ReadBeatRecord, len(keys)) @@ -110,10 +116,10 @@ func (lc *LocalBeatRecordCache) Get(keys ...string) map[string]*ReadBeatRecord { Exist: ok, } } - return ret + return ret, nil } -func (lc *LocalBeatRecordCache) Put(records ...WriteBeatRecord) { +func (lc *LocalBeatRecordCache) Put(records ...WriteBeatRecord) error { lc.lock.RLock() defer lc.lock.RUnlock() for i := range records { @@ -123,9 +129,10 @@ func (lc *LocalBeatRecordCache) Put(records ...WriteBeatRecord) { } lc.beatCache.Put(record.Key, record.Record) } + return nil } -func (lc *LocalBeatRecordCache) Del(keys ...string) { +func (lc *LocalBeatRecordCache) Del(keys ...string) error { lc.lock.RLock() defer lc.lock.RUnlock() for i := range keys { @@ -135,6 +142,7 @@ func (lc *LocalBeatRecordCache) Del(keys ...string) { plog.Debug("delete result", zap.String("key", key), zap.Bool("exist", ok)) } } + return nil } func (lc *LocalBeatRecordCache) Clean() { @@ -159,11 +167,12 @@ func (lc *LocalBeatRecordCache) Snapshot() map[string]*ReadBeatRecord { // newRemoteBeatRecordCache func newRemoteBeatRecordCache(getter RecordGetter, saver RecordSaver, - delter RecordDelter) BeatRecordCache { + delter RecordDelter, ping func() error) BeatRecordCache { return &RemoteBeatRecordCache{ getter: getter, saver: saver, delter: delter, + ping: ping, } } @@ -172,18 +181,26 @@ type RemoteBeatRecordCache struct { saver RecordSaver delter RecordDelter getter RecordGetter + ping func() error } -func (rc *RemoteBeatRecordCache) Get(keys ...string) map[string]*ReadBeatRecord { +func (rc *RemoteBeatRecordCache) Ping() error { + return rc.ping() +} + +func (rc *RemoteBeatRecordCache) Get(keys ...string) (map[string]*ReadBeatRecord, error) { ret := make(map[string]*ReadBeatRecord) for i := range keys { ret[keys[i]] = &ReadBeatRecord{ Exist: false, } } - resp := rc.getter(&apiservice.GetHeartbeatsRequest{ + resp, err := rc.getter(&apiservice.GetHeartbeatsRequest{ InstanceIds: keys, }) + if err != nil { + return nil, err + } records := resp.GetRecords() for i := range records { record := records[i] @@ -197,10 +214,10 @@ func (rc *RemoteBeatRecordCache) Get(keys ...string) map[string]*ReadBeatRecord CurTimeSec: record.GetLastHeartbeatSec(), } } - return ret + return ret, nil } -func (rc *RemoteBeatRecordCache) Put(records ...WriteBeatRecord) { +func (rc *RemoteBeatRecordCache) Put(records ...WriteBeatRecord) error { req := &apiservice.HeartbeatsRequest{ Heartbeats: make([]*apiservice.InstanceHeartbeat, 0, len(records)), } @@ -210,14 +227,14 @@ func (rc *RemoteBeatRecordCache) Put(records ...WriteBeatRecord) { InstanceId: record.Key, }) } - rc.saver(req) + return rc.saver(req) } -func (rc *RemoteBeatRecordCache) Del(key ...string) { +func (rc *RemoteBeatRecordCache) Del(key ...string) error { req := &apiservice.DelHeartbeatsRequest{ InstanceIds: key, } - rc.delter(req) + return rc.delter(req) } func (lc *RemoteBeatRecordCache) Clean() { diff --git a/plugin/healthchecker/leader/checker_leader.go b/plugin/healthchecker/leader/checker_leader.go index 4c62a4553..54c8be57c 100644 --- a/plugin/healthchecker/leader/checker_leader.go +++ b/plugin/healthchecker/leader/checker_leader.go @@ -268,7 +268,7 @@ func (c *LeaderHealthChecker) Report(ctx context.Context, request *plugin.Report }, Key: request.InstanceId, } - if err := responsible.Put(record); err != nil { + if err := responsible.Storage().Put(record); err != nil { return err } if log.DebugEnabled() { @@ -322,6 +322,50 @@ func (c *LeaderHealthChecker) Check(request *plugin.CheckRequest) (*plugin.Check return checkResp, nil } +func (c *LeaderHealthChecker) BatchQuery(ctx context.Context, request *plugin.BatchQueryRequest) (*plugin.BatchQueryResponse, error) { + if isSendFromPeer(ctx) { + return nil, ErrorRedirectOnlyOnce + } + + c.lock.RLock() + defer c.lock.RUnlock() + if !c.isInitialize() { + plog.Infof("[Health Check][Leader] leader checker uninitialize, ignore query") + return &plugin.BatchQueryResponse{}, errors.New("leader checker uninitialize") + } + responsible := c.findLeaderPeer() + + keys := make([]string, 0, len(request.Requests)) + for i := range request.Requests { + keys = append(keys, request.Requests[i].InstanceId) + } + ret, err := responsible.Storage().Get(keys...) + if err != nil { + return nil, err + } + + rsp := &plugin.BatchQueryResponse{Responses: make([]*plugin.QueryResponse, 0, len(request.Requests))} + for i := range request.Requests { + req := request.Requests[i] + record, ok := ret[req.InstanceId] + if !ok { + rsp.Responses = append(rsp.Responses, &plugin.QueryResponse{ + Server: responsible.Host(), + Exists: false, + }) + } else { + rsp.Responses = append(rsp.Responses, &plugin.QueryResponse{ + Server: responsible.Host(), + Exists: record.Exist, + LastHeartbeatSec: record.Record.CurTimeSec, + Count: record.Record.Count, + }) + } + + } + return rsp, nil +} + // Query queries the heartbeat time func (c *LeaderHealthChecker) Query(ctx context.Context, request *plugin.QueryRequest) (*plugin.QueryResponse, error) { if isSendFromPeer(ctx) { @@ -337,10 +381,11 @@ func (c *LeaderHealthChecker) Query(ctx context.Context, request *plugin.QueryRe }, nil } responsible := c.findLeaderPeer() - record, err := responsible.Get(request.InstanceId) + ret, err := responsible.Storage().Get(request.InstanceId) if err != nil { return nil, err } + record := ret[request.InstanceId] if log.DebugEnabled() { log.Debugf("[HealthCheck][Leader] query hb record, instanceId %s, record %+v", request.InstanceId, record) } @@ -360,7 +405,7 @@ func (c *LeaderHealthChecker) Delete(ctx context.Context, key string) error { c.lock.RLock() defer c.lock.RUnlock() responsible := c.findLeaderPeer() - return responsible.Del(key) + return responsible.Storage().Del(key) } // Suspend checker for an entire expired interval @@ -429,6 +474,33 @@ func (c *LeaderHealthChecker) isLeader() bool { return atomic.LoadInt32(&c.leader) == 1 } +const ( + errCountThreshold = 2 + maxCheckCount = 3 +) + +func (c *LeaderHealthChecker) checkLeaderAlive(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + peer := c.findLeaderPeer() + if peer == nil { + // 可能是在 Leader 调整中,不处理探测 + continue + } + + if !peer.IsAlive() { + log.Infof("[Health Check][Leader] leader peer not alive, do suspend") + c.Suspend() + } + } + } +} + func (c *LeaderHealthChecker) DebugHandlers() []model.DebugHandler { return []model.DebugHandler{ { diff --git a/plugin/healthchecker/leader/peer.go b/plugin/healthchecker/leader/peer.go index 1fa67c40a..f10d9b119 100644 --- a/plugin/healthchecker/leader/peer.go +++ b/plugin/healthchecker/leader/peer.go @@ -21,8 +21,8 @@ import ( "context" "errors" "fmt" + "io" "math/rand" - "net" "sync" "sync/atomic" "time" @@ -37,6 +37,10 @@ import ( "github.com/polarismesh/polaris/common/utils" ) +var ( + ErrorLeaderNotAlive = errors.New("leader not alive") +) + var ( NewLocalPeerFunc = newLocalPeer NewRemotePeerFunc = newRemotePeer @@ -56,12 +60,6 @@ type Peer interface { Initialize(conf Config) // Serve . Serve(ctx context.Context, checker *LeaderHealthChecker, listenIP string, listenPort uint32) error - // Get . - Get(key string) (*ReadBeatRecord, error) - // Put . - Put(record WriteBeatRecord) error - // Del . - Del(key string) error // Close . Close() error // Host . @@ -100,24 +98,6 @@ func (p *LocalPeer) Host() string { return utils.LocalHost } -// Get get records -func (p *LocalPeer) Get(key string) (*ReadBeatRecord, error) { - ret := p.Cache.Get(key) - return ret[key], nil -} - -// Put put records -func (p *LocalPeer) Put(record WriteBeatRecord) error { - p.Cache.Put(record) - return nil -} - -// Del del records -func (p *LocalPeer) Del(key string) error { - p.Cache.Del(key) - return nil -} - // Close close peer life func (p *LocalPeer) Close() error { log.Info("[HealthCheck][Leader] local peer close") @@ -150,47 +130,25 @@ type RemotePeer struct { conf Config // closed . closed int32 + // leaderAlive . + leaderAlive int32 } func (p *RemotePeer) Initialize(conf Config) { p.conf = conf } -func (p *RemotePeer) isClose() bool { - return atomic.LoadInt32(&p.closed) == 1 -} - func (p *RemotePeer) Serve(_ context.Context, checker *LeaderHealthChecker, listenIP string, listenPort uint32) error { ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel p.host = listenIP p.port = listenPort - p.conns = make([]*grpc.ClientConn, 0, streamNum) - p.puters = make([]*beatSender, 0, streamNum) - for i := 0; i < streamNum; i++ { - conn, err := grpc.DialContext(ctx, fmt.Sprintf("%s:%d", listenIP, listenPort), - grpc.WithBlock(), - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - _ = p.Close() - return err - } - p.conns = append(p.conns, conn) + if err := p.doConnect(); err != nil { + return err } - for i := 0; i < streamNum; i++ { - client := apiservice.NewPolarisHeartbeatGRPCClient(p.conns[i]) - puter, err := client.BatchHeartbeat(ctx, grpc.Header(&metadata.MD{ - sendResource: []string{utils.LocalHost}, - })) - if err != nil { - _ = p.Close() - return err - } - p.puters = append(p.puters, newBeatSender(ctx, p, puter)) - } - p.Cache = newRemoteBeatRecordCache(p.GetFunc, p.PutFunc, p.DelFunc) + p.Cache = newRemoteBeatRecordCache(p.GetFunc, p.PutFunc, p.DelFunc, p.Ping) + go p.checkLeaderAlive(ctx) return nil } @@ -199,38 +157,22 @@ func (p *RemotePeer) Host() string { } func (p *RemotePeer) IsAlive() bool { - conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%v", p.Host(), p.port), time.Second) - defer func() { - if conn != nil { - _ = conn.Close() - } - }() - - if err != nil { - return false - } - return true + return atomic.LoadInt32(&p.leaderAlive) == 1 } -// Get get records -func (p *RemotePeer) Get(key string) (*ReadBeatRecord, error) { - ret := p.Cache.Get(key) - return ret[key], nil -} - -// Put put records -func (p *RemotePeer) Put(record WriteBeatRecord) error { - p.Cache.Put(record) - return nil -} - -// Del del records -func (p *RemotePeer) Del(key string) error { - p.Cache.Del(key) - return nil +func (p *RemotePeer) Ping() error { + client := p.choseOneClient() + _, err := client.BatchGetHeartbeat(context.Background(), &apiservice.GetHeartbeatsRequest{}, + grpc.Header(&metadata.MD{ + sendResource: []string{utils.LocalHost}, + })) + return err } -func (p *RemotePeer) GetFunc(req *apiservice.GetHeartbeatsRequest) *apiservice.GetHeartbeatsResponse { +func (p *RemotePeer) GetFunc(req *apiservice.GetHeartbeatsRequest) (*apiservice.GetHeartbeatsResponse, error) { + if !p.IsAlive() { + return nil, ErrorLeaderNotAlive + } start := time.Now() code := "0" defer func() { @@ -248,12 +190,15 @@ func (p *RemotePeer) GetFunc(req *apiservice.GetHeartbeatsRequest) *apiservice.G code = "-1" plog.Error("[HealthCheck][Leader] send get record request", zap.String("host", p.Host()), zap.Uint32("port", p.port), zap.Error(err)) - return &apiservice.GetHeartbeatsResponse{} + return nil, err } - return resp + return resp, nil } -func (p *RemotePeer) PutFunc(req *apiservice.HeartbeatsRequest) { +func (p *RemotePeer) PutFunc(req *apiservice.HeartbeatsRequest) error { + if !p.IsAlive() { + return ErrorLeaderNotAlive + } start := time.Now() code := "0" defer func() { @@ -263,15 +208,19 @@ func (p *RemotePeer) PutFunc(req *apiservice.HeartbeatsRequest) { }) observer.Observe(float64(time.Since(start).Milliseconds())) }() - index := rand.Intn(len(p.puters)) - if err := p.puters[index].Send(req); err != nil { + if err := p.choseOneSender().Send(req); err != nil { code = "-1" plog.Error("[HealthCheck][Leader] send put record request", zap.String("host", p.Host()), zap.Uint32("port", p.port), zap.Error(err)) + return err } + return nil } -func (p *RemotePeer) DelFunc(req *apiservice.DelHeartbeatsRequest) { +func (p *RemotePeer) DelFunc(req *apiservice.DelHeartbeatsRequest) error { + if !p.IsAlive() { + return ErrorLeaderNotAlive + } start := time.Now() code := "0" defer func() { @@ -288,12 +237,9 @@ func (p *RemotePeer) DelFunc(req *apiservice.DelHeartbeatsRequest) { code = "-1" plog.Error("send del record request", zap.String("host", p.Host()), zap.Uint32("port", p.port), zap.Error(err)) + return err } -} - -func (p *RemotePeer) choseOneClient() apiservice.PolarisHeartbeatGRPCClient { - index := rand.Intn(len(p.conns)) - return apiservice.NewPolarisHeartbeatGRPCClient(p.conns[index]) + return nil } func (p *RemotePeer) Storage() BeatRecordCache { @@ -305,6 +251,47 @@ func (p *RemotePeer) Close() error { if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) { return nil } + p.doClose() + return nil +} + +func (p *RemotePeer) choseOneClient() apiservice.PolarisHeartbeatGRPCClient { + index := rand.Intn(len(p.conns)) + return apiservice.NewPolarisHeartbeatGRPCClient(p.conns[index]) +} + +func (p *RemotePeer) choseOneSender() *beatSender { + index := rand.Intn(len(p.puters)) + return p.puters[index] +} + +func (p *RemotePeer) checkLeaderAlive(ctx context.Context) { + ticker := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + ticker.Stop() + case <-ticker.C: + var errCount int + for i := 0; i < maxCheckCount; i++ { + if err := p.Ping(); err != nil { + plog.Error("check leader is alive fail", zap.String("host", p.Host()), + zap.Uint32("port", p.port), zap.Error(err)) + errCount++ + } + } + if errCount >= errCountThreshold { + log.Warn("[Health Check][Leader] leader peer not alive, set leader is dead", zap.String("host", p.Host()), + zap.Uint32("port", p.port)) + atomic.StoreInt32(&p.leaderAlive, 0) + } else { + atomic.StoreInt32(&p.leaderAlive, 1) + } + } + } +} + +func (p *RemotePeer) doClose() { if p.cancel != nil { p.cancel() } @@ -318,39 +305,59 @@ func (p *RemotePeer) Close() error { _ = p.conns[i].Close() } } +} + +func (p *RemotePeer) doConnect() error { + p.conns = make([]*grpc.ClientConn, 0, streamNum) + p.puters = make([]*beatSender, 0, streamNum) + for i := 0; i < streamNum; i++ { + conn, err := grpc.DialContext(context.Background(), fmt.Sprintf("%s:%d", p.Host(), p.port), + grpc.WithBlock(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithTimeout(5*time.Second), + ) + if err != nil { + p.doClose() + return err + } + p.conns = append(p.conns, conn) + } + for i := 0; i < streamNum; i++ { + client := apiservice.NewPolarisHeartbeatGRPCClient(p.conns[i]) + puter, err := client.BatchHeartbeat(context.Background(), grpc.Header(&metadata.MD{ + sendResource: []string{utils.LocalHost}, + })) + if err != nil { + p.doClose() + return err + } + sender := &beatSender{ + peer: p, + lock: &sync.RWMutex{}, + sender: puter, + } + p.puters = append(p.puters, sender) + } return nil } -var ( - ErrorRecordNotFound = errors.New("beat record not found") - ErrorPeerClosed = errors.New("peer alrady closed") -) +func newBeatSender(p *RemotePeer, client apiservice.PolarisHeartbeatGRPC_BatchHeartbeatClient) *beatSender { + ctx, cancel := context.WithCancel(context.Background()) + sender := &beatSender{ + peer: p, + lock: &sync.RWMutex{}, + sender: client, + cancel: cancel, + } + go sender.doRecv(ctx) + return sender +} type beatSender struct { - lock sync.RWMutex + peer *RemotePeer + lock *sync.RWMutex sender apiservice.PolarisHeartbeatGRPC_BatchHeartbeatClient -} - -func newBeatSender(ctx context.Context, p *RemotePeer, sender apiservice.PolarisHeartbeatGRPC_BatchHeartbeatClient) *beatSender { - go func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - plog.Info("[HealthCheck][Leader] cancel receive put record result", zap.String("host", p.Host()), - zap.Uint32("port", p.port)) - return - default: - if _, err := sender.Recv(); err != nil { - plog.Error("[HealthCheck][Leader] receive put record result", zap.String("host", p.Host()), - zap.Uint32("port", p.port), zap.Error(err)) - } - } - } - }(ctx) - - return &beatSender{ - sender: sender, - } + cancel context.CancelFunc } func (s *beatSender) Send(req *apiservice.HeartbeatsRequest) error { @@ -359,6 +366,27 @@ func (s *beatSender) Send(req *apiservice.HeartbeatsRequest) error { return s.sender.Send(req) } +func (s *beatSender) doRecv(ctx context.Context) { + for { + select { + case <-ctx.Done(): + plog.Info("[HealthCheck][Leader] cancel receive put record result", zap.String("host", s.peer.Host()), + zap.Uint32("port", s.peer.port)) + return + default: + if _, err := s.sender.Recv(); err != nil { + if err != io.EOF { + plog.Error("[HealthCheck][Leader] receive put record result", zap.String("host", s.peer.Host()), + zap.Uint32("port", s.peer.port), zap.Error(err)) + } + } + } + } +} + func (s *beatSender) close() error { + if s.cancel != nil { + s.cancel() + } return s.sender.CloseSend() } diff --git a/plugin/healthchecker/leader/peer_test.go b/plugin/healthchecker/leader/peer_test.go index e727f3602..cf15b8106 100644 --- a/plugin/healthchecker/leader/peer_test.go +++ b/plugin/healthchecker/leader/peer_test.go @@ -133,12 +133,12 @@ func TestLocalPeer(t *testing.T) { mockKey := utils.NewUUID() mockVal := time.Now().Unix() - ret, err := localPeer.Get(mockKey) + ret, err := localPeer.Storage().Get(mockKey) assert.NoError(t, err) assert.NotNil(t, ret) - assert.False(t, ret.Exist) + assert.False(t, ret[mockKey].Exist) - err = localPeer.Put(WriteBeatRecord{ + err = localPeer.Storage().Put(WriteBeatRecord{ Record: RecordValue{ CurTimeSec: mockVal, Count: 0, @@ -147,19 +147,19 @@ func TestLocalPeer(t *testing.T) { }) assert.NoError(t, err) - ret, err = localPeer.Get(mockKey) + ret, err = localPeer.Storage().Get(mockKey) assert.NoError(t, err) assert.NotNil(t, ret) - assert.True(t, ret.Exist) - assert.Equal(t, mockVal, ret.Record.CurTimeSec) + assert.True(t, ret[mockKey].Exist) + assert.Equal(t, mockVal, ret[mockKey].Record.CurTimeSec) - err = localPeer.Del(mockKey) + err = localPeer.Storage().Del(mockKey) assert.NoError(t, err) - ret, err = localPeer.Get(mockKey) + ret, err = localPeer.Storage().Get(mockKey) assert.NoError(t, err) assert.NotNil(t, ret) - assert.False(t, ret.Exist) + assert.False(t, ret[mockKey].Exist) err = localPeer.Close() assert.NoError(t, err) @@ -205,12 +205,12 @@ func TestRemotePeer(t *testing.T) { mockKey := utils.NewUUID() mockVal := time.Now().Unix() - ret, err := remotePeer.Get(mockKey) + ret, err := remotePeer.Storage().Get(mockKey) assert.NoError(t, err) assert.NotNil(t, ret) - assert.False(t, ret.Exist) + assert.False(t, ret[mockKey].Exist) - err = remotePeer.Put(WriteBeatRecord{ + err = remotePeer.Storage().Put(WriteBeatRecord{ Record: RecordValue{ CurTimeSec: mockVal, Count: 0, @@ -219,19 +219,19 @@ func TestRemotePeer(t *testing.T) { }) assert.NoError(t, err) - ret, err = remotePeer.Get(mockKey) + ret, err = remotePeer.Storage().Get(mockKey) assert.NoError(t, err) assert.NotNil(t, ret) - assert.True(t, ret.Exist) - assert.True(t, mockVal <= ret.Record.CurTimeSec) + assert.True(t, ret[mockKey].Exist) + assert.True(t, mockVal <= ret[mockKey].Record.CurTimeSec) - err = remotePeer.Del(mockKey) + err = remotePeer.Storage().Del(mockKey) assert.NoError(t, err) - ret, err = remotePeer.Get(mockKey) + ret, err = remotePeer.Storage().Get(mockKey) assert.NoError(t, err) assert.NotNil(t, ret) - assert.False(t, ret.Exist) + assert.False(t, ret[mockKey].Exist) err = remotePeer.Close() assert.NoError(t, err) @@ -308,7 +308,7 @@ func (ms *MockPolarisGRPCServer) BatchHeartbeat(svr service_manage.PolarisHeartb heartbeats := req.GetHeartbeats() for i := range heartbeats { - ms.peer.Put(WriteBeatRecord{ + ms.peer.Storage().Put(WriteBeatRecord{ Record: RecordValue{ CurTimeSec: time.Now().Unix(), }, @@ -328,14 +328,14 @@ func (ms *MockPolarisGRPCServer) BatchGetHeartbeat(_ context.Context, keys := req.GetInstanceIds() records := make([]*service_manage.HeartbeatRecord, 0, len(keys)) for i := range keys { - ret, err := ms.peer.Get(keys[i]) + ret, err := ms.peer.Storage().Get(keys[i]) if err != nil { return nil, err } record := &service_manage.HeartbeatRecord{ InstanceId: keys[i], - LastHeartbeatSec: ret.Record.CurTimeSec, - Exist: ret.Exist, + LastHeartbeatSec: ret[keys[i]].Record.CurTimeSec, + Exist: ret[keys[i]].Exist, } records = append(records, record) } @@ -349,7 +349,7 @@ func (ms *MockPolarisGRPCServer) BatchDelHeartbeat(_ context.Context, req *service_manage.DelHeartbeatsRequest) (*service_manage.DelHeartbeatsResponse, error) { keys := req.GetInstanceIds() for i := range keys { - if err := ms.peer.Del(keys[i]); err != nil { + if err := ms.peer.Storage().Del(keys[i]); err != nil { return nil, err } } diff --git a/plugin/healthchecker/memory/checker_memory.go b/plugin/healthchecker/memory/checker_memory.go index 8182ebbe7..5395b238b 100644 --- a/plugin/healthchecker/memory/checker_memory.go +++ b/plugin/healthchecker/memory/checker_memory.go @@ -98,6 +98,18 @@ func (r *MemoryHealthChecker) Query(ctx context.Context, request *plugin.QueryRe }, nil } +func (r *MemoryHealthChecker) BatchQuery(ctx context.Context, request *plugin.BatchQueryRequest) (*plugin.BatchQueryResponse, error) { + rsp := &plugin.BatchQueryResponse{Responses: make([]*plugin.QueryResponse, 0, len(request.Requests))} + for i := range request.Requests { + subRsp, err := r.Query(ctx, request.Requests[i]) + if err != nil { + return nil, err + } + rsp.Responses = append(rsp.Responses, subRsp) + } + return rsp, nil +} + func (r *MemoryHealthChecker) skipCheck(instanceId string, expireDurationSec int64) bool { suspendTimeSec := r.SuspendTimeSec() localCurTimeSec := commontime.CurrentMillisecond() / 1000 diff --git a/plugin/healthchecker/redis/checker_redis.go b/plugin/healthchecker/redis/checker_redis.go index 7897c8925..349d6a56f 100644 --- a/plugin/healthchecker/redis/checker_redis.go +++ b/plugin/healthchecker/redis/checker_redis.go @@ -215,6 +215,44 @@ func (r *RedisHealthChecker) Query(ctx context.Context, request *plugin.QueryReq return queryResp, nil } +func (r *RedisHealthChecker) BatchQuery(ctx context.Context, request *plugin.BatchQueryRequest) (*plugin.BatchQueryResponse, error) { + keys := make([]string, 0, len(request.Requests)) + for i := range request.Requests { + keys = append(keys, request.Requests[i].InstanceId) + } + + resp := r.checkPool.MGet(keys) + if resp.Err != nil { + log.Errorf("[Health Check][RedisCheck] mget redis err:%s", resp.Err) + return nil, resp.Err + } + values := resp.Values + queryResp := &plugin.BatchQueryResponse{ + Responses: make([]*plugin.QueryResponse, 0, len(values)), + } + if len(values) == 0 { + return queryResp, nil + } + for i := range values { + subRsp := &plugin.QueryResponse{} + value := values[i] + if value == nil { + subRsp.Exists = false + } else { + heathCheckRecord := &HeathCheckRecord{} + if err := heathCheckRecord.Deserialize(fmt.Sprintf("%+v", value), resp.Compatible); err != nil { + log.Errorf("[Health Check][RedisCheck] mget parse %s err:%v", value, err) + return nil, err + } + subRsp.Server = heathCheckRecord.LocalHost + subRsp.LastHeartbeatSec = heathCheckRecord.CurTimeSec + subRsp.Count = heathCheckRecord.Count + subRsp.Exists = true + } + } + return queryResp, nil +} + const maxCheckDuration = 500 * time.Second func (r *RedisHealthChecker) skipCheck(instanceId string, expireDurationSec int64) bool { diff --git a/plugin/healthchecker/redis/checker_redis_test.go b/plugin/healthchecker/redis/checker_redis_test.go index f1c8a6e10..f436270ac 100644 --- a/plugin/healthchecker/redis/checker_redis_test.go +++ b/plugin/healthchecker/redis/checker_redis_test.go @@ -77,6 +77,23 @@ func (m *mockPool) Get(id string) *redispool.Resp { } } +// Get 使用连接池,向redis发起Get请求 +func (m *mockPool) MGet(id []string) *redispool.Resp { + rsp := &redispool.Resp{ + Values: make([]interface{}, 0, len(id)), + Compatible: m.compatible, + } + for i := range id { + value, ok := m.itemValues[id[i]] + if ok { + rsp.Values = append(rsp.Values, value) + } else { + rsp.Values = append(rsp.Values, nil) + } + } + return rsp +} + // Set 使用连接池,向redis发起Set请求 func (m *mockPool) Set(id string, redisObj redispool.RedisObject) *redispool.Resp { value := redisObj.Serialize(m.compatible) diff --git a/plugin/statis/prometheus/statis.go b/plugin/statis/prometheus/statis.go index d7efd2528..e3528b2f9 100644 --- a/plugin/statis/prometheus/statis.go +++ b/plugin/statis/prometheus/statis.go @@ -114,6 +114,10 @@ func (s *StatisWorker) registerMetrics() error { // ReportCallMetrics report call metrics info func (s *StatisWorker) ReportCallMetrics(metric metrics.CallMetric) { + // 只上报服务端接受客户端请求调用的结果 + if metric.Type != metrics.ServerCallMetric { + return + } s.BaseWorker.ReportCallMetrics(metric) } diff --git a/release/cluster/helm/templates/config-polaris-server.yaml b/release/cluster/helm/templates/config-polaris-server.yaml index 2e599bef0..4660cafb1 100644 --- a/release/cluster/helm/templates/config-polaris-server.yaml +++ b/release/cluster/helm/templates/config-polaris-server.yaml @@ -209,16 +209,6 @@ data: whiteList: 127.0.0.1 purgeCounterInterval: 10s purgeCounterExpired: 5s - - name: service-nacos - option: - listenIP: "0.0.0.0" - listenPort: {{ .Values.service.nacosPort }} - # 设置 nacos 默认命名空间对应 Polaris 命名空间信息 - defaultNamespace: default - connLimit: - openConnLimit: false - maxConnPerHost: 128 - maxConnLimit: 10240 - name: api-http # 协议名,全局唯一 option: listenIP: "0.0.0.0" @@ -278,7 +268,7 @@ data: - name: service-nacos option: listenIP: "0.0.0.0" - listenPort: 8848 + listenPort: {{ .Values.service.nacosPort }} # 设置 nacos 默认命名空间对应 Polaris 命名空间信息 defaultNamespace: default connLimit: diff --git a/release/cluster/helm/templates/polaris-server.yaml b/release/cluster/helm/templates/polaris-server.yaml index 731041364..bc50dbe8c 100644 --- a/release/cluster/helm/templates/polaris-server.yaml +++ b/release/cluster/helm/templates/polaris-server.yaml @@ -30,9 +30,9 @@ spec: - port: {{ .Values.service.eurekaPort }} name: service-nacos-http targetPort: {{ .Values.service.nacosPort }} - - port: {{ .Values.service.nacosPort + 1000 }} + - port: {{ add .Values.service.nacosPort 1000 }} name: service-nacos-grpc - targetPort: {{ .Values.service.nacosPort + 1000 }} + targetPort: {{ add .Values.service.nacosPort 1000 }} selector: app: polaris --- diff --git a/release/conf/polaris-server.yaml b/release/conf/polaris-server.yaml index e72911d65..dfb8f2f6d 100644 --- a/release/conf/polaris-server.yaml +++ b/release/conf/polaris-server.yaml @@ -461,9 +461,9 @@ store: # master: # dbType: mysql # dbName: polaris_server - # dbUser: ##DB_USER## - # dbPwd: ##DB_PWD## - # dbAddr: ##DB_ADDR## + # dbUser: ${MYSQL_USER} ##DB_USER## + # dbPwd: ${MYSQL_PWD} ##DB_PWD## + # dbAddr: ${MYSQL_HOST} ##DB_ADDR## # maxOpenConns: 300 # maxIdleConns: 50 # connMaxLifetime: 300 # Unit second diff --git a/service/api_v1.go b/service/api_v1.go index 388631c8e..236aaa734 100644 --- a/service/api_v1.go +++ b/service/api_v1.go @@ -181,7 +181,7 @@ type ClientServer interface { // GetFaultDetectWithCache User Client Get FaultDetect Rule Information GetFaultDetectWithCache(ctx context.Context, req *apiservice.Service) *apiservice.DiscoverResponse // GetServiceContractWithCache User Client Get ServiceContract Rule Information - GetServiceContractWithCache(ctx context.Context, req *apiservice.ServiceContract) *apiservice.DiscoverResponse + GetServiceContractWithCache(ctx context.Context, req *apiservice.ServiceContract) *apiservice.Response // UpdateInstance update one instance by client UpdateInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response // ReportServiceContract client report service_contract diff --git a/service/client_v1.go b/service/client_v1.go index 32ea4fc32..3402269d4 100644 --- a/service/client_v1.go +++ b/service/client_v1.go @@ -496,16 +496,8 @@ func (s *Server) GetCircuitBreakerWithCache(ctx context.Context, req *apiservice // GetServiceContractWithCache User Client Get ServiceContract Rule Information func (s *Server) GetServiceContractWithCache(ctx context.Context, - req *apiservice.ServiceContract) *apiservice.DiscoverResponse { - resp := &apiservice.DiscoverResponse{ - Code: &wrappers.UInt32Value{Value: uint32(apimodel.Code_ExecuteSuccess)}, - Info: &wrappers.StringValue{Value: api.Code2Info(uint32(apimodel.Code_ExecuteSuccess))}, - Type: apiservice.DiscoverResponse_SERVICE_CONTRACT, - Service: &apiservice.Service{ - Name: wrapperspb.String(req.GetService()), - Namespace: wrapperspb.String(req.GetNamespace()), - }, - } + req *apiservice.ServiceContract) *apiservice.Response { + resp := api.NewResponse(apimodel.Code_ExecuteSuccess) if !s.serviceContractCheckDiscoverRequest(req, resp) { return resp } @@ -636,7 +628,7 @@ func (s *Server) commonCheckDiscoverRequest(req *apiservice.Service, resp *apise return true } -func (s *Server) serviceContractCheckDiscoverRequest(req *apiservice.ServiceContract, resp *apiservice.DiscoverResponse) bool { +func (s *Server) serviceContractCheckDiscoverRequest(req *apiservice.ServiceContract, resp *apiservice.Response) bool { svc := &apiservice.Service{ Name: wrapperspb.String(req.GetService()), Namespace: wrapperspb.String(req.GetNamespace()), @@ -653,7 +645,6 @@ func (s *Server) serviceContractCheckDiscoverRequest(req *apiservice.ServiceCont resp.Code = utils.NewUInt32Value(uint32(apimodel.Code_EmptyRequest)) resp.Info = utils.NewStringValue(api.Code2Info(resp.GetCode().GetValue())) resp.Service = svc - resp.ServiceContract = req return false } diff --git a/service/healthcheck/check.go b/service/healthcheck/check.go index 280079540..d2412b5b7 100644 --- a/service/healthcheck/check.go +++ b/service/healthcheck/check.go @@ -27,6 +27,7 @@ import ( apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" "go.uber.org/zap" + "github.com/polarismesh/polaris/common/eventhub" "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/common/srand" commonstore "github.com/polarismesh/polaris/common/store" @@ -81,33 +82,67 @@ type itemValue struct { checker plugin.HealthChecker } -type InstanceEventHealthCheckHandler struct { +type ResourceHealthCheckHandler struct { svr *Server ctx context.Context instanceEventChannel chan *model.InstanceEvent } // newLeaderChangeEventHandler -func newInstanceEventHealthCheckHandler(ctx context.Context, svr *Server) *InstanceEventHealthCheckHandler { - return &InstanceEventHealthCheckHandler{ - svr: svr, - ctx: ctx, - instanceEventChannel: svr.instanceEventChannel, +func newResourceHealthCheckHandler(ctx context.Context, svr *Server) *ResourceHealthCheckHandler { + return &ResourceHealthCheckHandler{ + svr: svr, + ctx: ctx, } } -func (handler *InstanceEventHealthCheckHandler) PreProcess(ctx context.Context, value any) any { +func (handler *ResourceHealthCheckHandler) PreProcess(ctx context.Context, value any) any { return value } // OnEvent event trigger -func (handler *InstanceEventHealthCheckHandler) OnEvent(ctx context.Context, i interface{}) error { - e := i.(model.InstanceEvent) - select { - case handler.instanceEventChannel <- &e: - log.Debugf("[Health Check]get instance event, id is %s, type is %s", e.Id, e.EType) - default: - log.Errorf("[Health Check]instance event chan full, drop event, id is %s, type is %s", e.Id, e.EType) +func (handler *ResourceHealthCheckHandler) OnEvent(ctx context.Context, i interface{}) error { + s := handler.svr + switch event := i.(type) { + case model.InstanceEvent: + log.Debugf("[Health Check]get instance event, id is %s, type is %s", event.Id, event.EType) + if event.EType != model.EventInstanceOffline { + return nil + } + insCache := s.cacheProvider.GetInstance(event.Id) + if insCache == nil { + log.Errorf("[Health Check] cannot get instance from cache, instance id is %s", event.Id) + break + } + checker, ok := s.checkers[int32(insCache.HealthCheck().GetType())] + if !ok { + log.Errorf("[Health Check]heart beat type not found checkType %d", + int32(insCache.HealthCheck().GetType())) + break + } + log.Infof("[Health Check]delete instance heart beat information, id is %s", event.Id) + if err := checker.Delete(context.Background(), event.Id); err != nil { + log.Errorf("[Health Check]addr is %s:%d, id is %s, delete err is %s", + insCache.Host(), insCache.Port(), insCache.ID(), err) + } + case model.ClientEvent: + if event.EType != model.EventInstanceOffline { + return nil + } + clientCache := s.cacheProvider.GetClient(event.Id) + if clientCache == nil { + log.Errorf("[Health Check] cannot get instance from cache, instance id is %s", event.Id) + break + } + checker, ok := s.checkers[int32(apiservice.HealthCheck_HEARTBEAT)] + if !ok { + log.Errorf("[Health Check]heart beat type not found checkType %d", int32(apiservice.HealthCheck_HEARTBEAT)) + break + } + log.Infof("[Health Check]delete client heart beat information, id is %s", event.Id) + if err := checker.Delete(context.Background(), event.Id); err != nil { + log.Errorf("[Health Check] client id is %s, delete err is %+v", clientCache.Proto().GetId().Value, err) + } } return nil } @@ -620,6 +655,10 @@ func asyncDeleteClient(svr *Server, client *apiservice.Client) apimodel.Code { log.Error("[Health Check][Check] async delete client", zap.String("client-id", client.GetId().GetValue()), zap.Error(err)) } + _ = eventhub.Publish(eventhub.ClientEventTopic, &model.ClientEvent{ + EType: model.EventClientOffline, + Id: client.GetId().GetValue(), + }) return future.Code() } diff --git a/service/healthcheck/option.go b/service/healthcheck/option.go index 66c1c2acb..1c8c71da4 100644 --- a/service/healthcheck/option.go +++ b/service/healthcheck/option.go @@ -23,7 +23,6 @@ import ( "github.com/polarismesh/polaris/cache" "github.com/polarismesh/polaris/common/eventhub" - "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/plugin" "github.com/polarismesh/polaris/service/batch" "github.com/polarismesh/polaris/store" @@ -151,9 +150,6 @@ func withSubscriber(ctx context.Context) serverOption { } svr.subCtxs = append(svr.subCtxs, subCtx) - svr.instanceEventChannel = make(chan *model.InstanceEvent, 1024) - go svr.handleInstanceEventWorker(ctx) - leaderChangeEventHandler := newLeaderChangeEventHandler(svr) subCtx, err = eventhub.Subscribe(eventhub.LeaderChangeEventTopic, leaderChangeEventHandler) if err != nil { @@ -161,8 +157,16 @@ func withSubscriber(ctx context.Context) serverOption { } svr.subCtxs = append(svr.subCtxs, subCtx) - instanceEventHandler := newInstanceEventHealthCheckHandler(ctx, svr) - subCtx, err = eventhub.Subscribe(eventhub.InstanceEventTopic, instanceEventHandler) + resourceEventHandler := newResourceHealthCheckHandler(ctx, svr) + // 监听服务实例的删除事件,然后清理心跳 key 数据 + subCtx, err = eventhub.Subscribe(eventhub.InstanceEventTopic, resourceEventHandler) + if err != nil { + return err + } + svr.subCtxs = append(svr.subCtxs, subCtx) + + // 监听客户端实例的删除事件,然后清理心跳 key 数据 + subCtx, err = eventhub.Subscribe(eventhub.ClientEventTopic, resourceEventHandler) if err != nil { return err } diff --git a/service/healthcheck/server.go b/service/healthcheck/server.go index 529e4a689..466362a6e 100644 --- a/service/healthcheck/server.go +++ b/service/healthcheck/server.go @@ -46,21 +46,20 @@ var ( // Server health checks the main server type Server struct { - hcOpt *Config - storage store.Store - defaultChecker plugin.HealthChecker - checkers map[int32]plugin.HealthChecker - cacheProvider *CacheProvider - timeAdjuster *TimeAdjuster - dispatcher *Dispatcher - checkScheduler *CheckScheduler - history plugin.History - discoverEvent plugin.DiscoverChannel - localHost string - bc *batch.Controller - serviceCache cachetypes.ServiceCache - instanceCache cachetypes.InstanceCache - instanceEventChannel chan *model.InstanceEvent + hcOpt *Config + storage store.Store + defaultChecker plugin.HealthChecker + checkers map[int32]plugin.HealthChecker + cacheProvider *CacheProvider + timeAdjuster *TimeAdjuster + dispatcher *Dispatcher + checkScheduler *CheckScheduler + history plugin.History + discoverEvent plugin.DiscoverChannel + localHost string + bc *batch.Controller + serviceCache cachetypes.ServiceCache + instanceCache cachetypes.InstanceCache subCtxs []*eventhub.SubscribtionContext } @@ -273,37 +272,6 @@ func (s *Server) GetLastHeartbeat(req *apiservice.Instance) *apiservice.Response return api.NewInstanceResponse(apimodel.Code_ExecuteSuccess, req) } -func (s *Server) handleInstanceEventWorker(ctx context.Context) { - for { - select { - case event := <-s.instanceEventChannel: - switch event.EType { - case model.EventInstanceOffline: - insCache := s.cacheProvider.GetInstance(event.Id) - if insCache == nil { - log.Errorf("[Health Check] cannot get instance from cache, instance id is %s", event.Id) - break - } - checker, ok := s.checkers[int32(insCache.HealthCheck().GetType())] - if !ok { - log.Errorf("[Health Check]heart beat type not found checkType %d", - int32(insCache.HealthCheck().GetType())) - break - } - log.Infof("[Health Check]delete instance heart beat information, id is %s", event.Id) - err := checker.Delete(context.Background(), event.Id) - if err != nil { - log.Errorf("[Health Check]addr is %s:%d, id is %s, delete err is %s", - insCache.Host(), insCache.Port(), insCache.ID(), err) - } - } - case <-ctx.Done(): - log.Infof("[Health Check]instance event handler loop stopped") - return - } - } -} - // Checkers get all health checker, for test only func (s *Server) Checkers() map[int32]plugin.HealthChecker { return s.checkers diff --git a/service/instance.go b/service/instance.go index 392937616..a8508d02a 100644 --- a/service/instance.go +++ b/service/instance.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "strconv" "time" "github.com/gogo/protobuf/jsonpb" @@ -34,7 +35,9 @@ import ( "github.com/polarismesh/polaris/common/eventhub" "github.com/polarismesh/polaris/common/model" commonstore "github.com/polarismesh/polaris/common/store" + commontime "github.com/polarismesh/polaris/common/time" "github.com/polarismesh/polaris/common/utils" + "github.com/polarismesh/polaris/plugin" ) var ( @@ -752,6 +755,8 @@ func updateHealthCheck(req *apiservice.Instance, instance *model.Instance) bool // GetInstances 查询服务实例 func (s *Server) GetInstances(ctx context.Context, query map[string]string) *apiservice.BatchQueryResponse { + showLastHeartbeat := query["show_last_heartbeat"] == "true" + delete(query, "show_last_heartbeat") // 对数据先进行提前处理一下 filters, metaFilter, batchErr := preGetInstances(query) if batchErr != nil { @@ -786,11 +791,44 @@ func (s *Server) GetInstances(ctx context.Context, query map[string]string) *api s.packCmdb(protoIns) apiInstances = append(apiInstances, protoIns) } - out.Instances = apiInstances + if showLastHeartbeat { + s.fillLastHeartbeatTime(apiInstances) + } + out.Instances = apiInstances return out } +func (s *Server) fillLastHeartbeatTime(instances []*apiservice.Instance) { + checker, ok := s.healthServer.Checkers()[int32(apiservice.HealthCheck_HEARTBEAT)] + if !ok { + return + } + req := &plugin.BatchQueryRequest{Requests: make([]*plugin.QueryRequest, 0, len(instances))} + for i := range instances { + item := instances[i] + req.Requests = append(req.Requests, &plugin.QueryRequest{ + InstanceId: item.GetId().GetValue(), + }) + } + rsp, err := checker.BatchQuery(context.Background(), req) + if err != nil { + return + } + for i := range rsp.Responses { + item := instances[i] + copyMetadata := make(map[string]string, len(item.GetMetadata())) + for k, v := range item.GetMetadata() { + copyMetadata[k] = v + } + if queryRsp := rsp.Responses[i]; queryRsp.Exists { + copyMetadata["last-heartbeat-timestamp"] = strconv.Itoa(int(queryRsp.LastHeartbeatSec)) + copyMetadata["last-heartbeat-time"] = commontime.Time2String(time.Unix(queryRsp.LastHeartbeatSec, 0)) + } + item.Metadata = copyMetadata + } +} + var ( ignoreReturnOSSInstanceMetadata = map[string]struct{}{ "version": {}, diff --git a/service/interceptor/auth/client_v1_authability.go b/service/interceptor/auth/client_v1_authability.go index b967993db..5a3cd7221 100644 --- a/service/interceptor/auth/client_v1_authability.go +++ b/service/interceptor/auth/client_v1_authability.go @@ -219,7 +219,7 @@ func (svr *ServerAuthAbility) UpdateInstance(ctx context.Context, req *apiservic // GetServiceContractWithCache User Client Get ServiceContract Rule Information func (svr *ServerAuthAbility) GetServiceContractWithCache(ctx context.Context, - req *apiservice.ServiceContract) *apiservice.DiscoverResponse { + req *apiservice.ServiceContract) *apiservice.Response { authCtx := svr.collectServiceAuthContext(ctx, []*apiservice.Service{{ Namespace: wrapperspb.String(req.Namespace), Name: wrapperspb.String(req.Service), @@ -227,7 +227,7 @@ func (svr *ServerAuthAbility) GetServiceContractWithCache(ctx context.Context, _, err := svr.strategyMgn.GetAuthChecker().CheckClientPermission(authCtx) if err != nil { - resp := api.NewDiscoverResponse(convertToErrCode(err)) + resp := api.NewResponse(convertToErrCode(err)) resp.Info = utils.NewStringValue(err.Error()) return resp } diff --git a/service/service_contract.go b/service/service_contract.go index 7f6cf937e..21d59de4c 100644 --- a/service/service_contract.go +++ b/service/service_contract.go @@ -61,10 +61,17 @@ func (s *Server) CreateServiceContracts(ctx context.Context, } func (s *Server) CreateServiceContract(ctx context.Context, contract *apiservice.ServiceContract) *apiservice.Response { - contractId, errRsp := utils.CheckContractTetrad(contract) - if errRsp != nil { + if errRsp := checkBaseServiceContract(contract); errRsp != nil { return errRsp } + contractId := contract.GetId() + if contractId == "" { + tmpId, errRsp := utils.CheckContractTetrad(contract) + if errRsp != nil { + return errRsp + } + contractId = tmpId + } existContract, err := s.storage.GetServiceContract(contractId) if err != nil { @@ -441,3 +448,22 @@ func serviceContractRecordEntry(ctx context.Context, req *apiservice.ServiceCont return entry } + +func checkBaseServiceContract(req *apiservice.ServiceContract) *apiservice.Response { + if err := utils.CheckResourceName(utils.NewStringValue(req.GetService())); err != nil { + return api.NewResponse(apimodel.Code_InvalidServiceName) + } + if err := utils.CheckResourceName(utils.NewStringValue(req.GetNamespace())); err != nil { + return api.NewResponse(apimodel.Code_InvalidNamespaceName) + } + if err := utils.CheckResourceName(utils.NewStringValue(req.GetName())); err != nil { + return api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract name") + } + if req.GetProtocol() == "" { + return api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract protocol") + } + if req.GetVersion() == "" { + return api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract version") + } + return nil +} diff --git a/store/boltdb/service.go b/store/boltdb/service.go index 06a72583f..4a54c4708 100644 --- a/store/boltdb/service.go +++ b/store/boltdb/service.go @@ -676,10 +676,15 @@ func (ss *serviceStore) getServiceByNameAndNsIgnoreValid(name string, namespace func (ss *serviceStore) getServiceByID(id string) (*model.Service, error) { - fields := []string{SvcFieldID} + fields := []string{SvcFieldID, svcFieldValid} svc, err := ss.handler.LoadValuesByFilter(tblNameService, fields, &Service{}, func(m map[string]interface{}) bool { + valid, ok := m[SvcFieldValid] + if ok && !valid.(bool) { + return false + } + svcId, ok := m[SvcFieldID] if !ok { return false diff --git a/store/mysql/scripts/delta/v1_17_3-v1_18_0.sql b/store/mysql/scripts/delta/v1_17_3-v1_18_0.sql index 590fe7c2d..003822cef 100644 --- a/store/mysql/scripts/delta/v1_17_3-v1_18_0.sql +++ b/store/mysql/scripts/delta/v1_17_3-v1_18_0.sql @@ -61,6 +61,7 @@ CREATE TABLE service_contract ( CREATE TABLE service_contract_detail ( `id` VARCHAR(128) NOT NULL COMMENT '服务契约单个接口定义记录主键', `contract_id` VARCHAR(128) NOT NULL COMMENT '服务契约 ID', + `name` VARCHAR(128) NOT NULL COMMENT '接口名称', `method` VARCHAR(32) NOT NULL COMMENT 'http协议中的 method 字段, eg:POST/GET/PUT/DELETE, 其他 gRPC 可以用来标识 stream 类型', `path` VARCHAR(128) NOT NULL COMMENT '接口具体全路径描述', `source` INT COMMENT '该条记录来源, 0:SDK/1:MANUAL',