From 8ca7997c02b1c2d5cc2ffa419697264a291d76cd Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Mon, 4 Mar 2024 12:22:19 +0800 Subject: [PATCH] fix:xds server resource build cache fail (#1331) --- .golangci.yml | 118 ++- .licenserc.yaml | 1 + apiserver/grpcserver/config/server.go | 27 +- apiserver/grpcserver/discover/server.go | 2 +- apiserver/grpcserver/utils/help.go | 27 +- apiserver/grpcserver/utils/help_test.go | 6 +- apiserver/nacosserver/model/constant.go | 7 - apiserver/nacosserver/v1/discover/instance.go | 2 +- .../nacosserver/v2/discover/subscribe.go | 2 +- .../nacosserver/v2/pb/internal_request.go | 28 +- apiserver/xdsserverv3/cache/cache.go | 956 +++++++++++++++--- apiserver/xdsserverv3/cache/callback.go | 8 +- apiserver/xdsserverv3/cache/response.go | 69 ++ apiserver/xdsserverv3/cache/status.go | 96 ++ apiserver/xdsserverv3/cds.go | 26 +- apiserver/xdsserverv3/debug.go | 59 +- apiserver/xdsserverv3/eds.go | 28 +- apiserver/xdsserverv3/generate.go | 178 ++-- apiserver/xdsserverv3/lds.go | 2 +- apiserver/xdsserverv3/rds.go | 10 +- apiserver/xdsserverv3/resource/api.go | 31 +- apiserver/xdsserverv3/resource/help.go | 18 +- apiserver/xdsserverv3/resource/model.go | 22 + apiserver/xdsserverv3/resource/node.go | 58 +- apiserver/xdsserverv3/server.go | 60 +- apiserver/xdsserverv3/server_test.go | 126 --- auth/defaultauth/auth_checker.go | 22 +- auth/defaultauth/auth_checker_test.go | 6 +- auth/defaultauth/server.go | 34 +- auth/defaultauth/utils.go | 9 +- cache/service/service.go | 3 +- cache/service/service_contract.go | 23 +- common/model/acquire_context.go | 5 +- common/model/contract.go | 2 + common/utils/common.go | 30 +- common/utils/funcs.go | 7 + plugin/healthchecker/leader/peer_mock_test.go | 2 +- plugin/healthchecker/leader/peer_test.go | 2 +- plugin/ratelimit/token/config.go | 2 + plugin/ratelimit/token/implement.go | 4 + plugin/ratelimit/token/invoke.go | 3 + plugin/ratelimit/token/invoke_test.go | 3 + release/conf/polaris-server.yaml | 1 + service/ratelimit_config_test.go | 12 + service/service_contract.go | 38 +- store/mysql/group.go | 7 +- store/mysql/scripts/polaris_server.sql | 1 + store/mysql/service_contract.go | 29 +- 48 files changed, 1530 insertions(+), 682 deletions(-) create mode 100644 apiserver/xdsserverv3/cache/status.go diff --git a/.golangci.yml b/.golangci.yml index fe88535f8..3ef63cc8b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -39,6 +39,7 @@ run: - .*~ - test - "apiserver/nacosserver/v2/pb" + - "apiserver/xdsserverv3/cache" # Which files to skip: they will be analyzed, but issues from them won't be reported. # Default value is empty list, @@ -54,7 +55,6 @@ run: - ".*\\.yml$" - "apiserver/xdsserverv3/cache/linear.go" - # Main linters configurations. # See https://golangci-lint.run/usage/linters linters: @@ -62,25 +62,24 @@ linters: disable-all: true # Custom enable linters we want to use. enable: - - errcheck # Errcheck is a program for checking for unchecked errors in go programs. - - errchkjson # Checks types passed to the json encoding functions. Reports unsupported types and optionally reports occasions, where the check for the returned error can be omitted. - - funlen # Tool for detection of long functions - - gci # Gci controls golang package import order and makes it always deterministic. - - goconst # Finds repeated strings that could be replaced by a constant - - gocritic # Provides diagnostics that check for bugs, performance and style issues. - - gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification - - gosimple # Linter for Go source code that specializes in simplifying code - - govet # Vet examines Go source code and reports suspicious constructs, such as Printf calls whose arguments do not align with the format string - - misspell # Finds commonly misspelled English words in comments - - nolintlint # Reports ill-formed or insufficient nolint directives - - revive # Fast, configurable, extensible, flexible, and beautiful linter for Go. Drop-in replacement of golint. - - staticcheck # It's a set of rules from staticcheck. It's not the same thing as the staticcheck binary. - - typecheck # Like the front-end of a Go compiler, parses and type-checks Go code + - errcheck # Errcheck is a program for checking for unchecked errors in go programs. + - errchkjson # Checks types passed to the json encoding functions. Reports unsupported types and optionally reports occasions, where the check for the returned error can be omitted. + - funlen # Tool for detection of long functions + - gci # Gci controls golang package import order and makes it always deterministic. + - goconst # Finds repeated strings that could be replaced by a constant + - gocritic # Provides diagnostics that check for bugs, performance and style issues. + - gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification + - gosimple # Linter for Go source code that specializes in simplifying code + - govet # Vet examines Go source code and reports suspicious constructs, such as Printf calls whose arguments do not align with the format string + - misspell # Finds commonly misspelled English words in comments + - nolintlint # Reports ill-formed or insufficient nolint directives + - revive # Fast, configurable, extensible, flexible, and beautiful linter for Go. Drop-in replacement of golint. + - staticcheck # It's a set of rules from staticcheck. It's not the same thing as the staticcheck binary. + - typecheck # Like the front-end of a Go compiler, parses and type-checks Go code - usestdlibvars # A linter that detect the possibility to use variables/constants from the Go standard library. - lll #- whitespace # Tool for detection of leading and trailing whitespace - issues: max-issues-per-linter: 0 max-same-issues: 0 @@ -100,7 +99,6 @@ issues: - gocritic text: "unnecessaryDefer:" - # https://golangci-lint.run/usage/linters linters-settings: # https://golangci-lint.run/usage/linters/#misspell @@ -117,25 +115,40 @@ linters-settings: - name: atomic - name: line-length-limit severity: error - arguments: [ 480 ] + arguments: [480] - name: unhandled-error severity: warning disabled: true - arguments: [ "fmt.Printf", "myFunction" ] + arguments: ["fmt.Printf", "myFunction"] - name: var-naming severity: warning disabled: true arguments: - - [ "ID","URL","IP","HTTP","JSON","API","UID","Id","Api","Uid","Http","Json","Ip","Url" ] # AllowList - - [ "VM" ] # DenyList + - [ + "ID", + "URL", + "IP", + "HTTP", + "JSON", + "API", + "UID", + "Id", + "Api", + "Uid", + "Http", + "Json", + "Ip", + "Url", + ] # AllowList + - ["VM"] # DenyList - name: string-format severity: warning disabled: false arguments: - - - 'core.WriteError[1].Message' - - '/^([^A-Z]|$)/' + - - "core.WriteError[1].Message" + - "/^([^A-Z]|$)/" - must not start with a capital letter - - - 'fmt.Errorf[0]' + - - "fmt.Errorf[0]" - '/(^|[^\.!?])$/' - must not end in punctuation - - panic @@ -144,27 +157,30 @@ linters-settings: - name: function-result-limit severity: warning disabled: false - arguments: [ 5 ] + arguments: [5] - name: import-shadowing severity: warning disabled: false - arguments: [ "github.com/polarismesh/polaris","namespace" ] + arguments: ["github.com/polarismesh/polaris", "namespace"] - name: waitgroup-by-value severity: warning disabled: false - name: max-public-structs severity: warning disabled: false - arguments: [ 35 ] + arguments: [35] - name: indent-error-flow severity: warning disabled: false - name: function-length severity: warning disabled: false - arguments: [ 80,0 ] + arguments: [80, 0] - name: file-header - arguments: [ "Tencent is pleased to support the open source community by making Polaris available." ] + arguments: + [ + "Tencent is pleased to support the open source community by making Polaris available.", + ] - name: exported severity: warning disabled: true @@ -185,7 +201,7 @@ linters-settings: # If lower than 0, disable the check. # Default: 40 statements: -1 - + # https://golangci-lint.run/usage/linters/#lll lll: # Max line length, lines longer will be reported. @@ -244,9 +260,22 @@ linters-settings: go: "1.15" # Sxxxx checks in https://staticcheck.io/docs/configuration/options/#checks # Default: ["*"] - checks: [ - "all", "-S1000", "-S1001", "-S1002", "-S1008", "-S1009", "-S1016", "-S1023", "-S1025", "-S1029", "-S1034", "-S1040","-S1019" - ] + checks: + [ + "all", + "-S1000", + "-S1001", + "-S1002", + "-S1008", + "-S1009", + "-S1016", + "-S1023", + "-S1025", + "-S1029", + "-S1034", + "-S1040", + "-S1019", + ] # https://golangci-lint.run/usage/linters/#govet govet: @@ -335,7 +364,24 @@ linters-settings: go: "1.15" # SAxxxx checks in https://staticcheck.io/docs/configuration/options/#checks # Default: ["*"] - checks: [ "all","-SA1019","-SA4015","-SA1029","-SA1016","-SA9003","-SA4006","-SA6003","-SA1004","-SA4009","-SA6002","-SA4017","-SA4021","-SA1006","-SA4010" ] + checks: + [ + "all", + "-SA1019", + "-SA4015", + "-SA1029", + "-SA1016", + "-SA9003", + "-SA4006", + "-SA6003", + "-SA1004", + "-SA4009", + "-SA6002", + "-SA4017", + "-SA4021", + "-SA1006", + "-SA4010", + ] # https://golangci-lint.run/usage/linters/#gofmt gofmt: @@ -345,7 +391,7 @@ linters-settings: # Apply the rewrite rules to the source before reformatting. # https://pkg.go.dev/cmd/gofmt # Default: [] - rewrite-rules: [ ] + rewrite-rules: [] # https://golangci-lint.run/usage/linters/#gci gci: @@ -375,4 +421,4 @@ linters-settings: multi-if: false # Enforces newlines (or comments) after every multi-line function signature. # Default: false - multi-func: false \ No newline at end of file + multi-func: false diff --git a/.licenserc.yaml b/.licenserc.yaml index c6f3dbeb1..39729bafe 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -59,6 +59,7 @@ header: # `header` section is configurations for source codes license header. - "release" - "test/data/xds" - "apiserver/nacosserver/v2/pb" + - "apiserver/xdsserverv3/cache" # single file - "LICENSE" diff --git a/apiserver/grpcserver/config/server.go b/apiserver/grpcserver/config/server.go index f6b169020..b26304a55 100644 --- a/apiserver/grpcserver/config/server.go +++ b/apiserver/grpcserver/config/server.go @@ -20,13 +20,13 @@ package config import ( "context" "fmt" - "strings" apiconfig "github.com/polarismesh/specification/source/go/api/v1/config_manage" "google.golang.org/grpc" "github.com/polarismesh/polaris/apiserver" "github.com/polarismesh/polaris/apiserver/grpcserver" + "github.com/polarismesh/polaris/apiserver/grpcserver/utils" commonlog "github.com/polarismesh/polaris/common/log" "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/config" @@ -72,7 +72,7 @@ func (g *ConfigGRPCServer) Run(errCh chan error) { case "client": if apiConfig.Enable { apiconfig.RegisterPolarisConfigGRPCServer(server, g) - openMethod, getErr := GetClientOpenMethod(g.GetProtocol()) + openMethod, getErr := utils.GetConfigClientOpenMethod(g.GetProtocol()) if getErr != nil { return getErr } @@ -125,26 +125,3 @@ func (g *ConfigGRPCServer) enterRateLimit(ip string, method string) uint32 { func (g *ConfigGRPCServer) allowAccess(method string) bool { return g.BaseGrpcServer.AllowAccess(method) } - -// GetClientOpenMethod . -func GetClientOpenMethod(protocol string) (map[string]bool, error) { - openMethods := []string{ - "GetConfigFile", - "CreateConfigFile", - "UpdateConfigFile", - "PublishConfigFile", - "WatchConfigFiles", - "GetConfigFileMetadataList", - "UpsertAndPublishConfigFile", - "Discover", - } - - openMethod := make(map[string]bool) - - for _, item := range openMethods { - method := "/v1.PolarisConfig" + strings.ToUpper(protocol) + "/" + item - openMethod[method] = true - } - - return openMethod, nil -} diff --git a/apiserver/grpcserver/discover/server.go b/apiserver/grpcserver/discover/server.go index e5fab7ee1..77d99f6b1 100644 --- a/apiserver/grpcserver/discover/server.go +++ b/apiserver/grpcserver/discover/server.go @@ -109,7 +109,7 @@ func (g *GRPCServer) Run(errCh chan error) { apiservice.RegisterPolarisGRPCServer(server, g.v1server) apiservice.RegisterPolarisHeartbeatGRPCServer(server, g.v1server) apiservice.RegisterPolarisServiceContractGRPCServer(server, g.v1server) - openMethod, getErr := utils.GetClientOpenMethod(config.Include, g.GetProtocol()) + openMethod, getErr := utils.GetDiscoverClientOpenMethod(config.Include, g.GetProtocol()) if getErr != nil { return getErr } diff --git a/apiserver/grpcserver/utils/help.go b/apiserver/grpcserver/utils/help.go index 1d9a70a00..f7572c577 100644 --- a/apiserver/grpcserver/utils/help.go +++ b/apiserver/grpcserver/utils/help.go @@ -27,8 +27,31 @@ import ( "github.com/polarismesh/polaris/common/log" ) -// GetClientOpenMethod 获取客户端openMethod -func GetClientOpenMethod(include []string, protocol string) (map[string]bool, error) { +// GetConfigClientOpenMethod . +func GetConfigClientOpenMethod(protocol string) (map[string]bool, error) { + openMethods := []string{ + "GetConfigFile", + "CreateConfigFile", + "UpdateConfigFile", + "PublishConfigFile", + "WatchConfigFiles", + "GetConfigFileMetadataList", + "UpsertAndPublishConfigFile", + "Discover", + } + + openMethod := make(map[string]bool) + + for _, item := range openMethods { + method := "/v1.PolarisConfig" + strings.ToUpper(protocol) + "/" + item + openMethod[method] = true + } + + return openMethod, nil +} + +// GetDiscoverClientOpenMethod 获取客户端openMethod +func GetDiscoverClientOpenMethod(include []string, protocol string) (map[string]bool, error) { clientAccess := make(map[string][]string) clientAccess[apiserver.DiscoverAccess] = []string{"Discover", "ReportClient", "ReportServiceContract", "GetServiceContract"} clientAccess[apiserver.RegisterAccess] = []string{"RegisterInstance", "DeregisterInstance"} diff --git a/apiserver/grpcserver/utils/help_test.go b/apiserver/grpcserver/utils/help_test.go index f499c11f1..d6c5b5ee0 100644 --- a/apiserver/grpcserver/utils/help_test.go +++ b/apiserver/grpcserver/utils/help_test.go @@ -84,13 +84,13 @@ func TestGetClientOpenMethod(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := GetClientOpenMethod(tt.args.include, tt.args.protocol) + got, err := GetDiscoverClientOpenMethod(tt.args.include, tt.args.protocol) if (err != nil) != tt.wantErr { - t.Errorf("GetClientOpenMethod() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("GetDiscoverClientOpenMethod() error = %v, wantErr %v", err, tt.wantErr) return } if !reflect.DeepEqual(got, tt.want) { - t.Errorf("GetClientOpenMethod() = %v, want %v", got, tt.want) + t.Errorf("GetDiscoverClientOpenMethod() = %v, want %v", got, tt.want) } }) } diff --git a/apiserver/nacosserver/model/constant.go b/apiserver/nacosserver/model/constant.go index ded1a0870..982800a03 100644 --- a/apiserver/nacosserver/model/constant.go +++ b/apiserver/nacosserver/model/constant.go @@ -111,13 +111,6 @@ func GetGroupName(s string) string { return ss[0] } -func DefaultString(s, d string) string { - if len(s) == 0 { - return d - } - return s -} - var ConvertPolarisNamespaceVal = "default" // ToPolarisNamespace 替换 nacos namespace 为 polaris 的 namespace 信息,主要是针对默认命令空间转为 polaris 的 default diff --git a/apiserver/nacosserver/v1/discover/instance.go b/apiserver/nacosserver/v1/discover/instance.go index 90c67ae31..863edcacf 100644 --- a/apiserver/nacosserver/v1/discover/instance.go +++ b/apiserver/nacosserver/v1/discover/instance.go @@ -143,7 +143,7 @@ func (n *DiscoverServer) handleQueryInstances(ctx context.Context, params map[st if n.pushCenter != nil && udpPort > 0 { n.pushCenter.AddSubscriber(core.Subscriber{ Key: fmt.Sprintf("%s:%d", clientIP, udpPort), - App: model.DefaultString(params["app"], "unknown"), + App: utils.DefaultString(params["app"], "unknown"), AddrStr: clientIP, Ip: clientIP, Port: int(udpPort), diff --git a/apiserver/nacosserver/v2/discover/subscribe.go b/apiserver/nacosserver/v2/discover/subscribe.go index dc5baf285..75756d682 100644 --- a/apiserver/nacosserver/v2/discover/subscribe.go +++ b/apiserver/nacosserver/v2/discover/subscribe.go @@ -47,7 +47,7 @@ func (h *DiscoverServer) handleSubscribeServiceReques(ctx context.Context, req n Key: remote.ValueConnID(ctx), AddrStr: meta.ClientIP, Agent: meta.ClientVersion, - App: nacosmodel.DefaultString(req.GetHeaders()["app"], "unknown"), + App: utils.DefaultString(req.GetHeaders()["app"], "unknown"), Ip: meta.ClientIP, NamespaceId: namespace, Group: group, diff --git a/apiserver/nacosserver/v2/pb/internal_request.go b/apiserver/nacosserver/v2/pb/internal_request.go index c49bd7383..c8b91a3d5 100644 --- a/apiserver/nacosserver/v2/pb/internal_request.go +++ b/apiserver/nacosserver/v2/pb/internal_request.go @@ -20,8 +20,34 @@ type MetaRequestInfo interface { RequestMeta() interface{} } -// ClientAbilities +// ClientAbilities 客户端能力协商请求 type ClientAbilities struct { + // RemoteAbility . + RemoteAbility ClientRemoteAbility `json:"remoteAbility"` + // ConfigAbility . + ConfigAbility ClientConfigAbility `json:"configAbility"` + // NamingAbility . + NamingAbility ClientNamingAbility `json:"namingAbility"` +} + +// 客户端支持能力功能列表 + +// ClientRemoteAbility 客户端支持长连接功能 +type ClientRemoteAbility struct { + // SupportRemoteConnection . + SupportRemoteConnection bool `json:"supportRemoteConnection"` +} + +type ClientConfigAbility struct { + // SupportRemoteMetrics . + SupportRemoteMetrics bool `json:"supportRemoteMetrics"` +} + +type ClientNamingAbility struct { + // SupportDeltaPush 支持增量推送 + SupportDeltaPush bool `json:"supportDeltaPush"` + // SupportRemoteMetric . + SupportRemoteMetric bool `json:"supportRemoteMetric"` } // InternalRequest diff --git a/apiserver/xdsserverv3/cache/cache.go b/apiserver/xdsserverv3/cache/cache.go index a51126384..206c2b896 100644 --- a/apiserver/xdsserverv3/cache/cache.go +++ b/apiserver/xdsserverv3/cache/cache.go @@ -1,25 +1,25 @@ -/** - * Tencent is pleased to support the open source community by making Polaris available. - * - * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. - * - * Licensed under the BSD 3-Clause License (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://opensource.org/licenses/BSD-3-Clause - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ +// Copyright 2018 Envoyproxy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package cache import ( "context" - "errors" + "fmt" + "sync" + "sync/atomic" + "time" corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" @@ -31,137 +31,859 @@ import ( "github.com/polarismesh/polaris/common/utils" ) -type ( - XDSCache struct { - hook CacheHook - // hash is the hashing function for Envoy nodes - hash cachev3.NodeHash - // Muxed caches. - Caches *utils.SyncMap[string, cachev3.Cache] - } - - // CacheHook - CacheHook interface { - // OnCreateWatch - OnCreateWatch(request *cachev3.Request, streamState stream.StreamState, - value chan cachev3.Response) - // OnCreateDeltaWatch - OnCreateDeltaWatch(request *cachev3.DeltaRequest, state stream.StreamState, - value chan cachev3.DeltaResponse) - // OnFetch - OnFetch(ctx context.Context, request *cachev3.Request) +// CacheHook . +type CacheHook interface { + // OnCreateWatch + OnCreateWatch(request *cachev3.Request, streamState stream.StreamState, + value chan cachev3.Response) + // OnCreateDeltaWatch + OnCreateDeltaWatch(request *cachev3.DeltaRequest, state stream.StreamState, + value chan cachev3.DeltaResponse) + // OnFetch + OnFetch(ctx context.Context, request *cachev3.Request) +} + +// TypeResources 记录每一类 xDS 的更新/删除资源记录 +type TypeResources struct { + // UpsertResources 新增/更新的 XDS resource 信息 + UpsertResources map[string]types.Resource + // RemoveResources 准备删除的 XDS resource 信息 + RemoveResources map[string]struct{} +} + +func NewTypeResources() *TypeResources { + return &TypeResources{ + UpsertResources: map[string]types.Resource{}, + RemoveResources: map[string]struct{}{}, } -) +} + +func (r *TypeResources) AppendUpserts(resources map[string]types.Resource) { + for k, v := range resources { + r.UpsertResources[k] = v + } +} + +func (r *TypeResources) AppendRemoves(resources map[string]types.Resource) { + for k := range resources { + r.RemoveResources[k] = struct{}{} + } +} + +func NewUpdateResourcesRequest() *UpdateResourcesRequest { + return &UpdateResourcesRequest{ + lock: &sync.Mutex{}, + Lds: map[string]map[string]types.Resource{}, + NamespaceResources: map[string]*NamespaceUpdateResourcesRequest{}, + } +} + +// UpdateResourcesRequest 更新 XDS 资源请求 +type UpdateResourcesRequest struct { + lock *sync.Mutex + // Lds LDS 相关的资源 + Lds map[string]map[string]types.Resource + // NamespaceResources . + NamespaceResources map[string]*NamespaceUpdateResourcesRequest +} + +// ------------- 针对普通的 xDS 资源 ------------- + +func (r *UpdateResourcesRequest) AddNormalNamespaces(namespace string, + xdsType resource.XDSType, res []types.Resource) { + r.lock.Lock() + defer r.lock.Unlock() + + if _, ok := r.NamespaceResources[namespace]; !ok { + r.NamespaceResources[namespace] = NewNamespaceUpdateResourcesRequest() + } + r.NamespaceResources[namespace].AddNormals(xdsType, res) +} + +func (r *UpdateResourcesRequest) RemoveNormalNamespaces(namespace string, tls resource.TLSMode, + xdsType resource.XDSType, res []types.Resource) { + r.lock.Lock() + defer r.lock.Unlock() + + if _, ok := r.NamespaceResources[namespace]; !ok { + r.NamespaceResources[namespace] = NewNamespaceUpdateResourcesRequest() + } + r.NamespaceResources[namespace].RemoveNormal(xdsType, res) +} + +// ------------- 针对 TLS 相关的 xDS 资源 ------------- + +// AddTlsNamespaces . +func (r *UpdateResourcesRequest) AddTlsNamespaces(namespace string, tls resource.TLSMode, + xdsType resource.XDSType, res []types.Resource) { + r.lock.Lock() + defer r.lock.Unlock() + + if _, ok := r.NamespaceResources[namespace]; !ok { + r.NamespaceResources[namespace] = NewNamespaceUpdateResourcesRequest() + } + r.NamespaceResources[namespace].AddTls(tls, xdsType, res) +} + +// RemoveTlsNamespaces . +func (r *UpdateResourcesRequest) RemoveTlsNamespaces(namespace string, tls resource.TLSMode, + xdsType resource.XDSType, res []types.Resource) { + r.lock.Lock() + defer r.lock.Unlock() + + if _, ok := r.NamespaceResources[namespace]; !ok { + r.NamespaceResources[namespace] = NewNamespaceUpdateResourcesRequest() + } + r.NamespaceResources[namespace].RemoveTls(tls, xdsType, res) +} + +func NewNamespaceUpdateResourcesRequest() *NamespaceUpdateResourcesRequest { + return &NamespaceUpdateResourcesRequest{ + NormalResources: map[resource.XDSType]*TypeResources{}, + DemandResources: map[resource.XDSType]*TypeResources{}, + TlsResources: map[resource.TLSMode]map[resource.XDSType]*TypeResources{}, + } +} + +// NamespaceUpdateResourcesRequest 记录命名空间下的待更新的 XDS 资源 +type NamespaceUpdateResourcesRequest struct { + // NormalResources CDS/EDS/RDS/VHDS 相关的资源 + NormalResources map[resource.XDSType]*TypeResources + // DemandResources . + DemandResources map[resource.XDSType]*TypeResources + // TlsResources . + TlsResources map[resource.TLSMode]map[resource.XDSType]*TypeResources +} + +func (r *NamespaceUpdateResourcesRequest) AddTls(tlsMode resource.TLSMode, xdsType resource.XDSType, res []types.Resource) { + if _, ok := r.TlsResources[tlsMode]; !ok { + r.TlsResources[tlsMode] = make(map[resource.XDSType]*TypeResources) + } + if _, ok := r.TlsResources[tlsMode][xdsType]; !ok { + r.TlsResources[tlsMode][xdsType] = NewTypeResources() + } + r.TlsResources[tlsMode][xdsType].AppendUpserts(cachev3.IndexRawResourcesByName(res)) +} + +func (r *NamespaceUpdateResourcesRequest) RemoveTls(tlsMode resource.TLSMode, xdsType resource.XDSType, res []types.Resource) { + if _, ok := r.TlsResources[tlsMode]; !ok { + r.TlsResources[tlsMode] = make(map[resource.XDSType]*TypeResources) + } + if _, ok := r.TlsResources[tlsMode][xdsType]; !ok { + r.TlsResources[tlsMode][xdsType] = NewTypeResources() + } + r.TlsResources[tlsMode][xdsType].AppendRemoves(cachev3.IndexRawResourcesByName(res)) +} + +func (r *NamespaceUpdateResourcesRequest) AddNormals(xdsType resource.XDSType, res []types.Resource) { + if _, ok := r.NormalResources[xdsType]; !ok { + r.NormalResources[xdsType] = NewTypeResources() + } + r.NormalResources[xdsType].AppendUpserts(cachev3.IndexRawResourcesByName(res)) +} + +func (r *NamespaceUpdateResourcesRequest) RemoveNormal(xdsType resource.XDSType, res []types.Resource) { + if _, ok := r.NormalResources[xdsType]; !ok { + r.NormalResources[xdsType] = NewTypeResources() + } + r.NormalResources[xdsType].AppendRemoves(cachev3.IndexRawResourcesByName(res)) +} + +// ResourcesContainer +type ResourcesContainer struct { + // GlobalVersion 当前整体 typeUrl 下的所有 resource 的全局共用版本,主要是用于非 Delta 场景下的 Watch + GlobalVersion string + // Resources name -> resource + Resources map[string]types.Resource + // VersionMap holds the current hash map of all resources in the snapshot. + // This field should remain nil until it is used, at which point should be + // instantiated by calling ConstructVersionMap(). + // VersionMap is only to be used with delta xDS. + VersionMap map[string]string +} + +func (s *ResourcesContainer) updateGlobalRevision() { + s.GlobalVersion = utils.NewUUID() +} + +// ConstructVersionMap will construct a version map based on the current state of a snapshot +func (s *ResourcesContainer) ConstructVersionMap(modified []string) error { + if s == nil { + return fmt.Errorf("missing resource container") + } + + if s.VersionMap == nil { + s.VersionMap = map[string]string{} + } + + if len(modified) == 0 { + // construct all resource + for _, res := range s.Resources { + // Hash our version in here and build the version map. + marshaledResource, err := cachev3.MarshalResource(res) + if err != nil { + return err + } + v := cachev3.HashResource(marshaledResource) + if v == "" { + return fmt.Errorf("failed to build resource version: %w", err) + } + s.VersionMap[cachev3.GetResourceName(res)] = v + } + return nil + } + for _, name := range modified { + res, ok := s.Resources[name] + if !ok { + continue + } + // Hash our version in here and build the version map. + marshaledResource, err := cachev3.MarshalResource(res) + if err != nil { + return err + } + v := cachev3.HashResource(marshaledResource) + if v == "" { + return fmt.Errorf("failed to build resource version: %w", err) + } + s.VersionMap[name] = v + } + return nil +} + +func newNamespaceResourcesContainer(ns string) *NamespaceResourcesContainer { + return &NamespaceResourcesContainer{ + namespace: ns, + resourcesContainer: map[resource.XDSType]*ResourcesContainer{}, + demandResources: map[resource.XDSType]*ResourcesContainer{}, + tlsResources: map[resource.TLSMode]map[resource.XDSType]*ResourcesContainer{}, + } +} + +type NamespaceResourcesContainer struct { + namespace string + // resourcesContainer are cached resources indexed by node IDs + resourcesContainer map[resource.XDSType]*ResourcesContainer + // demandResources 记录了支持按需下发的 xds resource 资源 + demandResources map[resource.XDSType]*ResourcesContainer + // tlsResources 记录了使用了 tls 的资源(目前而言只有 CDS) + tlsResources map[resource.TLSMode]map[resource.XDSType]*ResourcesContainer +} + +func newNamespaceStatusInfo(ns string) *NamespaceStatusInfo { + return &NamespaceStatusInfo{ + namespace: ns, + status: map[string]*statusInfo{}, + } +} + +type NamespaceStatusInfo struct { + namespace string + // status information for all nodes indexed by node IDs + status map[string]*statusInfo +} + +type ResourceCache struct { + hook CacheHook + // watchCount and deltaWatchCount are atomic counters incremented for each watch respectively. They need to + // be the first fields in the struct to guarantee 64-bit alignment, + // which is a requirement for atomic operations on 64-bit operands to work on + // 32-bit machines. + watchCount int64 + deltaWatchCount int64 + // ads flag to hold responses until all resources are named + ads bool + // ldsResources 记录 Envoy Node LDS 的资源记录信息 + ldsResources map[string]*ResourcesContainer + // namespaceContainer 按照命名空间级别隔离 xDS resources + namespaceContainer map[string]*NamespaceResourcesContainer + // status information for all nodes indexed by node IDs + status map[string]*NamespaceStatusInfo + + mu sync.RWMutex +} + +// NewResourceCache initializes a simple sc. +// +// ADS flag forces a delay in responding to streaming requests until all +// resources are explicitly named in the request. This avoids the problem of a +// partial request over a single stream for a subset of resources which would +// require generating a fresh version for acknowledgement. ADS flag requires +// snapshot consistency. For non-ADS case (and fetch), multiple partial +// requests are sent across multiple streams and re-using the snapshot version +// is OK. +func NewResourceCache(hook CacheHook) *ResourceCache { + cache := &ResourceCache{ + hook: hook, + ads: true, + ldsResources: make(map[string]*ResourcesContainer), + namespaceContainer: make(map[string]*NamespaceResourcesContainer), + status: make(map[string]*NamespaceStatusInfo), + } + return cache +} + +// CleanEnvoyNodeCache 清理 Envoy Node 强关联的 XDS 规则数据 +func (sc *ResourceCache) CleanEnvoyNodeCache(node *corev3.Node) error { + sc.mu.Lock() + defer sc.mu.Unlock() + + delete(sc.ldsResources, node.GetId()) + return nil +} + +func (sc *ResourceCache) updateResourceContainer(ctx context.Context, req *UpdateResourcesRequest) { + // 更新 LDS 资源信息 + for nodeId, resources := range req.Lds { + if _, ok := sc.ldsResources[nodeId]; !ok { + sc.ldsResources[nodeId] = &ResourcesContainer{ + Resources: resources, + } + sc.ldsResources[nodeId].updateGlobalRevision() + _ = sc.ldsResources[nodeId].ConstructVersionMap(nil) + } + } + + namespaceResources := req.NamespaceResources + for ns, nsResources := range namespaceResources { + if _, ok := sc.namespaceContainer[ns]; !ok { + sc.namespaceContainer[ns] = newNamespaceResourcesContainer(ns) + } + + namespaceContainer := sc.namespaceContainer[ns] + + // OnDemand 场景下的资源直接一把更新 + demandResources := map[resource.XDSType]*ResourcesContainer{} + for xdsType, res := range nsResources.DemandResources { + demandResources[xdsType] = &ResourcesContainer{ + Resources: res.UpsertResources, + } + namespaceContainer.demandResources = demandResources + } + + // 更新 TLS 资源信息 + for tlsMode, xdsResources := range nsResources.TlsResources { + if _, ok := namespaceContainer.tlsResources[tlsMode]; !ok { + namespaceContainer.tlsResources[tlsMode] = make(map[resource.XDSType]*ResourcesContainer, 32) + } + for typeUrl, res := range xdsResources { + if container, ok := namespaceContainer.tlsResources[tlsMode][typeUrl]; !ok { + container = &ResourcesContainer{ + Resources: res.UpsertResources, + } + namespaceContainer.tlsResources[tlsMode][typeUrl] = container + } else { + for name := range res.RemoveResources { + delete(container.Resources, name) + delete(container.VersionMap, name) + } + modified := make([]string, 0, len(res.UpsertResources)) + for name, res := range res.UpsertResources { + container.Resources[name] = res + modified = append(modified, name) + } + } + namespaceContainer.tlsResources[tlsMode][typeUrl].updateGlobalRevision() + } + } + + // 更新非 LDS 的 XDS 规则到 ResourceContainer 中 + for typeUrl, resources := range nsResources.NormalResources { + if container, ok := namespaceContainer.resourcesContainer[typeUrl]; !ok { + container = &ResourcesContainer{ + Resources: resources.UpsertResources, + } + namespaceContainer.resourcesContainer[typeUrl] = container + } else { + for name := range resources.RemoveResources { + delete(container.Resources, name) + delete(container.VersionMap, name) + } + modified := make([]string, 0, len(resources.UpsertResources)) + for name, res := range resources.UpsertResources { + container.Resources[name] = res + modified = append(modified, name) + } + } + + namespaceContainer.resourcesContainer[typeUrl].updateGlobalRevision() + } + } +} -// NewCache create a XDS SnapshotCache to proxy cachev3.SnapshotCache -func NewCache(hook CacheHook) *XDSCache { - sc := &XDSCache{ - hook: hook, - Caches: utils.NewSyncMap[string, cachev3.Cache](), +// UpdateResources updates a snapshot for a node. +func (sc *ResourceCache) UpdateResources(ctx context.Context, req *UpdateResourcesRequest) error { + sc.mu.Lock() + defer sc.mu.Unlock() + + // updateResourceContainer 更新所有 XDS 资源的容器 + sc.updateResourceContainer(ctx, req) + + for ns, nsStatus := range sc.status { + for _, info := range nsStatus.status { + info.mu.Lock() + defer info.mu.Unlock() + for id, watch := range info.watches { + watchType := resource.FormatTypeUrl(watch.Request.TypeUrl) + container, exists := sc.loadResourceContainer(info.client, watchType) + if !exists { + continue + } + curVersion := container.GlobalVersion + if curVersion != watch.Request.VersionInfo { + log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, + watch.Request.ResourceNames, curVersion) + + resources := container.Resources + if err := sc.respond(ctx, watch.Request, watch.Response, resources, curVersion, false); err != nil { + return err + } + // discard the watch + delete(info.watches, id) + } + } + + namespaceContainer, exist := sc.namespaceContainer[ns] + if !exist { + continue + } + // We only calculate version hashes when using delta. We don't + // want to do this when using SOTW so we can avoid unnecessary + // computational cost if not using delta. + if len(info.deltaWatches) > 0 { + for _, container := range namespaceContainer.resourcesContainer { + if err := container.ConstructVersionMap(nil); err != nil { + log.Errorf("failed to compute version for snapshot resources inline: %s", err) + return err + } + } + } + + // process our delta watches + for id, watch := range info.deltaWatches { + watchType := resource.FormatTypeUrl(watch.Request.TypeUrl) + container, exist := sc.loadResourceContainer(info.client, watchType) + if !exist { + continue + } + res, err := sc.respondDelta( + ctx, + container, + watch.Request, + watch.Response, + watch.StreamState, + ) + if err != nil { + return err + } + // If we detect a nil response here, that means there has been no state change + // so we don't want to respond or remove any existing resource watches + if res != nil { + delete(info.deltaWatches, id) + } + } + } } - return sc + return nil } -// CleanEnvoyNodeCache 清理和 Envoy Node 强相关的缓存数据 -func (sc *XDSCache) CleanEnvoyNodeCache(node *corev3.Node) { - cacheKey := resource.LDS.ResourceType() + "~" + node.Id - sc.Caches.Delete(cacheKey) +// nameSet creates a map from a string slice to value true. +func nameSet(names []string) map[string]bool { + set := make(map[string]bool, len(names)) + for _, name := range names { + set[name] = true + } + return set } -// CreateWatch returns a watch for an xDS request. -func (sc *XDSCache) CreateWatch(request *cachev3.Request, streamState stream.StreamState, - value chan cachev3.Response) func() { +// superset checks that all resources are listed in the names set. +func superset(names map[string]bool, resources map[string]types.Resource) error { + for resourceName := range resources { + if _, exists := names[resourceName]; !exists { + return fmt.Errorf("%q not listed", resourceName) + } + } + return nil +} + +// CreateWatch returns a watch for an xDS request. A nil function may be +// returned if an error occurs. +func (sc *ResourceCache) CreateWatch(request *cachev3.Request, streamState stream.StreamState, value chan cachev3.Response) func() { if sc.hook != nil { sc.hook.OnCreateWatch(request, streamState, value) } - item := sc.loadCache(request, streamState) - if item == nil { - value <- nil - return func() {} + + sc.mu.Lock() + defer sc.mu.Unlock() + + info, client := sc.loadWatchStatus(request.GetNode()) + + // update last watch request time + info.mu.Lock() + info.lastWatchRequestTime = time.Now() + info.mu.Unlock() + + var version string + + container, exists := sc.loadResourceContainer(client, resource.FormatTypeUrl(request.GetTypeUrl())) + + if exists { + version = container.GlobalVersion + } + + if exists { + knownResourceNames := streamState.GetKnownResourceNames(request.TypeUrl) + diff := []string{} + for _, r := range request.ResourceNames { + if _, ok := knownResourceNames[r]; !ok { + diff = append(diff, r) + } + } + + log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", client.GetNodeID(), + request.TypeUrl, request.ResourceNames, knownResourceNames, diff) + + if len(diff) > 0 { + resources := container.Resources + for _, name := range diff { + if _, exists := resources[name]; exists { + if err := sc.respond(context.Background(), request, value, resources, version, false); err != nil { + log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl, + request.ResourceNames, client.GetNodeID(), err) + return nil + } + return func() {} + } + } + } + } + + // if the requested version is up-to-date or missing a response, leave an open watch + if !exists || request.VersionInfo == version { + watchID := sc.nextWatchID() + log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, + request.TypeUrl, request.ResourceNames, client.GetNodeID(), request.VersionInfo) + info.mu.Lock() + info.watches[watchID] = cachev3.ResponseWatch{Request: request, Response: value} + info.mu.Unlock() + return sc.cancelWatch(client.GetNodeID(), watchID) + } + + // otherwise, the watch may be responded immediately + resources := container.Resources + if err := sc.respond(context.Background(), request, value, resources, version, false); err != nil { + log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl, + request.ResourceNames, client.GetNodeID(), err) + return nil + } + + return func() {} +} + +func (sc *ResourceCache) nextWatchID() int64 { + return atomic.AddInt64(&sc.watchCount, 1) +} + +// cancellation function for cleaning stale watches +func (sc *ResourceCache) cancelWatch(nodeID string, watchID int64) func() { + return func() { + // uses the cache mutex + sc.mu.RLock() + defer sc.mu.RUnlock() + for _, nsStatus := range sc.status { + if info, ok := nsStatus.status[nodeID]; ok { + info.mu.Lock() + delete(info.watches, watchID) + info.mu.Unlock() + } + } + } +} + +// Respond to a watch with the snapshot value. The value channel should have capacity not to block. +// TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 +func (sc *ResourceCache) respond(ctx context.Context, request *cachev3.Request, + value chan cachev3.Response, + resources map[string]types.Resource, + version string, + heartbeat bool, +) error { + + // for ADS, the request names must match the snapshot names + // if they do not, then the watch is never responded, and it is expected that envoy makes another request + if len(request.ResourceNames) != 0 && sc.ads { + if err := superset(nameSet(request.ResourceNames), resources); err != nil { + log.Warnf("ADS mode: not responding to request: %v", err) + return nil + } + } + + log.Debugf("respond %s%v version %q with version %q", request.TypeUrl, request.ResourceNames, request.VersionInfo, version) + + select { + case value <- createResponse(ctx, request, resources, version, heartbeat): + return nil + case <-ctx.Done(): + return context.Canceled + } +} + +func createResponse(ctx context.Context, request *cachev3.Request, resources map[string]types.Resource, + version string, heartbeat bool) cachev3.Response { + filtered := make([]types.ResourceWithTTL, 0, len(resources)) + + // Reply only with the requested resources. Envoy may ask each resource + // individually in a separate stream. It is ok to reply with the same version + // on separate streams since requests do not share their response versions. + if len(request.ResourceNames) != 0 { + set := nameSet(request.ResourceNames) + for name, resource := range resources { + if set[name] { + filtered = append(filtered, types.ResourceWithTTL{Resource: resource}) + } + } + } else { + for _, resource := range resources { + filtered = append(filtered, types.ResourceWithTTL{Resource: resource}) + } + } + + return &cachev3.RawResponse{ + Request: request, + Version: version, + Resources: filtered, + Heartbeat: heartbeat, + Ctx: ctx, } - return item.CreateWatch(request, streamState, value) } -// CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. -func (sc *XDSCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state stream.StreamState, +// CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple Snapshotsc. +func (sc *ResourceCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state stream.StreamState, value chan cachev3.DeltaResponse) func() { + if sc.hook != nil { sc.hook.OnCreateDeltaWatch(request, state, value) } - item := sc.loadCache(request, state) - if item == nil { - value <- &NoReadyXdsResponse{} - return func() {} + + sc.mu.Lock() + defer sc.mu.Unlock() + + info, client := sc.loadWatchStatus(request.GetNode()) + + // update last watch request time + info.setLastDeltaWatchRequestTime(time.Now()) + + container, exists := sc.loadResourceContainer(client, resource.FormatTypeUrl(request.GetTypeUrl())) + + // There are three different cases that leads to a delayed watch trigger: + // - no snapshot exists for the requested nodeID + // - a snapshot exists, but we failed to initialize its version map + // - we attempted to issue a response, but the caller is already up to date + delayedResponse := !exists + if exists { + if err := container.ConstructVersionMap(nil); err != nil { + log.Errorf("failed to compute version for snapshot resources inline: %s", err) + } + response, err := sc.respondDelta(context.Background(), container, request, value, state) + if err != nil { + log.Errorf("failed to respond with delta response: %s", err) + } + + delayedResponse = response == nil + } + + if delayedResponse { + watchID := sc.nextDeltaWatchID() + + if exists { + log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, request.GetTypeUrl(), + state.GetSubscribedResourceNames(), client.GetNodeID(), container.GlobalVersion) + } else { + log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, request.GetTypeUrl(), + state.GetSubscribedResourceNames(), client.GetNodeID()) + } + + info.setDeltaResponseWatch(watchID, cachev3.DeltaResponseWatch{ + Request: request, + Response: value, + StreamState: state, + }) + return sc.cancelDeltaWatch(client.GetNodeID(), watchID) + } + + return nil +} + +// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. +func (sc *ResourceCache) respondDelta(ctx context.Context, container *ResourcesContainer, + request *cachev3.DeltaRequest, + value chan cachev3.DeltaResponse, + state stream.StreamState, +) (*cachev3.RawDeltaResponse, error) { + + resp := createDeltaResponse(ctx, request, state, resourceContainer{ + resourceMap: container.Resources, + versionMap: container.VersionMap, + systemVersion: container.GlobalVersion, + }) + + // Only send a response if there were changes + // We want to respond immediately for the first wildcard request in a stream, even if the response is empty + // otherwise, envoy won't complete initialization + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { + if log != nil { + log.Debugf("node: %s, sending delta response for typeURL %s with resources: "+ + " %v removed resources: %v with wildcard: %t", + request.GetNode().GetId(), request.TypeUrl, cachev3.GetResourceNames(resp.Resources), + resp.RemovedResources, state.IsWildcard()) + } + select { + case value <- resp: + return resp, nil + case <-ctx.Done(): + return resp, context.Canceled + } + } + return nil, nil +} + +func (sc *ResourceCache) nextDeltaWatchID() int64 { + return atomic.AddInt64(&sc.deltaWatchCount, 1) +} + +// cancellation function for cleaning stale delta watches +func (sc *ResourceCache) cancelDeltaWatch(nodeID string, watchID int64) func() { + return func() { + // uses the cache mutex + sc.mu.RLock() + defer sc.mu.RUnlock() + for _, nsStatus := range sc.status { + if info, ok := nsStatus.status[nodeID]; ok { + info.mu.Lock() + delete(info.deltaWatches, watchID) + info.mu.Unlock() + } + } } - return item.CreateDeltaWatch(request, state, value) } // Fetch implements the cache fetch function. // Fetch is called on multiple streams, so responding to individual names with the same version works. -func (sc *XDSCache) Fetch(ctx context.Context, request *cachev3.Request) (cachev3.Response, error) { - return nil, errors.New("not implemented") +func (sc *ResourceCache) Fetch(ctx context.Context, request *cachev3.Request) (cachev3.Response, error) { + + client := resource.ParseXDSClient(request.GetNode()) + + sc.mu.RLock() + defer sc.mu.RUnlock() + + container, exists := sc.loadResourceContainer(client, resource.FormatTypeUrl(request.GetTypeUrl())) + + if exists { + // Respond only if the request version is distinct from the current snapshot state. + // It might be beneficial to hold the request since Envoy will re-attempt the refresh. + version := container.GlobalVersion + if request.VersionInfo == version { + log.Warnf("skip fetch: version up to date") + return nil, &types.SkipFetchError{} + } + + out := createResponse(ctx, request, container.Resources, version, false) + return out, nil + } + + return nil, fmt.Errorf("missing snapshot for %q", client.GetNodeID()) } -// DeltaUpdateResource . -func (sc *XDSCache) DeltaUpdateResource(key, typeUrl string, current map[string]types.Resource) error { - val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache { - return cachev3.NewLinearCache(typeUrl, cachev3.WithLogger(log)) - }) - linearCache, _ := val.(*cachev3.LinearCache) - return linearCache.UpdateResources(current, []string{}) +func (sc *ResourceCache) loadWatchStatus(node *corev3.Node) (*statusInfo, *resource.XDSClient) { + client := resource.ParseXDSClient(node) + + namespaceContainer, ok := sc.status[client.GetSelfNamespace()] + if !ok { + namespaceContainer = newNamespaceStatusInfo(client.GetSelfNamespace()) + sc.status[client.GetSelfNamespace()] = namespaceContainer + } + if _, ok := namespaceContainer.status[client.GetNodeID()]; !ok { + namespaceContainer.status[client.GetNodeID()] = newStatusInfo(node) + } + + info := namespaceContainer.status[client.GetNodeID()] + return info, client } -// DeltaRemoveResource . -func (sc *XDSCache) DeltaRemoveResource(key, typeUrl string, current map[string]types.Resource) error { - val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache { - return cachev3.NewLinearCache(typeUrl, cachev3.WithLogger(log)) - }) - linearCache, _ := val.(*cachev3.LinearCache) - - waitRemove := make([]string, 0, len(current)) - for k := range current { - waitRemove = append(waitRemove, k) - } - return linearCache.UpdateResources(nil, waitRemove) -} - -func (sc *XDSCache) loadCache(req interface{}, streamState stream.StreamState) cachev3.Cache { - var ( - typeUrl string - client *resource.XDSClient - ) - switch args := req.(type) { - case *cachev3.Request: - client = resource.ParseXDSClient(args.GetNode()) - typeUrl = args.TypeUrl - case *cachev3.DeltaRequest: - client = resource.ParseXDSClient(args.GetNode()) - typeUrl = args.TypeUrl +func (sc *ResourceCache) loadResourceContainer(client *resource.XDSClient, watchType resource.XDSType) (*ResourcesContainer, bool) { + + namespaceContainer, ok := sc.namespaceContainer[client.GetSelfNamespace()] + if !ok { + log.Error("load resource container not found namespace", zap.String("id", client.GetNodeID()), + zap.String("namespace", client.GetSelfNamespace())) + return nil, false + } + + var container *ResourcesContainer + var exists bool + + switch watchType { + case resource.LDS: + // 获取到 Envoy Node 对应希望看到的 ldsRes 资源 + container, exists = sc.ldsResources[client.GetNodeID()] + case resource.CDS: + switch client.TLSMode { + case resource.TLSModeNone: + container, exists = namespaceContainer.resourcesContainer[watchType] + default: + container, exists = namespaceContainer.tlsResources[client.TLSMode][watchType] + } default: - log.Error("[XDS][V3] no support client request type", zap.Any("req", args)) - return nil + container, exists = namespaceContainer.resourcesContainer[watchType] } - cacheKey := BuildCacheKey(typeUrl, client.TLSMode, client) - val, ok := sc.Caches.Load(cacheKey) - if ok { - log.Info("[XDS][V3] load cache to handle client request", - zap.String("cache-key", cacheKey), zap.String("type", typeUrl), - zap.String("client", client.Node.GetId()), zap.Bool("wildcard", streamState.IsWildcard())) - return val + + if !exists { + log.Error("load resource container not found target type", zap.String("id", client.GetNodeID()), + zap.String("tls", string(client.TLSMode)), zap.String("type", watchType.String())) } - log.Error("[XDS][V3] cache not found to handle client request", zap.String("type", typeUrl), - zap.String("client", client.Node.GetId())) - return nil + + return container, exists +} + +// GetStatusKeys retrieves all node IDs in the status map. +func (sc *ResourceCache) GetStatusKeys() []string { + sc.mu.RLock() + defer sc.mu.RUnlock() + + out := make([]string, 0, len(sc.status)) + for id := range sc.status { + out = append(out, id) + } + + return out } -func BuildCacheKey(typeUrl string, tlsMode resource.TLSMode, client *resource.XDSClient) string { - xdsType := resource.FormatTypeUrl(typeUrl) - if xdsType == resource.LDS { - return typeUrl + "~" + client.GetNodeID() +// GetResources . +func (sc *ResourceCache) GetResources(typeUrl resource.XDSType, ns, nodeId string) map[string]types.Resource { + sc.mu.RLock() + defer sc.mu.RUnlock() + + var data *ResourcesContainer + if typeUrl == resource.LDS { + val, ok := sc.ldsResources[nodeId] + if !ok { + return map[string]types.Resource{} + } + data = val + } else { + if _, ok := sc.namespaceContainer[ns]; !ok { + return map[string]types.Resource{} + } + val, ok := sc.namespaceContainer[ns].resourcesContainer[typeUrl] + if !ok { + return make(map[string]types.Resource) + } + data = val } - key := typeUrl + "~" + client.GetSelfNamespace() - if resource.SupportTLS(xdsType) && resource.EnableTLS(tlsMode) { - key = key + "~" + string(tlsMode) + + copyData := make(map[string]types.Resource, len(data.Resources)) + for k, v := range data.Resources { + copyData[k] = v } - return key + return copyData } diff --git a/apiserver/xdsserverv3/cache/callback.go b/apiserver/xdsserverv3/cache/callback.go index 281b3105e..2bbe99bd8 100644 --- a/apiserver/xdsserverv3/cache/callback.go +++ b/apiserver/xdsserverv3/cache/callback.go @@ -27,7 +27,7 @@ import ( "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" ) -func NewCallback(cacheMgr *XDSCache, nodeMgr *resource.XDSNodeManager) *Callbacks { +func NewCallback(cacheMgr *ResourceCache, nodeMgr *resource.XDSNodeManager) *Callbacks { return &Callbacks{ cacheMgr: cacheMgr, nodeMgr: nodeMgr, @@ -35,7 +35,7 @@ func NewCallback(cacheMgr *XDSCache, nodeMgr *resource.XDSNodeManager) *Callback } type Callbacks struct { - cacheMgr *XDSCache + cacheMgr *ResourceCache nodeMgr *resource.XDSNodeManager } @@ -50,13 +50,13 @@ func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) func (cb *Callbacks) OnStreamClosed(id int64, node *corev3.Node) { cb.nodeMgr.DelNode(id) // 清理 cache - cb.cacheMgr.CleanEnvoyNodeCache(node) + _ = cb.cacheMgr.CleanEnvoyNodeCache(node) } func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *corev3.Node) { cb.nodeMgr.DelNode(id) // 清理 cache - cb.cacheMgr.CleanEnvoyNodeCache(node) + _ = cb.cacheMgr.CleanEnvoyNodeCache(node) } func (cb *Callbacks) OnStreamRequest(id int64, req *discovery.DiscoveryRequest) error { diff --git a/apiserver/xdsserverv3/cache/response.go b/apiserver/xdsserverv3/cache/response.go index eec1c21b6..1c55baefb 100644 --- a/apiserver/xdsserverv3/cache/response.go +++ b/apiserver/xdsserverv3/cache/response.go @@ -18,12 +18,81 @@ package cache import ( + "context" "errors" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) +// groups together resource-related arguments for the createDeltaResponse function +type resourceContainer struct { + resourceMap map[string]types.Resource + versionMap map[string]string + systemVersion string +} + +func createDeltaResponse(ctx context.Context, req *cachev3.DeltaRequest, state stream.StreamState, resources resourceContainer) *cachev3.RawDeltaResponse { + // variables to build our response with + var nextVersionMap map[string]string + var filtered []types.Resource + var toRemove []string + + // If we are handling a wildcard request, we want to respond with all resources + switch { + case state.IsWildcard(): + if len(state.GetResourceVersions()) == 0 { + filtered = make([]types.Resource, 0, len(resources.resourceMap)) + } + nextVersionMap = make(map[string]string, len(resources.resourceMap)) + for name, r := range resources.resourceMap { + // Since we've already precomputed the version hashes of the new snapshot, + // we can just set it here to be used for comparison later + version := resources.versionMap[name] + nextVersionMap[name] = version + prevVersion, found := state.GetResourceVersions()[name] + if !found || (prevVersion != version) { + filtered = append(filtered, r) + } + } + + // Compute resources for removal + // The resource version can be set to "" here to trigger a removal even if never returned before + for name := range state.GetResourceVersions() { + if _, ok := resources.resourceMap[name]; !ok { + toRemove = append(toRemove, name) + } + } + default: + nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + // state.GetResourceVersions() may include resources no longer subscribed + // In the current code this gets silently cleaned when updating the version map + for name := range state.GetSubscribedResourceNames() { + prevVersion, found := state.GetResourceVersions()[name] + if r, ok := resources.resourceMap[name]; ok { + nextVersion := resources.versionMap[name] + if prevVersion != nextVersion { + filtered = append(filtered, r) + } + nextVersionMap[name] = nextVersion + } else if found { + toRemove = append(toRemove, name) + } + } + } + + return &cachev3.RawDeltaResponse{ + DeltaRequest: req, + Resources: filtered, + RemovedResources: toRemove, + NextVersionMap: nextVersionMap, + SystemVersionInfo: resources.systemVersion, + Ctx: ctx, + } +} + type NoReadyXdsResponse struct { cachev3.DeltaResponse } diff --git a/apiserver/xdsserverv3/cache/status.go b/apiserver/xdsserverv3/cache/status.go new file mode 100644 index 000000000..038bd6ca0 --- /dev/null +++ b/apiserver/xdsserverv3/cache/status.go @@ -0,0 +1,96 @@ +// Copyright 2018 Envoyproxy Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "sync" + "time" + + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + + "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" +) + +// statusInfo tracks the server state for the remote Envoy node. +type statusInfo struct { + // node is the constant Envoy node metadata. + client *resource.XDSClient + // watches are indexed channels for the response watches and the original requests. + watches map[int64]cachev3.ResponseWatch + // deltaWatches are indexed channels for the delta response watches and the original requests + deltaWatches map[int64]cachev3.DeltaResponseWatch + // the timestamp of the last watch request + lastWatchRequestTime time.Time + // the timestamp of the last delta watch request + lastDeltaWatchRequestTime time.Time + // mutex to protect the status fields. + // should not acquire mutex of the parent cache after acquiring this mutex. + mu sync.RWMutex +} + +// newStatusInfo initializes a status info data structure. +func newStatusInfo(node *core.Node) *statusInfo { + out := statusInfo{ + client: resource.ParseXDSClient(node), + watches: make(map[int64]cachev3.ResponseWatch), + deltaWatches: make(map[int64]cachev3.DeltaResponseWatch), + } + return &out +} + +func (info *statusInfo) GetNode() *core.Node { + info.mu.RLock() + defer info.mu.RUnlock() + return info.GetNode() +} + +func (info *statusInfo) GetNumWatches() int { + info.mu.RLock() + defer info.mu.RUnlock() + return len(info.watches) +} + +func (info *statusInfo) GetNumDeltaWatches() int { + info.mu.RLock() + defer info.mu.RUnlock() + return len(info.deltaWatches) +} + +func (info *statusInfo) GetLastWatchRequestTime() time.Time { + info.mu.RLock() + defer info.mu.RUnlock() + return info.lastWatchRequestTime +} + +func (info *statusInfo) GetLastDeltaWatchRequestTime() time.Time { + info.mu.RLock() + defer info.mu.RUnlock() + return info.lastDeltaWatchRequestTime +} + +// setLastDeltaWatchRequestTime will set the current time of the last delta discovery watch request. +func (info *statusInfo) setLastDeltaWatchRequestTime(t time.Time) { + info.mu.Lock() + defer info.mu.Unlock() + info.lastDeltaWatchRequestTime = t +} + +// setDeltaResponseWatch will set the provided delta response watch for the associated watch ID. +func (info *statusInfo) setDeltaResponseWatch(id int64, drw cachev3.DeltaResponseWatch) { + info.mu.Lock() + defer info.mu.Unlock() + info.deltaWatches[id] = drw +} diff --git a/apiserver/xdsserverv3/cds.go b/apiserver/xdsserverv3/cds.go index 8e8aed8a7..470c4b0b7 100644 --- a/apiserver/xdsserverv3/cds.go +++ b/apiserver/xdsserverv3/cds.go @@ -33,7 +33,6 @@ import ( "google.golang.org/protobuf/types/known/structpb" "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" - "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/service" ) @@ -77,32 +76,9 @@ func (cds *CDSBuilder) GenerateByDirection(option *resource.BuildOption, direction corev3.TrafficDirection) ([]types.Resource, error) { var clusters []types.Resource - selfServiceKey := option.SelfService - - ignore := func(svcKey model.ServiceKey) bool { - // 如果是 INBOUND 场景,只需要下发 XDS Sidecar Node 所归属的服务 INBOUND Cluster 规则 - isGateway := option.RunType == resource.RunTypeGateway - if direction == core.TrafficDirection_INBOUND { - if isGateway { - return true - } - if !isGateway && !selfServiceKey.Equal(&svcKey) { - return true - } - } - // 如果是网关,则自己的数据不会下发 - if isGateway && selfServiceKey.Equal(&svcKey) { - return true - } - return false - } - services := option.Services // 每一个 polaris service 对应一个 envoy cluster - for svcKey, svc := range services { - if ignore(svcKey) { - continue - } + for _, svc := range services { c := cds.makeCluster(svc, direction, option) switch option.TLSMode { case resource.TLSModePermissive: diff --git a/apiserver/xdsserverv3/debug.go b/apiserver/xdsserverv3/debug.go index c50068848..d0986c9c9 100644 --- a/apiserver/xdsserverv3/debug.go +++ b/apiserver/xdsserverv3/debug.go @@ -21,7 +21,7 @@ import ( "net/http" "strings" - cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" apimodel "github.com/polarismesh/specification/source/go/api/v1/model" "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" @@ -41,51 +41,32 @@ func (x *XDSServer) listXDSNodes(resp http.ResponseWriter, req *http.Request) { _, _ = resp.Write([]byte(ret)) } -func (x *XDSServer) listXDSResources(resp http.ResponseWriter, req *http.Request) { +func (x *XDSServer) listXDSResource(resp http.ResponseWriter, req *http.Request) { cType := req.URL.Query().Get("type") + nodeId := req.URL.Query().Get("nodeId") + service := req.URL.Query().Get("service") + namespace := req.URL.Query().Get("namespace") + if namespace == "" { + namespace = "default" + } - resources := map[string]interface{}{} - x.cache.Caches.ReadRange(func(key string, val cachev3.Cache) { - linearCache := val.(*cachev3.LinearCache) - - if cType == "node" { - if strings.Contains(key, resource.LDS.ResourceType()) { - resources[key] = map[string]interface{}{ - "resources": linearCache.GetResources(), - } - } - } else { - if !strings.Contains(key, resource.LDS.ResourceType()) { - resources[key] = map[string]interface{}{ - "resources": linearCache.GetResources(), - } + res := x.cache.GetResources(resource.FromSimpleXDS(cType), namespace, nodeId) + if len(service) != 0 { + copyData := make(map[string]types.Resource, len(res)) + hasSvc := len(service) != 0 + for k, v := range res { + if hasSvc && !strings.Contains(k, service) { + continue } + copyData[k] = v } - }) - - data := map[string]interface{}{ - "code": apimodel.Code_ExecuteSuccess, - "info": "execute success", - "data": resources, - "count": len(resources), + res = copyData } - ret := utils.MustJson(data) - resp.WriteHeader(http.StatusOK) - _, _ = resp.Write([]byte(ret)) -} - -func (x *XDSServer) listXDSCaches(resp http.ResponseWriter, req *http.Request) { - resources := []string{} - x.cache.Caches.ReadRange(func(key string, val cachev3.Cache) { - resources = append(resources, key) - }) - data := map[string]interface{}{ - "code": apimodel.Code_ExecuteSuccess, - "info": "execute success", - "data": resources, - "count": len(resources), + "code": apimodel.Code_ExecuteSuccess, + "info": "execute success", + "data": res, } ret := utils.MustJson(data) diff --git a/apiserver/xdsserverv3/eds.go b/apiserver/xdsserverv3/eds.go index 97bce79d2..f6a55ffd5 100644 --- a/apiserver/xdsserverv3/eds.go +++ b/apiserver/xdsserverv3/eds.go @@ -42,19 +42,14 @@ func (eds *EDSBuilder) Init(svr service.DiscoverServer) { func (eds *EDSBuilder) Generate(option *resource.BuildOption) (interface{}, error) { var resources []types.Resource - switch option.RunType { - case resource.RunTypeGateway: - resources = append(resources, eds.makeBoundEndpoints(option, core.TrafficDirection_OUTBOUND)...) - case resource.RunTypeSidecar: - // sidecar 场景,如果流量方向是 envoy -> 业务 POD,那么 endpoint 只能是 本地 127.0.0.1 - switch option.TrafficDirection { - case core.TrafficDirection_INBOUND: - inBoundEndpoints := eds.makeSelfEndpoint(option) - resources = append(resources, inBoundEndpoints...) - case core.TrafficDirection_OUTBOUND: - outBoundEndpoints := eds.makeBoundEndpoints(option, core.TrafficDirection_OUTBOUND) - resources = append(resources, outBoundEndpoints...) - } + // sidecar 场景,如果流量方向是 envoy -> 业务 POD,那么 endpoint 只能是 本地 127.0.0.1 + switch option.TrafficDirection { + case core.TrafficDirection_INBOUND: + inBoundEndpoints := eds.makeSelfEndpoint(option) + resources = append(resources, inBoundEndpoints...) + case core.TrafficDirection_OUTBOUND: + outBoundEndpoints := eds.makeBoundEndpoints(option, core.TrafficDirection_OUTBOUND) + resources = append(resources, outBoundEndpoints...) } return resources, nil } @@ -63,15 +58,8 @@ func (eds *EDSBuilder) makeBoundEndpoints(option *resource.BuildOption, direction corev3.TrafficDirection) []types.Resource { services := option.Services - selfServiceKey := option.SelfService - isGateway := option.RunType == resource.RunTypeGateway - var clusterLoads []types.Resource for svcKey, serviceInfo := range services { - if isGateway && selfServiceKey.Equal(&svcKey) { - continue - } - var lbEndpoints []*endpoint.LocalityLbEndpoints if !option.ForceDelete { lbEndpoints = eds.buildServiceEndpoint(serviceInfo) diff --git a/apiserver/xdsserverv3/generate.go b/apiserver/xdsserverv3/generate.go index 8b5da8d27..be4d4e729 100644 --- a/apiserver/xdsserverv3/generate.go +++ b/apiserver/xdsserverv3/generate.go @@ -18,6 +18,7 @@ package xdsserverv3 import ( + "context" "errors" "strconv" "sync" @@ -42,26 +43,48 @@ var ( ) type ( - ServiceInfos map[string]map[model.ServiceKey]*resource.ServiceInfo - XDSGenerate func(xdsType resource.XDSType, opt *resource.BuildOption) + ServiceInfos map[string]map[model.ServiceKey]*resource.ServiceInfo + CurrentServiceInfoProvider func() ServiceInfos ) // XdsResourceGenerator is the xDS resource generator type XdsResourceGenerator struct { - namingServer service.DiscoverServer - cache *cache.XDSCache - versionNum *atomic.Uint64 - xdsNodesMgr *resource.XDSNodeManager + namingServer service.DiscoverServer + cache *cache.ResourceCache + versionNum *atomic.Uint64 + xdsNodesMgr *resource.XDSNodeManager + svcInfoProvider CurrentServiceInfoProvider } -func (x *XdsResourceGenerator) Generate(versionLocal string, - needUpdate, needRemove ServiceInfos) { +// Generate 构建 XDS 资源缓存数据信息 +func (x *XdsResourceGenerator) Generate(versionLocal string, needUpdate, needRemove ServiceInfos) { + updateRequest := cache.NewUpdateResourcesRequest() - deltaOp := func(runType resource.RunType, infos ServiceInfos, f XDSGenerate) { + deltaOp := func(runType resource.RunType, infos ServiceInfos, isRemove bool) { direction := corev3.TrafficDirection_OUTBOUND if runType == resource.RunTypeGateway { direction = corev3.TrafficDirection_INBOUND } + generate := func(opt *resource.BuildOption) { + opt.CloseEnvoyDemand() + opt.TLSMode = resource.TLSModeNone + // 默认构建没有设置 TLS 的 CDS 资源 + x.buildUpdateRequest(updateRequest, resource.CDS, opt, isRemove) + // 构建设置了 TLS Mode == Strict 的 CDS 资源 + opt.TLSMode = resource.TLSModeStrict + x.buildUpdateRequest(updateRequest, resource.CDS, opt, isRemove) + // 构建设置了 TLS Mode == Permissive 的 CDS 资源 + opt.TLSMode = resource.TLSModePermissive + x.buildUpdateRequest(updateRequest, resource.CDS, opt, isRemove) + // 恢复 TLSMode + opt.TLSMode = resource.TLSModeNone + x.buildUpdateRequest(updateRequest, resource.EDS, opt, isRemove) + x.buildUpdateRequest(updateRequest, resource.RDS, opt, isRemove) + // 开启按需 Demand + opt.OpenEnvoyDemand() + x.buildUpdateRequest(updateRequest, resource.RDS, opt, isRemove) + } + // CDS/EDS/VHDS 一起构建 for namespace, services := range infos { opt := &resource.BuildOption{ @@ -69,44 +92,19 @@ func (x *XdsResourceGenerator) Generate(versionLocal string, Namespace: namespace, Services: services, TrafficDirection: direction, - TLSMode: resource.TLSModeNone, } - f(resource.RDS, opt) - f(resource.EDS, opt) - f(resource.VHDS, opt) - - // 默认构建没有设置 TLS 的 CDS 资源 - f(resource.CDS, opt) - // 构建设置了 TLS Mode == Strict 的 CDS 资源 - opt.TLSMode = resource.TLSModeStrict - f(resource.CDS, opt) - // 构建设置了 TLS Mode == Permissive 的 CDS 资源 - opt.TLSMode = resource.TLSModePermissive - f(resource.CDS, opt) - // 构建支持按需加载 - opt.OpenOnDemand = true - f(resource.RDS, opt) + // sidecar 和 gateway 大部份资源都是复用的,所以这里只需要构建一次即可,gateway 只有 RDS/LDS 存在特别,单独针对构建即可 + if runType == resource.RunTypeSidecar { + generate(opt) + x.buildUpdateRequest(updateRequest, resource.VHDS, opt, isRemove) + } if runType == resource.RunTypeSidecar { for svcKey := range services { - opt.OpenOnDemand = false // 换成 INBOUND 构建 CDS、EDS、RDS opt.SelfService = svcKey opt.TrafficDirection = corev3.TrafficDirection_INBOUND - opt.TLSMode = resource.TLSModeNone - f(resource.EDS, opt) - f(resource.RDS, opt) - // 默认构建没有设置 TLS 的 CDS 资源 - f(resource.CDS, opt) - // 构建设置了 TLS Mode == Strict 的 CDS 资源 - opt.TLSMode = resource.TLSModeStrict - f(resource.CDS, opt) - // 构建设置了 TLS Mode == Permissive 的 CDS 资源 - opt.TLSMode = resource.TLSModePermissive - f(resource.CDS, opt) - // 构建支持按需加载 - opt.OpenOnDemand = true - f(resource.RDS, opt) + generate(opt) } } } @@ -116,89 +114,87 @@ func (x *XdsResourceGenerator) Generate(versionLocal string, wg.Add(2) go func() { defer wg.Done() - // 处理 Sideacr - deltaOp(resource.RunTypeSidecar, needUpdate, x.buildAndDeltaUpdate) - deltaOp(resource.RunTypeSidecar, needRemove, x.buildAndDeltaRemove) + deltaOp(resource.RunTypeSidecar, needUpdate, false) + deltaOp(resource.RunTypeSidecar, needRemove, true) }() go func() { defer wg.Done() - // 处理 Gateway - deltaOp(resource.RunTypeGateway, needUpdate, x.buildAndDeltaUpdate) - deltaOp(resource.RunTypeGateway, needRemove, x.buildAndDeltaRemove) + deltaOp(resource.RunTypeGateway, needUpdate, false) + deltaOp(resource.RunTypeGateway, needRemove, true) }() wg.Wait() + + if err := x.cache.UpdateResources(context.Background(), updateRequest); err != nil { + log.Error("[XDS][Envoy] update xds resource fail", zap.Error(err)) + } } func (x *XdsResourceGenerator) buildOneEnvoyXDSCache(node *resource.XDSClient) error { opt := &resource.BuildOption{ - RunType: node.RunType, - Client: node, - TLSMode: node.TLSMode, - Namespace: node.GetSelfNamespace(), - OpenOnDemand: node.OpenOnDemand, + RunType: node.RunType, + Client: node, + TLSMode: node.TLSMode, + Namespace: node.GetSelfNamespace(), SelfService: model.ServiceKey{ Namespace: node.GetSelfNamespace(), Name: node.GetSelfService(), }, } + if node.OpenOnDemand { + opt.OpenEnvoyDemand() + } + + finalResources := make([]types.Resource, 0, 4) + buildCache := func(xdsType resource.XDSType, opt *resource.BuildOption) { + xxds, err := x.generateXDSResource(xdsType, opt) + if err != nil { + log.Error("[XDS][Envoy] generate envoy node resource fail", zap.Error(err)) + return + } + finalResources = append(finalResources, xxds...) + } opt.TrafficDirection = corev3.TrafficDirection_OUTBOUND // 构建 OUTBOUND LDS 资源 - x.buildAndDeltaUpdate(resource.LDS, opt) + buildCache(resource.LDS, opt) opt.TrafficDirection = corev3.TrafficDirection_INBOUND // 构建 INBOUND LDS 资源 - x.buildAndDeltaUpdate(resource.LDS, opt) - return nil -} - -func (x *XdsResourceGenerator) buildAndDeltaRemove(xdsType resource.XDSType, opt *resource.BuildOption) { - opt.ForceDelete = true - xxds, err := x.generateXDSResource(xdsType, opt) - if err != nil { - log.Error("[XDS][Envoy] generate xds resource fail", zap.String("type", xdsType.String()), zap.Error(err)) - return - } + buildCache(resource.LDS, opt) - typeUrl := xdsType.ResourceType() - client := opt.Client - if client == nil { - client = &resource.XDSClient{ - TLSMode: opt.TLSMode, - Namespace: opt.Namespace, - } - } - cacheKey := cache.BuildCacheKey(typeUrl, opt.TLSMode, client) - if err := x.cache.DeltaRemoveResource(cacheKey, typeUrl, cachev3.IndexRawResourcesByName(xxds)); err != nil { - log.Error("[XDS][Envoy] delta update fail", zap.String("cache-key", cacheKey), - zap.String("type", xdsType.String()), zap.Error(err)) - return - } + return x.cache.UpdateResources(context.Background(), &cache.UpdateResourcesRequest{ + Lds: map[string]map[string]types.Resource{ + opt.Client.ID: cachev3.IndexRawResourcesByName(finalResources), + }, + }) } -func (x *XdsResourceGenerator) buildAndDeltaUpdate(xdsType resource.XDSType, opt *resource.BuildOption) { - opt.ForceDelete = false +func (x *XdsResourceGenerator) buildUpdateRequest(req *cache.UpdateResourcesRequest, xdsType resource.XDSType, + opt *resource.BuildOption, isRemove bool) { + + opt.ForceDelete = isRemove xxds, err := x.generateXDSResource(xdsType, opt) if err != nil { - log.Error("[XDS][Envoy] generate xds resource fail", zap.String("type", xdsType.String()), zap.Error(err)) + log.Error("[XDS][Envoy] generate xds resource fail", zap.Error(err)) return } - typeUrl := xdsType.ResourceType() - client := opt.Client - if client == nil { - client = &resource.XDSClient{ - TLSMode: opt.TLSMode, - Namespace: opt.Namespace, + switch opt.TLSMode { + case resource.TLSModeNone: + if opt.ForceDelete { + req.RemoveNormalNamespaces(opt.Namespace, opt.TLSMode, xdsType, xxds) + } else { + req.AddNormalNamespaces(opt.Namespace, xdsType, xxds) + } + default: + if opt.ForceDelete { + req.RemoveTlsNamespaces(opt.Namespace, opt.TLSMode, xdsType, xxds) + } else { + req.AddTlsNamespaces(opt.Namespace, opt.TLSMode, xdsType, xxds) } - } - cacheKey := cache.BuildCacheKey(typeUrl, opt.TLSMode, client) - if err = x.cache.DeltaUpdateResource(cacheKey, typeUrl, cachev3.IndexRawResourcesByName(xxds)); err != nil { - log.Error("[XDS][Envoy] delta update fail", zap.String("cache-key", cacheKey), - zap.String("type", xdsType.String()), zap.Error(err)) } } diff --git a/apiserver/xdsserverv3/lds.go b/apiserver/xdsserverv3/lds.go index 6caaab399..5ddab0767 100644 --- a/apiserver/xdsserverv3/lds.go +++ b/apiserver/xdsserverv3/lds.go @@ -118,7 +118,7 @@ func (lds *LDSBuilder) makeListener(option *resource.BuildOption, if isGateway { boundHCM = resource.MakeGatewayBoundHCM(selfService, option) } else { - if option.OpenOnDemand && direction == core.TrafficDirection_OUTBOUND { + if option.IsDemand() && direction == core.TrafficDirection_OUTBOUND { boundHCM = resource.MakeSidecarOnDemandOutBoundHCM(selfService, option) } else { boundHCM = resource.MakeSidecarBoundHCM(selfService, direction, option) diff --git a/apiserver/xdsserverv3/rds.go b/apiserver/xdsserverv3/rds.go index 69cffb3ff..fa33c5e05 100644 --- a/apiserver/xdsserverv3/rds.go +++ b/apiserver/xdsserverv3/rds.go @@ -77,7 +77,7 @@ func (rds *RDSBuilder) makeSidecarInBoundRouteConfiguration(option *resource.Bui return []types.Resource{} } routeConf := &route.RouteConfiguration{ - Name: resource.MakeInBoundRouteConfigName(selfService, option.OpenOnDemand), + Name: resource.MakeInBoundRouteConfigName(selfService, option.IsDemand()), ValidateClusters: wrapperspb.Bool(false), } @@ -115,8 +115,8 @@ func (rds *RDSBuilder) makeSidecarOutBoundRouteConfiguration(option *resource.Bu } } hosts = append(hosts, resource.BuildAllowAnyVHost()) - if option.OpenOnDemand { - baseRouteName = fmt.Sprintf("%s|%s|DEMAND", resource.OutBoundRouteConfigName, option.Namespace) + if option.IsDemand() { + baseRouteName = fmt.Sprintf("%s|%s|demand", resource.OutBoundRouteConfigName, option.Namespace) // routeConfiguration.Vhds = &route.Vhds{ // ConfigSource: &corev3.ConfigSource{ // ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{ @@ -167,7 +167,7 @@ func (rds *RDSBuilder) makeSidecarInBoundRoutes(selfService model.ServiceKey, limits, typedPerFilterConfig, err := resource.MakeSidecarLocalRateLimit(seacher, selfService) if err == nil { currentRoute.TypedPerFilterConfig = typedPerFilterConfig - if opt.OpenOnDemand { + if opt.IsDemand() { currentRoute.TypedPerFilterConfig[resource.EnvoyHttpFilter_OnDemand] = resource.BuildOnDemandRouteTypedPerFilterConfig() } @@ -201,7 +201,7 @@ func (rds *RDSBuilder) makeGatewayRouteConfiguration(option *resource.BuildOptio } hosts = append(hosts, vHost) routeConfiguration := &route.RouteConfiguration{ - Name: resource.OutBoundRouteConfigName, + Name: resource.OutBoundRouteConfigName + "-gateway", VirtualHosts: hosts, } return append(routeConfs, routeConfiguration), nil diff --git a/apiserver/xdsserverv3/resource/api.go b/apiserver/xdsserverv3/resource/api.go index bd5fb8464..fd3649a9f 100644 --- a/apiserver/xdsserverv3/resource/api.go +++ b/apiserver/xdsserverv3/resource/api.go @@ -33,19 +33,34 @@ type XDSBuilder interface { } type BuildOption struct { - RunType RunType - Namespace string - TLSMode TLSMode - Services map[model.ServiceKey]*ServiceInfo - OpenOnDemand bool - SelfService model.ServiceKey - // 不是比带,只有在 EDS 生成,并且是处理 INBOUND 的时候才会设置 - Client *XDSClient + RunType RunType + Namespace string + TLSMode TLSMode + Services map[model.ServiceKey]*ServiceInfo + // openOnDemand 是否开启按需能力,该能力只能在 RunSidecar 场景下才能生效 + openOnDemand bool + // SelfService 当前服务信息,只有处理 INBOUND 级别的信息才能设置 + SelfService model.ServiceKey + // 不是必须,只有在 EDS 生成,并且是处理 INBOUND 的时候才会设置 + Client *XDSClient + // TrafficDirection 设置流量的出入方向,INBOUND|OUTBOUND TrafficDirection corev3.TrafficDirection // ForceDelete 如果设置了该字段值为 true, 则不会真正执行 XDS 的构建工作, 仅仅生成对应资源的 Name 名称用于清理 ForceDelete bool } +func (opt *BuildOption) CloseEnvoyDemand() { + opt.openOnDemand = false +} + +func (opt *BuildOption) OpenEnvoyDemand() { + opt.openOnDemand = true +} + +func (opt *BuildOption) IsDemand() bool { + return opt.RunType == RunTypeSidecar && opt.openOnDemand +} + func (opt *BuildOption) HasTls() bool { return opt.TLSMode == TLSModeStrict || opt.TLSMode == TLSModePermissive } diff --git a/apiserver/xdsserverv3/resource/help.go b/apiserver/xdsserverv3/resource/help.go index 9d78bc498..d3d36c4f6 100644 --- a/apiserver/xdsserverv3/resource/help.go +++ b/apiserver/xdsserverv3/resource/help.go @@ -590,7 +590,7 @@ func MakeDefaultRoute(trafficDirection corev3.TrafficDirection, svcKey model.Ser }, }, } - if opt.OpenOnDemand { + if opt.IsDemand() { routeConf.TypedPerFilterConfig = map[string]*anypb.Any{ EnvoyHttpFilter_OnDemand: BuildOnDemandRouteTypedPerFilterConfig(), } @@ -614,7 +614,7 @@ func MakeSidecarRoute(trafficDirection corev3.TrafficDirection, routeMatch *rout }, }, } - if opt.OpenOnDemand { + if opt.IsDemand() { currentRoute.TypedPerFilterConfig = map[string]*anypb.Any{ EnvoyHttpFilter_OnDemand: BuildOnDemandRouteTypedPerFilterConfig(), } @@ -674,7 +674,7 @@ func MakeInBoundRouteConfigName(svcKey model.ServiceKey, demand bool) string { // MakeServiceName . func MakeServiceName(svcKey model.ServiceKey, trafficDirection corev3.TrafficDirection, opt *BuildOption) string { - if trafficDirection == core.TrafficDirection_INBOUND || !opt.OpenOnDemand { + if trafficDirection == core.TrafficDirection_INBOUND || !opt.IsDemand() { return fmt.Sprintf("%s|%s|%s", corev3.TrafficDirection_name[int32(trafficDirection)], svcKey.Namespace, svcKey.Name) } @@ -787,7 +787,7 @@ func MakeSidecarBoundHCM(svcKey model.ServiceKey, trafficDirection corev3.Traffi if trafficDirection == corev3.TrafficDirection_INBOUND { hcmFilters = append(makeRateLimitHCMFilter(svcKey), hcmFilters...) } - if opt.OpenOnDemand { + if opt.IsDemand() { hcmFilters = append([]*hcm.HttpFilter{ { Name: EnvoyHttpFilter_OnDemand, @@ -811,7 +811,7 @@ func MakeSidecarBoundHCM(svcKey model.ServiceKey, trafficDirection corev3.Traffi // 重写 RouteSpecifier 的路由规则数据信息 if trafficDirection == core.TrafficDirection_INBOUND { - manager.GetRds().RouteConfigName = MakeInBoundRouteConfigName(svcKey, opt.OpenOnDemand) + manager.GetRds().RouteConfigName = MakeInBoundRouteConfigName(svcKey, opt.IsDemand()) } return manager @@ -839,8 +839,12 @@ func MakeGatewayBoundHCM(svcKey model.ServiceKey, opt *BuildOption) *hcm.HttpCon func routeSpecifier(trafficDirection corev3.TrafficDirection, opt *BuildOption) *hcm.HttpConnectionManager_Rds { baseRouteName := TrafficBoundRoute[trafficDirection] - if opt.OpenOnDemand { - baseRouteName = fmt.Sprintf("%s|%s|DEMAND", TrafficBoundRoute[trafficDirection], opt.Namespace) + if opt.IsDemand() { + baseRouteName = fmt.Sprintf("%s|%s|demand", TrafficBoundRoute[trafficDirection], + opt.Namespace) + } + if opt.RunType == RunTypeGateway { + baseRouteName += "-gateway" } return &hcm.HttpConnectionManager_Rds{ Rds: &hcm.Rds{ diff --git a/apiserver/xdsserverv3/resource/model.go b/apiserver/xdsserverv3/resource/model.go index 198854b82..838898d79 100644 --- a/apiserver/xdsserverv3/resource/model.go +++ b/apiserver/xdsserverv3/resource/model.go @@ -18,6 +18,8 @@ package resource import ( + "strings" + corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/polarismesh/specification/source/go/api/v1/fault_tolerance" @@ -86,6 +88,26 @@ const ( UnknownXDS ) +func FromSimpleXDS(s string) XDSType { + s = strings.ToLower(s) + switch s { + case "cds": + return CDS + case "eds": + return EDS + case "rds": + return RDS + case "lds": + return LDS + case "rls": + return RLS + case "vhds": + return VHDS + default: + return UnknownXDS + } +} + func FormatTypeUrl(typeUrl string) XDSType { switch typeUrl { case resourcev3.ListenerType: diff --git a/apiserver/xdsserverv3/resource/node.go b/apiserver/xdsserverv3/resource/node.go index 8283f8ee2..c77b55b0d 100644 --- a/apiserver/xdsserverv3/resource/node.go +++ b/apiserver/xdsserverv3/resource/node.go @@ -66,6 +66,8 @@ const ( SidecarTLSModeTag = "sidecar.polarismesh.cn/tlsMode" // SidecarOpenOnDemandFeature . SidecarOpenOnDemandFeature = "sidecar.polarismesh.cn/openOnDemand" + // SidecarOpenOnDemandServer . + SidecarOpenOnDemandServer = "sidecar.polarismesh.cn/demandServer" ) type EnvoyNodeView struct { @@ -219,61 +221,6 @@ func (x *XDSNodeManager) ListEnvoyNodesView(run RunType) []*EnvoyNodeView { return ret } -// ID id 的格式是 ${sidecar|gateway}~namespace/uuid~hostIp -// case 1: envoy 为 sidecar 模式时,则 NodeID 的格式为以下两种 -// -// eg 1. namespace/uuid~hostIp -// eg 2. sidecar~namespace/uuid-hostIp -// eg 3. envoy_node_id="${NAMESPACE}/${INSTANCE_IP}~${POD_NAME}" -// -// case 2: envoy 为 gateway 模式时,则 NodeID 的格式为: gateway~namespace/uuid~hostIp -func (PolarisNodeHash) ID(node *core.Node) string { - if node == nil { - return "" - } - - runType, ns, _, _ := ParseNodeID(node.Id) - if node.Metadata == nil || len(node.Metadata.Fields) == 0 { - return ns - } - - // Gateway 类型直接按照 gateway_service 以及 gateway_namespace 纬度 - if runType != string(RunTypeSidecar) { - gatewayNamespace := node.Metadata.Fields[GatewayNamespaceName].GetStringValue() - gatewayService := node.Metadata.Fields[GatewayServiceName].GetStringValue() - // 兼容老的 envoy gateway metadata 参数设置 - if gatewayNamespace == "" { - gatewayNamespace = node.Metadata.Fields[OldGatewayNamespaceName].GetStringValue() - } - if gatewayService == "" { - gatewayService = node.Metadata.Fields[OldGatewayServiceName].GetStringValue() - } - if gatewayNamespace == "" { - gatewayNamespace = ns - } - return strings.Join([]string{runType, gatewayNamespace, gatewayService}, "/") - } - // 兼容老版本注入的 envoy, 默认获取 snapshot resource 粒度为 namespace 级别, 只能下发 OUTBOUND 规则 - ret := ns - - // 判断是否存在 sidecar_namespace 以及 sidecar_service - if node.Metadata != nil && node.Metadata.Fields != nil { - sidecarNamespace, _ := getEnvoyMetaField(node.Metadata, SidecarNamespaceName, "") - sidecarService, _ := getEnvoyMetaField(node.Metadata, SidecarServiceName, "") - // 如果存在, 则表示是由新版本 controller 注入的 envoy, 可以下发 INBOUND 规则 - if sidecarNamespace != "" && sidecarService != "" { - ret = runType + "/" + sidecarNamespace + "/" + sidecarService - } - - // 在判断是否设置了 TLS 相关参数 - tlsMode := node.Metadata.Fields[TLSModeTag].GetStringValue() - if tlsMode == string(TLSModePermissive) || tlsMode == string(TLSModeStrict) { - ret = ret + "/" + tlsMode - } - } - return ret -} - // PolarisNodeHash 存放 hash 方法 type PolarisNodeHash struct { NodeMgr *XDSNodeManager @@ -325,6 +272,7 @@ type XDSClient struct { Node *core.Node TLSMode TLSMode OpenOnDemand bool + DemandServer string } func (n *XDSClient) toView() *EnvoyNodeView { diff --git a/apiserver/xdsserverv3/server.go b/apiserver/xdsserverv3/server.go index 381cec559..48437d922 100644 --- a/apiserver/xdsserverv3/server.go +++ b/apiserver/xdsserverv3/server.go @@ -32,6 +32,7 @@ import ( routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3" runtimeservice "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3" serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" "go.uber.org/atomic" @@ -52,7 +53,7 @@ import ( ) type ResourceServer interface { - Generate(versionLocal string, registryInfo map[string]map[model.ServiceKey]*resource.ServiceInfo) + Generate(versionLocal string, registryInfo ServiceInfos) } // XDSServer is the xDS server @@ -65,13 +66,13 @@ type XDSServer struct { exitCh chan struct{} namingServer service.DiscoverServer healthSvr *healthcheck.Server - cache *xdscache.XDSCache + cache *xdscache.ResourceCache versionNum *atomic.Uint64 server *grpc.Server connLimitConfig *connlimit.Config nodeMgr *resource.XDSNodeManager - registryInfo map[string]map[model.ServiceKey]*resource.ServiceInfo + registryInfo *utils.AtomicValue[ServiceInfos] resourceGenerator *XdsResourceGenerator active *atomic.Bool @@ -84,11 +85,11 @@ type XDSServer struct { // Initialize 初始化 func (x *XDSServer) Initialize(ctx context.Context, option map[string]interface{}, apiConf map[string]apiserver.APIConfig) error { - x.registryInfo = make(map[string]map[model.ServiceKey]*resource.ServiceInfo) + x.registryInfo = utils.NewAtomicValue[ServiceInfos](ServiceInfos{}) x.listenPort = uint32(option["listenPort"].(int)) x.listenIP = option["listenIP"].(string) x.nodeMgr = resource.NewXDSNodeManager() - x.cache = xdscache.NewCache(x) + x.cache = xdscache.NewResourceCache(x) x.active = atomic.NewBool(false) x.versionNum = atomic.NewUint64(0) x.ctx = ctx @@ -114,10 +115,11 @@ func (x *XDSServer) Initialize(ctx context.Context, option map[string]interface{ x.connLimitConfig = connConfig } x.resourceGenerator = &XdsResourceGenerator{ - namingServer: x.namingServer, - cache: x.cache, - versionNum: x.versionNum, - xdsNodesMgr: x.nodeMgr, + namingServer: x.namingServer, + cache: x.cache, + versionNum: x.versionNum, + xdsNodesMgr: x.nodeMgr, + svcInfoProvider: x.fetchCurrentServices, } resource.Init() return nil @@ -128,7 +130,7 @@ func (x *XDSServer) Run(errCh chan error) { // 启动 grpc server ctx := context.Background() cb := xdscache.NewCallback(x.cache, x.nodeMgr) - srv := serverv3.NewServer(ctx, x.cache, cb) + srv := serverv3.NewServer(ctx, x.cache, cb, sotw.WithOrderedADS()) var grpcOptions []grpc.ServerOption grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(1000)) grpcServer := grpc.NewServer(grpcOptions...) @@ -230,31 +232,32 @@ func (x *XDSServer) activeUpdateTask() { return } - if err := x.getRegistryInfoWithCache(x.ctx, x.registryInfo); err != nil { + if err := x.getRegistryInfoWithCache(x.ctx, x.registryInfo.Load()); err != nil { log.Errorf("getRegistryInfoWithCache %v", err) return } // 首次更新没有需要移除的 XDS 资源信息 - x.Generate(x.registryInfo, nil) + x.Generate(x.registryInfo.Load(), nil) go x.startSynTask(x.ctx) } func (x *XDSServer) startSynTask(ctx context.Context) { // 读取 polaris 缓存数据 synXdsConfFunc := func() { - curRegistryInfo := make(map[string]map[model.ServiceKey]*resource.ServiceInfo) + curRegistryInfo := make(ServiceInfos) if err := x.getRegistryInfoWithCache(ctx, curRegistryInfo); err != nil { log.Error("get registry info from cache", zap.Error(err)) return } - needPush := make(map[string]map[model.ServiceKey]*resource.ServiceInfo) - needRemove := make(map[string]map[model.ServiceKey]*resource.ServiceInfo) + needPush := make(ServiceInfos) + needRemove := make(ServiceInfos) // 与本地缓存对比,是否发生了变化,对发生变化的命名空间,推送配置 + oldRegistryInfo := x.registryInfo.Load() // step 1: 这里先生成需要删除 XDS 资源数据的资源信息 - for ns, infos := range x.registryInfo { + for ns, infos := range oldRegistryInfo { // 如果当前整个命名空间都不存在,直接按照整个 namespace 级别进行数据删除 if _, exist := curRegistryInfo[ns]; !exist { needRemove[ns] = infos @@ -275,7 +278,7 @@ func (x *XDSServer) startSynTask(ctx context.Context) { } for ns, infos := range curRegistryInfo { - cacheServiceInfos, ok := x.registryInfo[ns] + cacheServiceInfos, ok := oldRegistryInfo[ns] if !ok { // 新命名空间,需要处理 needPush[ns] = infos @@ -298,12 +301,12 @@ func (x *XDSServer) startSynTask(ctx context.Context) { } } + x.registryInfo.Store(curRegistryInfo) if len(needPush) > 0 || len(needRemove) > 0 { log.Info("start update xds resource snapshot ticker task", zap.Int("need-push", len(needPush)), zap.Int("need-remove", len(needRemove))) x.Generate(needPush, needRemove) } - x.registryInfo = curRegistryInfo } ticker := time.NewTicker(5 * cache.UpdateCacheInterval) @@ -319,18 +322,24 @@ func (x *XDSServer) startSynTask(ctx context.Context) { } } +func (x *XDSServer) fetchCurrentServices() ServiceInfos { + return x.registryInfo.Load() +} + func (x *XDSServer) initRegistryInfo() error { + cur := map[string]map[model.ServiceKey]*resource.ServiceInfo{} namespaces := x.namingServer.Cache().Namespace().GetNamespaceList() // 启动时,获取全量的 namespace 信息,用来推送空配置 for _, n := range namespaces { - x.registryInfo[n.Name] = map[model.ServiceKey]*resource.ServiceInfo{} + cur[n.Name] = map[model.ServiceKey]*resource.ServiceInfo{} } + x.registryInfo.Store(cur) return nil } // syncPolarisServiceInfo 初始化本地 cache,初始化 xds cache func (x *XDSServer) getRegistryInfoWithCache(ctx context.Context, - registryInfo map[string]map[model.ServiceKey]*resource.ServiceInfo) error { + registryInfo ServiceInfos) error { // 从 cache 中获取全量的服务信息 serviceIterProc := func(key string, value *model.Service) (bool, error) { @@ -443,7 +452,7 @@ func (x *XDSServer) getRegistryInfoWithCache(ctx context.Context, return nil } -func (x *XDSServer) Generate(needPush, needRemove map[string]map[model.ServiceKey]*resource.ServiceInfo) { +func (x *XDSServer) Generate(needPush, needRemove ServiceInfos) { versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(x.versionNum.Inc(), 10) x.resourceGenerator.Generate(versionLocal, needPush, needRemove) } @@ -457,13 +466,8 @@ func (x *XDSServer) DebugHandlers() []model.DebugHandler { }, { Path: "/debug/apiserver/xds/resources", - Desc: "Query XDS Resource List, query parameter name is 'type', value is [node, common]", - Handler: x.listXDSResources, - }, - { - Path: "/debug/apiserver/xds/cache_names", - Desc: "Query XDS cache name list", - Handler: x.listXDSCaches, + Desc: "Query the list of Envoy nodes, eg. /debug/apiserver/xds/resources?type=&nodeId=, type is [eds,cds,rds,vhds,lds]", + Handler: x.listXDSResource, }, } } diff --git a/apiserver/xdsserverv3/server_test.go b/apiserver/xdsserverv3/server_test.go index cf5db6b07..d847fd2a5 100644 --- a/apiserver/xdsserverv3/server_test.go +++ b/apiserver/xdsserverv3/server_test.go @@ -33,12 +33,10 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/duration" - _struct "github.com/golang/protobuf/ptypes/struct" "github.com/golang/protobuf/ptypes/wrappers" apimodel "github.com/polarismesh/specification/source/go/api/v1/model" apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage" "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/structpb" "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" "github.com/polarismesh/polaris/common/model" @@ -206,130 +204,6 @@ func TestParseNodeID(t *testing.T) { } } -func TestNodeHashID(t *testing.T) { - testTable := []struct { - Node *core.Node - TargetID string - }{ - { - Node: &core.Node{ - Id: "default/9b9f5630-81a1-47cd-a558-036eb616dc71~172.17.1.1", - Metadata: &_struct.Struct{ - Fields: map[string]*structpb.Value{ - resource.TLSModeTag: &_struct.Value{ - Kind: &_struct.Value_StringValue{ - StringValue: string(resource.TLSModeStrict), - }, - }, - }, - }, - }, - TargetID: "default/" + string(resource.TLSModeStrict), - }, - { - Node: &core.Node{ - Id: "polaris/9b9f5630-81a1-47cd-a558-036eb616dc71~172.17.1.1", - Metadata: &_struct.Struct{ - Fields: map[string]*structpb.Value{ - resource.TLSModeTag: &_struct.Value{ - Kind: &_struct.Value_StringValue{ - StringValue: string(resource.TLSModePermissive), - }, - }, - }, - }, - }, - TargetID: "polaris/" + string(resource.TLSModePermissive), - }, - { - Node: &core.Node{ - Id: "default/9b9f5630-81a1-47cd-a558-036eb616dc71~172.17.1.1", - Metadata: &_struct.Struct{ - Fields: map[string]*structpb.Value{ - resource.TLSModeTag: &_struct.Value{ - Kind: &_struct.Value_StringValue{ - StringValue: string(resource.TLSModeNone), - }, - }, - }, - }, - }, - TargetID: "default", - }, - // bad case: wrong tls mode - { - Node: &core.Node{ - Id: "default/9b9f5630-81a1-47cd-a558-036eb616dc71~172.17.1.1", - Metadata: &_struct.Struct{ - Fields: map[string]*structpb.Value{ - resource.TLSModeTag: &_struct.Value{ - Kind: &_struct.Value_StringValue{ - StringValue: "abc", - }, - }, - }, - }, - }, - TargetID: "default", - }, - // no node metadata - { - Node: &core.Node{ - Id: "default/9b9f5630-81a1-47cd-a558-036eb616dc71~172.17.1.1", - }, - TargetID: "default", - }, - // metadata does not contain tls mode kv - { - Node: &core.Node{ - Id: "default/9b9f5630-81a1-47cd-a558-036eb616dc71~172.17.1.1", - Metadata: &_struct.Struct{ - Fields: map[string]*structpb.Value{ - "hello": &_struct.Value{ - Kind: &_struct.Value_StringValue{ - StringValue: "abc", - }, - }, - }, - }, - }, - TargetID: "default", - }, - { - Node: &core.Node{ - Id: "gateway~default/9b9f5630-81a1-47cd-a558-036eb616dc71~172.17.1.1", - Metadata: &_struct.Struct{ - Fields: map[string]*structpb.Value{ - "hello": &_struct.Value{ - Kind: &_struct.Value_StringValue{ - StringValue: "abc", - }, - }, - resource.GatewayNamespaceName: &_struct.Value{ - Kind: &structpb.Value_StringValue{ - StringValue: "default", - }, - }, - resource.GatewayServiceName: &_struct.Value{ - Kind: &structpb.Value_StringValue{ - StringValue: "service", - }, - }, - }, - }, - }, - TargetID: "gateway/default/service", - }, - } - for i, item := range testTable { - id := resource.PolarisNodeHash{}.ID(item.Node) - if id != item.TargetID { - t.Fatalf("test case [%d] failed: expect ID %s, got ID %s", - i, item.TargetID, id) - } - } -} - var ( testServicesData []byte noInboundDump []byte diff --git a/auth/defaultauth/auth_checker.go b/auth/defaultauth/auth_checker.go index 36da65d4d..8af012864 100644 --- a/auth/defaultauth/auth_checker.go +++ b/auth/defaultauth/auth_checker.go @@ -157,7 +157,14 @@ func (d *DefaultAuthChecker) checkMaintainPermission(preCtx *model.AcquireContex return true, nil } - tokenInfo := preCtx.GetAttachment(model.TokenDetailInfoKey).(OperatorInfo) + attachVal, ok := preCtx.GetAttachment(model.TokenDetailInfoKey) + if !ok { + return false, model.ErrorTokenNotExist + } + tokenInfo, ok := attachVal.(OperatorInfo) + if !ok { + return false, model.ErrorTokenNotExist + } if tokenInfo.Disable { return false, model.ErrorTokenDisabled @@ -189,7 +196,14 @@ func (d *DefaultAuthChecker) CheckPermission(authCtx *model.AcquireContext) (boo return true, nil } - operatorInfo := authCtx.GetAttachment(model.TokenDetailInfoKey).(OperatorInfo) + attachVal, ok := authCtx.GetAttachment(model.TokenDetailInfoKey) + if !ok { + return false, model.ErrorTokenNotExist + } + operatorInfo, ok := attachVal.(OperatorInfo) + if !ok { + return false, model.ErrorTokenNotExist + } // 这里需要检查当 token 被禁止的情况,如果 token 被禁止,无论是否可以操作目标资源,都无法进行写操作 if operatorInfo.Disable { return false, model.ErrorTokenDisabled @@ -407,8 +421,8 @@ func (d *DefaultAuthChecker) doCheckPermission(authCtx *model.AcquireContext) (b svcResEntries := reqRes[apisecurity.ResourceType_Services] cfgResEntries := reqRes[apisecurity.ResourceType_ConfigGroups] - principleID, _ := authCtx.GetAttachment(model.OperatorIDKey).(string) - principleType, _ := authCtx.GetAttachment(model.OperatorPrincipalType).(model.PrincipalType) + principleID, _ := authCtx.GetAttachments()[model.OperatorIDKey].(string) + principleType, _ := authCtx.GetAttachments()[model.OperatorPrincipalType].(model.PrincipalType) p := model.Principal{ PrincipalID: principleID, PrincipalRole: principleType, diff --git a/auth/defaultauth/auth_checker_test.go b/auth/defaultauth/auth_checker_test.go index 1ef2e76d5..949ba90b7 100644 --- a/auth/defaultauth/auth_checker_test.go +++ b/auth/defaultauth/auth_checker_test.go @@ -128,7 +128,7 @@ func Test_DefaultAuthChecker_VerifyCredential(t *testing.T) { assert.NoError(t, err, "Should be verify success") assert.Equal(t, users[1].ID, utils.ParseUserID(authCtx.GetRequestContext()), "user-id should be equal") assert.False(t, utils.ParseIsOwner(authCtx.GetRequestContext()), "should not be owner") - assert.True(t, authCtx.GetAttachment(model.TokenDetailInfoKey).(defaultauth.OperatorInfo).Disable, "should be disable") + assert.True(t, authCtx.GetAttachments()[model.TokenDetailInfoKey].(defaultauth.OperatorInfo).Disable, "should be disable") }) t.Run("权限检查非严格模式-错误的token字符串-降级为匿名用户", func(t *testing.T) { @@ -141,7 +141,7 @@ func Test_DefaultAuthChecker_VerifyCredential(t *testing.T) { err = checker.VerifyCredential(authCtx) t.Logf("%+v", err) assert.NoError(t, err, "Should be verify success") - assert.True(t, authCtx.GetAttachment(model.TokenDetailInfoKey).(defaultauth.OperatorInfo).Anonymous, "should be anonymous") + assert.True(t, authCtx.GetAttachments()[model.TokenDetailInfoKey].(defaultauth.OperatorInfo).Anonymous, "should be anonymous") }) t.Run("权限检查非严格模式-空token字符串-降级为匿名用户", func(t *testing.T) { @@ -153,7 +153,7 @@ func Test_DefaultAuthChecker_VerifyCredential(t *testing.T) { err = checker.VerifyCredential(authCtx) t.Logf("%+v", err) assert.NoError(t, err, "Should be verify success") - assert.True(t, authCtx.GetAttachment(model.TokenDetailInfoKey).(defaultauth.OperatorInfo).Anonymous, "should be anonymous") + assert.True(t, authCtx.GetAttachments()[model.TokenDetailInfoKey].(defaultauth.OperatorInfo).Anonymous, "should be anonymous") }) t.Run("权限检查非严格模式-错误的token字符串-访问鉴权模块", func(t *testing.T) { diff --git a/auth/defaultauth/server.go b/auth/defaultauth/server.go index 6e2ec3fd7..48a1efe9f 100644 --- a/auth/defaultauth/server.go +++ b/auth/defaultauth/server.go @@ -129,19 +129,27 @@ func (svr *Server) AfterResourceOperation(afterCtx *model.AcquireContext) error return nil } + attachVal, ok := afterCtx.GetAttachment(model.TokenDetailInfoKey) + if !ok { + return nil + } + tokenInfo, ok := attachVal.(OperatorInfo) + if !ok { + return nil + } + // 如果 token 信息为空,则代表当前创建的资源,任何人都可以进行操作,不做资源的后置逻辑处理 - if IsEmptyOperator(afterCtx.GetAttachment(model.TokenDetailInfoKey).(OperatorInfo)) { + if IsEmptyOperator(tokenInfo) { return nil } - addUserIds := afterCtx.GetAttachment(model.LinkUsersKey).([]string) - addGroupIds := afterCtx.GetAttachment(model.LinkGroupsKey).([]string) - removeUserIds := afterCtx.GetAttachment(model.RemoveLinkUsersKey).([]string) - removeGroupIds := afterCtx.GetAttachment(model.RemoveLinkGroupsKey).([]string) + addUserIds := afterCtx.GetAttachments()[model.LinkUsersKey].([]string) + addGroupIds := afterCtx.GetAttachments()[model.LinkGroupsKey].([]string) + removeUserIds := afterCtx.GetAttachments()[model.RemoveLinkUsersKey].([]string) + removeGroupIds := afterCtx.GetAttachments()[model.RemoveLinkGroupsKey].([]string) // 只有在创建一个资源的时候,才需要把当前的创建者一并加到里面去 if afterCtx.GetOperation() == model.Create { - tokenInfo := afterCtx.GetAttachment(model.TokenDetailInfoKey).(OperatorInfo) if tokenInfo.IsUserToken { addUserIds = append(addUserIds, tokenInfo.OperatorID) } else { @@ -150,7 +158,7 @@ func (svr *Server) AfterResourceOperation(afterCtx *model.AcquireContext) error } log.Info("[Auth][Server] add resource to principal default strategy", - zap.Any("resource", afterCtx.GetAttachment(model.ResourceAttachmentKey)), + zap.Any("resource", afterCtx.GetAttachments()[model.ResourceAttachmentKey]), zap.Any("add_user", addUserIds), zap.Any("add_group", addGroupIds), zap.Any("remove_user", removeUserIds), zap.Any("remove_group", removeGroupIds), @@ -237,10 +245,16 @@ func (svr *Server) handlerModifyDefaultStrategy(id, ownerId string, uType model. var ( strategyResource = make([]model.StrategyResource, 0) - resources = afterCtx.GetAttachment( - model.ResourceAttachmentKey).(map[apisecurity.ResourceType][]model.ResourceEntry) - strategyId = strategy.ID + strategyId = strategy.ID ) + attachVal, ok := afterCtx.GetAttachment(model.ResourceAttachmentKey) + if !ok { + return nil + } + resources, ok := attachVal.(map[apisecurity.ResourceType][]model.ResourceEntry) + if !ok { + return nil + } // 资源删除时,清理该资源与所有策略的关联关系 if afterCtx.GetOperation() == model.Delete { diff --git a/auth/defaultauth/utils.go b/auth/defaultauth/utils.go index f8ddcfde8..b686d96aa 100644 --- a/auth/defaultauth/utils.go +++ b/auth/defaultauth/utils.go @@ -134,7 +134,14 @@ func verifyAuth(ctx context.Context, isWrite bool, return nil, api.NewAuthResponse(apimodel.Code_AuthTokenForbidden) } - tokenInfo := authCtx.GetAttachment(model.TokenDetailInfoKey).(OperatorInfo) + attachVal, ok := authCtx.GetAttachment(model.TokenDetailInfoKey) + if !ok { + return nil, api.NewAuthResponse(apimodel.Code_TokenNotExisted) + } + tokenInfo, ok := attachVal.(OperatorInfo) + if !ok { + return nil, api.NewAuthResponse(apimodel.Code_TokenNotExisted) + } if isWrite && tokenInfo.Disable { log.Error("[Auth][Server] token is disabled", utils.ZapRequestID(reqId), diff --git a/cache/service/service.go b/cache/service/service.go index f36a0e5c5..eba2de3ff 100644 --- a/cache/service/service.go +++ b/cache/service/service.go @@ -476,10 +476,11 @@ func (sc *serviceCache) setServices(services map[string]*model.Service) (map[str func (sc *serviceCache) notifyServiceCountReload(svcIds map[string]bool) { sc.plock.RLock() - defer sc.plock.RUnlock() for k := range svcIds { sc.pendingServices.Store(k, struct{}{}) } + sc.plock.RUnlock() + sc.postProcessUpdatedServices(map[string]struct{}{}) } // appendServiceCountChangeNamespace diff --git a/cache/service/service_contract.go b/cache/service/service_contract.go index d6f73e2bb..b3aab090c 100644 --- a/cache/service/service_contract.go +++ b/cache/service/service_contract.go @@ -172,6 +172,8 @@ func (sc *serviceContractCache) Query(filter map[string]string, offset, limit ui searchName := filter["name"] searchProtocol := filter["protocol"] searchVersion := filter["version"] + searchInterfaceName := filter["interface_name"] + searchInterfacePath := filter["interface_path"] sc.contracts.ReadRange(func(namespace string, services *utils.SyncMap[string, *utils.SyncMap[string, *model.EnrichServiceContract]]) { if searchNamespace != "" { @@ -205,7 +207,26 @@ func (sc *serviceContractCache) Query(filter map[string]string, offset, limit ui return } } - values = append(values, val) + // 支持针对接口信息反向查询服务契约列表 + if searchInterfaceName != "" || searchInterfacePath != "" { + tmpContract := &model.EnrichServiceContract{ + ServiceContract: val.ServiceContract, + Interfaces: []*model.InterfaceDescriptor{}, + } + for i := range val.Interfaces { + descriptor := val.Interfaces[i] + if !utils.IsWildMatch(descriptor.Name, searchInterfaceName) { + continue + } + if !utils.IsWildMatch(descriptor.Path, searchInterfacePath) { + continue + } + tmpContract.Interfaces = append(tmpContract.Interfaces, descriptor) + } + values = append(values, tmpContract) + } else { + values = append(values, val) + } return }) }) diff --git a/common/model/acquire_context.go b/common/model/acquire_context.go index a9938f73f..8c211840b 100644 --- a/common/model/acquire_context.go +++ b/common/model/acquire_context.go @@ -200,8 +200,9 @@ func (authCtx *AcquireContext) GetAttachments() map[string]interface{} { } // GetAttachment 按照 key 获取某一个附件信息 -func (authCtx *AcquireContext) GetAttachment(key string) interface{} { - return authCtx.attachment[key] +func (authCtx *AcquireContext) GetAttachment(key string) (interface{}, bool) { + val, ok := authCtx.attachment[key] + return val, ok } // SetAttachment 设置附件 diff --git a/common/model/contract.go b/common/model/contract.go index 30ccb6c8a..f4e31f2b1 100644 --- a/common/model/contract.go +++ b/common/model/contract.go @@ -132,6 +132,8 @@ func (s *ServiceContract) GetCacheKey() string { type InterfaceDescriptor struct { // ID ID string + // Name 接口名称 + Name string // ContractID ContractID string // 方法名称,对应 http method/ dubbo interface func/grpc service func diff --git a/common/utils/common.go b/common/utils/common.go index 78bcea4d2..ac5d56bda 100644 --- a/common/utils/common.go +++ b/common/utils/common.go @@ -552,36 +552,20 @@ func ConvertStringValuesToSlice(vals []*wrapperspb.StringValue) []string { return ret } -func CalculateContractID(namespace, service, name, protocol, version string) (string, error) { - h := sha1.New() - str := fmt.Sprintf("%s##%s##%s##%s##%s", namespace, service, name, protocol, version) - - if _, err := io.WriteString(h, str); err != nil { - return "", err - } - - out := hex.EncodeToString(h.Sum(nil)) - return out, nil -} - // CheckContractTetrad 根据服务实例四元组计算ID func CheckContractTetrad(req *apiservice.ServiceContract) (string, *apiservice.Response) { - id, err := CalculateContractID( - req.GetNamespace(), - req.GetService(), - req.GetName(), - req.GetProtocol(), - req.GetVersion(), - ) - if err != nil { + str := fmt.Sprintf("%s##%s##%s##%s##%s", req.GetNamespace(), req.GetService(), req.GetName(), + req.GetProtocol(), req.GetVersion()) + + h := sha1.New() + if _, err := io.WriteString(h, str); err != nil { return "", api.NewResponse(apimodel.Code_ExecuteException) } - return id, nil + return hex.EncodeToString(h.Sum(nil)), nil } func CheckContractInterfaceTetrad(contractId string, source apiservice.InterfaceDescriptor_Source, req *apiservice.InterfaceDescriptor) (string, *apiservice.Response) { - if contractId == "" { return "", api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract id") } @@ -592,7 +576,7 @@ func CheckContractInterfaceTetrad(contractId string, source apiservice.Interface return "", api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract interface path") } h := sha1.New() - str := fmt.Sprintf("%s##%s##%s##%d", contractId, req.GetMethod(), req.GetPath(), source) + str := fmt.Sprintf("%s##%s##%s##%s##%d", contractId, req.GetMethod(), req.GetPath(), req.GetName(), source) if _, err := io.WriteString(h, str); err != nil { return "", api.NewResponseWithMsg(apimodel.Code_ExecuteException, err.Error()) diff --git a/common/utils/funcs.go b/common/utils/funcs.go index 22f234ab7..47430c7fb 100644 --- a/common/utils/funcs.go +++ b/common/utils/funcs.go @@ -146,6 +146,13 @@ func NewV2Revision() string { return "v2-" + hex.EncodeToString(uuidBytes[:]) } +func DefaultString(v, d string) string { + if v == "" { + return d + } + return v +} + // StringSliceDeDuplication 字符切片去重 func StringSliceDeDuplication(s []string) []string { m := make(map[string]struct{}, len(s)) diff --git a/plugin/healthchecker/leader/peer_mock_test.go b/plugin/healthchecker/leader/peer_mock_test.go index 35998914b..e4daa8ed5 100644 --- a/plugin/healthchecker/leader/peer_mock_test.go +++ b/plugin/healthchecker/leader/peer_mock_test.go @@ -184,7 +184,7 @@ func (mc *MockPolarisHeartbeatClient) BatchGetHeartbeat(ctx context.Context, } return 0 }(), - Exist: func () bool { + Exist: func() bool { if ok { return val.Exist } diff --git a/plugin/healthchecker/leader/peer_test.go b/plugin/healthchecker/leader/peer_test.go index d2c7cf140..3b325d53c 100644 --- a/plugin/healthchecker/leader/peer_test.go +++ b/plugin/healthchecker/leader/peer_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/polarismesh/specification/source/go/api/v1/service_manage" "github.com/stretchr/testify/assert" "google.golang.org/grpc" @@ -30,7 +31,6 @@ import ( "github.com/polarismesh/polaris/common/utils" "github.com/polarismesh/polaris/plugin" "github.com/polarismesh/polaris/store/mock" - "github.com/polarismesh/specification/source/go/api/v1/service_manage" ) func TestLocalPeer(t *testing.T) { diff --git a/plugin/ratelimit/token/config.go b/plugin/ratelimit/token/config.go index 9dbd1166e..a1a69cc05 100644 --- a/plugin/ratelimit/token/config.go +++ b/plugin/ratelimit/token/config.go @@ -27,6 +27,8 @@ import ( // Config 限流配置类 type Config struct { + // Enable 是否启用 + Enable bool `yaml:"enable" mapstructure:"enable"` // RuleFile RuleFile string `yaml:"rule-file" mapstructure:"rule-file"` // 是否启用远程配置,默认false。TODO 暂时无远程配置,后续版本补全,如果开启,则默认监听 Polaris/polaris-system 分组下的 plugin-ratelimit.yaml 配置文件 diff --git a/plugin/ratelimit/token/implement.go b/plugin/ratelimit/token/implement.go index 755345c33..4ec5e5d20 100644 --- a/plugin/ratelimit/token/implement.go +++ b/plugin/ratelimit/token/implement.go @@ -28,6 +28,10 @@ func (tb *tokenBucket) initialize(c *plugin.ConfigEntry) error { log.Errorf("[Plugin][%s] initialize err: %s", PluginName, err.Error()) return err } + if !config.Enable { + tb.config = config + return nil + } // 加载本地配置 if config.RuleFile != "" { config, err = loadLocalConfig(config.RuleFile) diff --git a/plugin/ratelimit/token/invoke.go b/plugin/ratelimit/token/invoke.go index 45a5ba2e3..b2070fe11 100644 --- a/plugin/ratelimit/token/invoke.go +++ b/plugin/ratelimit/token/invoke.go @@ -44,5 +44,8 @@ func (tb *tokenBucket) Destroy() error { // Allow 限流接口实现 func (tb *tokenBucket) Allow(typ plugin.RatelimitType, key string) bool { + if !tb.config.Enable { + return true + } return tb.allow(typ, key) } diff --git a/plugin/ratelimit/token/invoke_test.go b/plugin/ratelimit/token/invoke_test.go index fa69044ef..9133eee87 100644 --- a/plugin/ratelimit/token/invoke_test.go +++ b/plugin/ratelimit/token/invoke_test.go @@ -28,6 +28,7 @@ import ( // baseConfigOption 返回一个基础的正常option配置 func baseConfigOption() map[string]interface{} { return map[string]interface{}{ + "enable": true, "ip-limit": &ResourceLimitConfig{ Open: true, Global: &BucketRatelimit{true, 10, 2}, @@ -70,11 +71,13 @@ func TestTokenBucket_Initialize(t *testing.T) { Global: nil, MaxResourceCacheAmount: 100, }, + "enable": true, } So(tb.Initialize(configEntry), ShouldNotBeNil) }) Convey("无效api-limit配置,返回失败", t, func() { configEntry.Option = map[string]interface{}{ + "enable": true, "api-limit": &APILimitConfig{Open: true}, } So(tb.Initialize(configEntry), ShouldNotBeNil) diff --git a/release/conf/polaris-server.yaml b/release/conf/polaris-server.yaml index 560cb1e88..cd037172e 100644 --- a/release/conf/polaris-server.yaml +++ b/release/conf/polaris-server.yaml @@ -489,4 +489,5 @@ plugin: ratelimit: name: token-bucket option: + enable: false rule-file: ./conf/plugin/ratelimit/rule.yaml diff --git a/service/ratelimit_config_test.go b/service/ratelimit_config_test.go index a6d69549c..fd8e43924 100644 --- a/service/ratelimit_config_test.go +++ b/service/ratelimit_config_test.go @@ -31,6 +31,7 @@ import ( apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/wrapperspb" api "github.com/polarismesh/polaris/common/api/v1" "github.com/polarismesh/polaris/common/utils" @@ -87,6 +88,17 @@ func (l *CacheListener) OnBatchDeleted(value interface{}) { } } +func Test_Echo(t *testing.T) { + data, _ := json.Marshal(&apitraffic.Rule{ + Method: &apimodel.MatchString{ + Type: apimodel.MatchString_EXACT, + Value: wrapperspb.String("*"), + ValueType: apimodel.MatchString_TEXT, + }, + }) + t.Logf("%s", string(data)) +} + /** * @brief 测试创建限流规则 */ diff --git a/service/service_contract.go b/service/service_contract.go index 21d59de4c..1da61b1b1 100644 --- a/service/service_contract.go +++ b/service/service_contract.go @@ -37,15 +37,17 @@ import ( var ( contractSearchFilters = map[string]string{ - "id": "id", - "namespace": "namespace", - "service": "service", - "name": "name", - "protocol": "protocol", - "version": "version", - "brief": "brief", - "offset": "offset", - "limit": "limit", + "id": "id", + "namespace": "namespace", + "service": "service", + "name": "name", + "protocol": "protocol", + "version": "version", + "brief": "brief", + "offset": "offset", + "limit": "limit", + "interface_name": "interface_name", + "interface_path": "interface_path", } ) @@ -252,9 +254,6 @@ func (s *Server) GetServiceContractVersions(ctx context.Context, filter map[stri if namespace == "" { return api.NewBatchQueryResponseWithMsg(apimodel.Code_InvalidParameter, "namespace is empty") } - if serviceName == "" { - return api.NewBatchQueryResponseWithMsg(apimodel.Code_InvalidParameter, "service is empty") - } ret := s.caches.ServiceContract().ListVersions(serviceName, namespace) resp := api.NewBatchQueryResponse(apimodel.Code_ExecuteSuccess) @@ -268,6 +267,7 @@ func (s *Server) GetServiceContractVersions(ctx context.Context, filter map[stri Service: item.Service, Version: item.Version, Protocol: item.Protocol, + Revision: utils.NewUUID(), Ctime: commontime.Time2String(item.CreateTime), Mtime: commontime.Time2String(item.ModifyTime), }); err != nil { @@ -305,6 +305,7 @@ func (s *Server) CreateServiceContractInterfaces(ctx context.Context, createData.Interfaces = append(createData.Interfaces, &model.InterfaceDescriptor{ ID: interfaceId, ContractID: contract.Id, + Name: item.Name, Method: item.Method, Path: item.Path, Content: item.Content, @@ -355,6 +356,7 @@ func (s *Server) AppendServiceContractInterfaces(ctx context.Context, appendData.Interfaces = append(appendData.Interfaces, &model.InterfaceDescriptor{ ID: interfaceId, ContractID: contract.Id, + Name: item.Name, Method: item.Method, Path: item.Path, Content: item.Content, @@ -406,6 +408,7 @@ func (s *Server) DeleteServiceContractInterfaces(ctx context.Context, ID: interfaceId, ContractID: contract.Id, Method: item.Method, + Name: item.Name, Path: item.Path, }) } @@ -421,6 +424,9 @@ func checkOperationServiceContractInterface(contract *apiservice.ServiceContract if contract.Id != "" { return nil } + if err := checkBaseServiceContract(contract); err != nil { + return err + } id, errRsp := utils.CheckContractTetrad(contract) if errRsp != nil { return errRsp @@ -450,20 +456,14 @@ func serviceContractRecordEntry(ctx context.Context, req *apiservice.ServiceCont } func checkBaseServiceContract(req *apiservice.ServiceContract) *apiservice.Response { - if err := utils.CheckResourceName(utils.NewStringValue(req.GetService())); err != nil { - return api.NewResponse(apimodel.Code_InvalidServiceName) - } if err := utils.CheckResourceName(utils.NewStringValue(req.GetNamespace())); err != nil { return api.NewResponse(apimodel.Code_InvalidNamespaceName) } - if err := utils.CheckResourceName(utils.NewStringValue(req.GetName())); err != nil { + if req.GetName() == "" { return api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract name") } if req.GetProtocol() == "" { return api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract protocol") } - if req.GetVersion() == "" { - return api.NewResponseWithMsg(apimodel.Code_BadRequest, "invalid service_contract version") - } return nil } diff --git a/store/mysql/group.go b/store/mysql/group.go index bd741b625..3b4667542 100644 --- a/store/mysql/group.go +++ b/store/mysql/group.go @@ -88,12 +88,17 @@ func (u *groupStore) addGroup(group *model.UserGroupDetail) error { VALUES (?, ?, ?, ?, ?, ?, ?, sysdate(), sysdate()) ` + tokenEnable := 1 + if !group.TokenEnable { + tokenEnable = 0 + } + if _, err = tx.Exec(addSql, []interface{}{ group.ID, group.Name, group.Owner, group.Token, - 1, + tokenEnable, group.Comment, 0, }...); err != nil { diff --git a/store/mysql/scripts/polaris_server.sql b/store/mysql/scripts/polaris_server.sql index 5c1befc10..174b6543a 100644 --- a/store/mysql/scripts/polaris_server.sql +++ b/store/mysql/scripts/polaris_server.sql @@ -905,6 +905,7 @@ CREATE TABLE service_contract_detail ( `id` VARCHAR(128) NOT NULL COMMENT '服务契约单个接口定义记录主键', `contract_id` VARCHAR(128) NOT NULL COMMENT '服务契约 ID', + `name` VARCHAR(128) NOT NULL COMMENT '服务契约接口名称', `method` VARCHAR(32) NOT NULL COMMENT 'http协议中的 method 字段, eg:POST/GET/PUT/DELETE, 其他 gRPC 可以用来标识 stream 类型', `path` VARCHAR(128) NOT NULL COMMENT '接口具体全路径描述', `source` INT COMMENT '该条记录来源, 0:SDK/1:MANUAL', diff --git a/store/mysql/service_contract.go b/store/mysql/service_contract.go index 9ddb50ac6..138242f1d 100644 --- a/store/mysql/service_contract.go +++ b/store/mysql/service_contract.go @@ -103,7 +103,7 @@ func (s *serviceContractStore) GetServiceContract(id string) (data *model.Enrich args := []interface{}{id} rows, err := s.master.Query(querySql, args...) if err != nil { - log.Error("[Store][Contract] list contract ", zap.String("query sql", querySql), zap.Any("args", args)) + log.Error("[Store][Contract] list contract ", zap.String("query", querySql), zap.Any("args", args)) return nil, store.Error(err) } defer func() { @@ -122,8 +122,8 @@ func (s *serviceContractStore) GetServiceContract(id string) (data *model.Enrich } contract.Valid = flag == 0 - contract.CreateTime = time.Unix(0, ctime) - contract.ModifyTime = time.Unix(0, mtime) + contract.CreateTime = time.Unix(ctime, 0) + contract.ModifyTime = time.Unix(mtime, 0) list = append(list, &contract) } @@ -147,12 +147,14 @@ func (s *serviceContractStore) AddServiceContractInterfaces(contract *model.Enri // 新增批量数据 for _, item := range contract.Interfaces { - addSql := "REPLACE INTO service_contract_detail(`id`,`contract_id`, `method`, `path` ,`content`,`revision`" + + addSql := "REPLACE INTO service_contract_detail(`id`, `contract_id`, `name`, `method`, `path` " + + " ,`content`,`revision`" + ",`flag`,`ctime`, `mtime`, `source`" + ") VALUES (?,?,?,?,?,?,?,sysdate(),sysdate(),?)" if _, err := tx.Exec(addSql, []interface{}{ item.ID, contract.ID, + item.Name, item.Method, item.Path, item.Content, @@ -177,13 +179,15 @@ func (s *serviceContractStore) AppendServiceContractInterfaces(contract *model.E return err } for _, item := range contract.Interfaces { - addSql := "REPLACE INTO service_contract_detail(`id`,`contract_id`, `method`, `path` ,`content`,`revision`" + + addSql := "REPLACE INTO service_contract_detail(`id`,`contract_id`, `name`, `method`, " + + " `path` ,`content`,`revision`" + ",`flag`,`ctime`, `mtime`,`source`" + ") VALUES (?,?,?,?,?,?,?,sysdate(),sysdate(),?)" if _, err := tx.Exec(addSql, []interface{}{ item.ID, contract.ID, + item.Name, item.Method, item.Path, item.Content, @@ -208,12 +212,13 @@ func (s *serviceContractStore) DeleteServiceContractInterfaces(contract *model.E return err } for _, item := range contract.Interfaces { - addSql := "DELETE FROM service_contract_detail WHERE contract_id = ? AND method = ? AND path = ?" + addSql := "DELETE FROM service_contract_detail WHERE contract_id = ? AND method = ? AND path = ? AND name = ?" if _, err := tx.Exec(addSql, []interface{}{ item.ContractID, item.Method, item.Path, + item.Name, }...); err != nil { log.Errorf("[Store][database] delete service contract detail err: %s", err.Error()) return err @@ -262,8 +267,8 @@ func (s *serviceContractStore) GetMoreServiceContracts(firstUpdate bool, mtime t } contract.Valid = flag == 0 - contract.CreateTime = time.Unix(0, ctime) - contract.ModifyTime = time.Unix(0, mtime) + contract.CreateTime = time.Unix(ctime, 0) + contract.ModifyTime = time.Unix(mtime, 0) list = append(list, &model.EnrichServiceContract{ ServiceContract: contract, @@ -272,7 +277,7 @@ func (s *serviceContractStore) GetMoreServiceContracts(firstUpdate bool, mtime t contractDetailMap := map[string][]*model.InterfaceDescriptor{} if len(list) > 0 { - queryDetailSql := "SELECT sd.id, sd.contract_id, sd.method, sd.path, sd.content, sd.revision, " + + queryDetailSql := "SELECT sd.id, sd.contract_id, sd.name, sd.method, sd.path, sd.content, sd.revision, " + " UNIX_TIMESTAMP(sd.ctime), UNIX_TIMESTAMP(sd.mtime), IFNULL(sd.source, 1) " + " FROM service_contract_detail sd LEFT JOIN service_contract sc ON sd.contract_id = sc.id " + " WHERE sc.mtime >= ?" @@ -288,7 +293,7 @@ func (s *serviceContractStore) GetMoreServiceContracts(firstUpdate bool, mtime t var flag, ctime, mtime, source int64 detailItem := &model.InterfaceDescriptor{} if scanErr := detailRows.Scan( - &detailItem.ID, &detailItem.ContractID, &detailItem.Method, + &detailItem.ID, &detailItem.ContractID, &detailItem.Name, &detailItem.Method, &detailItem.Path, &detailItem.Content, &detailItem.Revision, &ctime, &mtime, &source, ); scanErr != nil { @@ -297,8 +302,8 @@ func (s *serviceContractStore) GetMoreServiceContracts(firstUpdate bool, mtime t } detailItem.Valid = flag == 0 - detailItem.CreateTime = time.Unix(0, ctime) - detailItem.ModifyTime = time.Unix(0, mtime) + detailItem.CreateTime = time.Unix(ctime, 0) + detailItem.ModifyTime = time.Unix(mtime, 0) switch source { case 2: detailItem.Source = service_manage.InterfaceDescriptor_Client