diff --git a/.github/workflows/benchmark.yaml b/.github/workflows/benchmark.yaml index 19fa7b0ad..81903d8b4 100644 --- a/.github/workflows/benchmark.yaml +++ b/.github/workflows/benchmark.yaml @@ -64,7 +64,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v4 with: - go-version: + go-version: "1.21.5" # Checkout latest code - name: Checkout repo uses: actions/checkout@v2 diff --git a/.github/workflows/codecov.yaml b/.github/workflows/codecov.yaml index 282658436..fb6742261 100644 --- a/.github/workflows/codecov.yaml +++ b/.github/workflows/codecov.yaml @@ -54,7 +54,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v4 with: - go-version: "1.20" + go-version: "1.21.5" # Checkout latest code - name: Checkout repo uses: actions/checkout@v2 diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 94c14734c..0034a354c 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -38,7 +38,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v4 with: - go-version: "1.20" + go-version: "1.21.5" - name: Set up Docker Buildx uses: docker/setup-buildx-action@v1 diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 569132415..394cb4df3 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -29,7 +29,7 @@ jobs: golangci: strategy: matrix: - go-version: [ "1.20" ] + go-version: [ "1.21.5" ] name: golangci-lint runs-on: ubuntu-latest steps: diff --git a/.github/workflows/integration-testing-mysql.yml b/.github/workflows/integration-testing-mysql.yml index 8fb18b814..8d4f3775f 100644 --- a/.github/workflows/integration-testing-mysql.yml +++ b/.github/workflows/integration-testing-mysql.yml @@ -78,7 +78,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v4 with: - go-version: "1.20" + go-version: "1.21.5" # Checkout latest code - name: Checkout repo uses: actions/checkout@v2 diff --git a/.github/workflows/integration-testing.yml b/.github/workflows/integration-testing.yml index 754204e67..8be7f2607 100644 --- a/.github/workflows/integration-testing.yml +++ b/.github/workflows/integration-testing.yml @@ -64,7 +64,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v4 with: - go-version: "1.20" + go-version: "1.21.5" # Checkout latest code - name: Checkout repo uses: actions/checkout@v2 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ef9b08b39..6e658003b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -38,7 +38,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v4 with: - go-version: "1.20" + go-version: "1.21.5" - name: Get version id: get_version diff --git a/.github/workflows/standalone.yml b/.github/workflows/standalone.yml index 996519186..3ef40a124 100644 --- a/.github/workflows/standalone.yml +++ b/.github/workflows/standalone.yml @@ -51,7 +51,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v4 with: - go-version: "1.20" + go-version: "1.21.5" - name: Build id: build diff --git a/apiserver/nacosserver/v1/config/config_file.go b/apiserver/nacosserver/v1/config/config_file.go index e689f52b9..ff5f8c072 100644 --- a/apiserver/nacosserver/v1/config/config_file.go +++ b/apiserver/nacosserver/v1/config/config_file.go @@ -193,7 +193,7 @@ func (n *ConfigServer) diffChangeFiles(ctx context.Context, dataId := item.GetFileName().GetValue() mdval := item.GetMd5().GetValue() - if beta := n.cacheSvr.ConfigFile().GetGrayRelease(namespace, group, dataId); beta != nil { + if beta := n.cacheSvr.ConfigFile().GetActiveGrayRelease(namespace, group, dataId); beta != nil { if n.cacheSvr.Gray().HitGrayRule(beta.FileKey(), clientLabels) { changeKeys = append(changeKeys, &model.ConfigListenItem{ Tenant: model.ToNacosConfigNamespace(beta.Namespace), diff --git a/apiserver/nacosserver/v2/config/config_file.go b/apiserver/nacosserver/v2/config/config_file.go index b1615ed21..22cf4662a 100644 --- a/apiserver/nacosserver/v2/config/config_file.go +++ b/apiserver/nacosserver/v2/config/config_file.go @@ -189,7 +189,7 @@ func (h *ConfigServer) handleWatchConfigRequest(ctx context.Context, req nacospb var active *model.ConfigFileRelease var match bool - if betaActive := h.cacheSvr.ConfigFile().GetGrayRelease(namespace, group, dataId); betaActive != nil { + if betaActive := h.cacheSvr.ConfigFile().GetActiveGrayRelease(namespace, group, dataId); betaActive != nil { match = h.cacheSvr.Gray().HitGrayRule(model.GetGrayConfigRealseKey(betaActive.SimpleConfigFileRelease), watchCtx.ClientLabels()) active = betaActive } diff --git a/apiserver/xdsserverv3/cache/cache.go b/apiserver/xdsserverv3/cache/cache.go index 9ca4f1d96..a51126384 100644 --- a/apiserver/xdsserverv3/cache/cache.go +++ b/apiserver/xdsserverv3/cache/cache.go @@ -20,12 +20,10 @@ package cache import ( "context" "errors" - "fmt" + corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "go.uber.org/zap" @@ -39,7 +37,7 @@ type ( // hash is the hashing function for Envoy nodes hash cachev3.NodeHash // Muxed caches. - Caches *utils.SyncMap[string, cache.Cache] + Caches *utils.SyncMap[string, cachev3.Cache] } // CacheHook @@ -64,13 +62,19 @@ func NewCache(hook CacheHook) *XDSCache { return sc } +// CleanEnvoyNodeCache 清理和 Envoy Node 强相关的缓存数据 +func (sc *XDSCache) CleanEnvoyNodeCache(node *corev3.Node) { + cacheKey := resource.LDS.ResourceType() + "~" + node.Id + sc.Caches.Delete(cacheKey) +} + // CreateWatch returns a watch for an xDS request. func (sc *XDSCache) CreateWatch(request *cachev3.Request, streamState stream.StreamState, value chan cachev3.Response) func() { if sc.hook != nil { sc.hook.OnCreateWatch(request, streamState, value) } - item := sc.loadCache(request) + item := sc.loadCache(request, streamState) if item == nil { value <- nil return func() {} @@ -84,7 +88,7 @@ func (sc *XDSCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state stream if sc.hook != nil { sc.hook.OnCreateDeltaWatch(request, state, value) } - item := sc.loadCache(request) + item := sc.loadCache(request, state) if item == nil { value <- &NoReadyXdsResponse{} return func() {} @@ -98,30 +102,21 @@ func (sc *XDSCache) Fetch(ctx context.Context, request *cachev3.Request) (cachev return nil, errors.New("not implemented") } -// DeltaUpdateNodeResource . -func (sc *XDSCache) DeltaUpdateNodeResource(client *resource.XDSClient, key, typeUrl string, current map[string]types.Resource) error { - val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache { - return NewLinearCache(typeUrl) - }) - linearCache, _ := val.(*LinearCache) - return linearCache.UpdateNodeResource(client, current) -} - // DeltaUpdateResource . func (sc *XDSCache) DeltaUpdateResource(key, typeUrl string, current map[string]types.Resource) error { val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache { - return NewLinearCache(typeUrl) + return cachev3.NewLinearCache(typeUrl, cachev3.WithLogger(log)) }) - linearCache, _ := val.(*LinearCache) + linearCache, _ := val.(*cachev3.LinearCache) return linearCache.UpdateResources(current, []string{}) } // DeltaRemoveResource . func (sc *XDSCache) DeltaRemoveResource(key, typeUrl string, current map[string]types.Resource) error { val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache { - return NewLinearCache(typeUrl) + return cachev3.NewLinearCache(typeUrl, cachev3.WithLogger(log)) }) - linearCache, _ := val.(*LinearCache) + linearCache, _ := val.(*cachev3.LinearCache) waitRemove := make([]string, 0, len(current)) for k := range current { @@ -130,99 +125,43 @@ func (sc *XDSCache) DeltaRemoveResource(key, typeUrl string, current map[string] return linearCache.UpdateResources(nil, waitRemove) } -func classify(typeUrl string, resources []string, client *resource.XDSClient) []string { - isAllowNode := false - _, isAllowTls := allowTlsResource[typeUrl] - if isAllowNodeFunc, exist := allowEachNodeResource[typeUrl]; exist { - isAllowNode = isAllowNodeFunc(typeUrl, resources, client) - } - first := typeUrl + "~" + client.Node.GetId() - second := typeUrl + "~" + client.GetSelfNamespace() - tlsMode, exist := client.Metadata[resource.TLSModeTag] - - // 没有设置 TLS 开关 - if !exist || tlsMode == string(resource.TLSModeNone) { - if isAllowNode { - return []string{first, second} - } - return []string{second} - } - if isAllowNode { - if isAllowTls { - return []string{first, second, second + "~" + tlsMode} - } - return []string{first, second} - } - if isAllowTls { - return []string{first, second, second + "~" + tlsMode} - } - return []string{second} -} - -type PredicateNodeResource func(typeUrl string, resources []string, client *resource.XDSClient) bool - -var ( - allowEachNodeResource = map[string]PredicateNodeResource{ - resourcev3.ListenerType: func(typeUrl string, resources []string, client *resource.XDSClient) bool { - return true - }, - resourcev3.EndpointType: func(typeUrl string, resources []string, client *resource.XDSClient) bool { - selfSvc := fmt.Sprintf("INBOUND|%s|%s", client.GetSelfNamespace(), client.GetSelfService()) - for i := range resources { - if resources[i] == selfSvc { - return true - } - } - return false - }, - resourcev3.RouteType: func(typeUrl string, resources []string, client *resource.XDSClient) bool { - selfSvc := resource.MakeInBoundRouteConfigName(client.GetSelfServiceKey()) - for i := range resources { - if resources[i] == selfSvc { - return true - } - } - return false - }, - } - allowTlsResource = map[string]struct{}{ - resourcev3.ListenerType: {}, - resourcev3.ClusterType: {}, - } -) - -func (sc *XDSCache) loadCache(req interface{}) cachev3.Cache { +func (sc *XDSCache) loadCache(req interface{}, streamState stream.StreamState) cachev3.Cache { var ( - keys []string - typeUrl string - client *resource.XDSClient - subscribeResources []string + typeUrl string + client *resource.XDSClient ) switch args := req.(type) { - case *cache.Request: + case *cachev3.Request: client = resource.ParseXDSClient(args.GetNode()) - subscribeResources = args.GetResourceNames() - keys = classify(args.TypeUrl, subscribeResources, client) typeUrl = args.TypeUrl - case *cache.DeltaRequest: + case *cachev3.DeltaRequest: client = resource.ParseXDSClient(args.GetNode()) - subscribeResources = args.GetResourceNamesSubscribe() - keys = classify(args.TypeUrl, subscribeResources, client) typeUrl = args.TypeUrl default: log.Error("[XDS][V3] no support client request type", zap.Any("req", args)) return nil } - for i := range keys { - val, ok := sc.Caches.Load(keys[i]) - if ok { - log.Info("[XDS][V3] load cache to handle client request", zap.Strings("keys", keys), - zap.String("hit-key", keys[i]), zap.Strings("subscribe", subscribeResources), - zap.String("type", typeUrl), zap.String("client", client.Node.GetId())) - return val - } + cacheKey := BuildCacheKey(typeUrl, client.TLSMode, client) + val, ok := sc.Caches.Load(cacheKey) + if ok { + log.Info("[XDS][V3] load cache to handle client request", + zap.String("cache-key", cacheKey), zap.String("type", typeUrl), + zap.String("client", client.Node.GetId()), zap.Bool("wildcard", streamState.IsWildcard())) + return val } log.Error("[XDS][V3] cache not found to handle client request", zap.String("type", typeUrl), zap.String("client", client.Node.GetId())) return nil } + +func BuildCacheKey(typeUrl string, tlsMode resource.TLSMode, client *resource.XDSClient) string { + xdsType := resource.FormatTypeUrl(typeUrl) + if xdsType == resource.LDS { + return typeUrl + "~" + client.GetNodeID() + } + key := typeUrl + "~" + client.GetSelfNamespace() + if resource.SupportTLS(xdsType) && resource.EnableTLS(tlsMode) { + key = key + "~" + string(tlsMode) + } + return key +} diff --git a/apiserver/xdsserverv3/cache/callback.go b/apiserver/xdsserverv3/cache/callback.go index 0bde7c02e..281b3105e 100644 --- a/apiserver/xdsserverv3/cache/callback.go +++ b/apiserver/xdsserverv3/cache/callback.go @@ -25,38 +25,38 @@ import ( "go.uber.org/zap" "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" - commonlog "github.com/polarismesh/polaris/common/log" ) -func NewCallback(log *commonlog.Scope, nodeMgr *resource.XDSNodeManager) *Callbacks { +func NewCallback(cacheMgr *XDSCache, nodeMgr *resource.XDSNodeManager) *Callbacks { return &Callbacks{ - log: log, - nodeMgr: nodeMgr, + cacheMgr: cacheMgr, + nodeMgr: nodeMgr, } } type Callbacks struct { - log *commonlog.Scope - nodeMgr *resource.XDSNodeManager + cacheMgr *XDSCache + nodeMgr *resource.XDSNodeManager } -func (cb *Callbacks) Report() { - +func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error { + return nil } -func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error { +func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error { return nil } func (cb *Callbacks) OnStreamClosed(id int64, node *corev3.Node) { cb.nodeMgr.DelNode(id) -} - -func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error { - return nil + // 清理 cache + cb.cacheMgr.CleanEnvoyNodeCache(node) } func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *corev3.Node) { + cb.nodeMgr.DelNode(id) + // 清理 cache + cb.cacheMgr.CleanEnvoyNodeCache(node) } func (cb *Callbacks) OnStreamRequest(id int64, req *discovery.DiscoveryRequest) error { diff --git a/apiserver/xdsserverv3/cache/linear.go b/apiserver/xdsserverv3/cache/linear.go deleted file mode 100644 index 6cbe3fc00..000000000 --- a/apiserver/xdsserverv3/cache/linear.go +++ /dev/null @@ -1,700 +0,0 @@ -// Copyright 2020 Envoyproxy Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cache - -import ( - "context" - "errors" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" - cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" - - "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" -) - -type watches = map[chan cachev3.Response]struct{} - -// LinearCache supports collections of opaque resources. This cache has a -// single collection indexed by resource names and manages resource versions -// internally. It implements the cache interface for a single type URL and -// should be combined with other caches via type URL muxing. It can be used to -// supply EDS entries, for example, uniformly across a fleet of proxies. -type LinearCache struct { - // Type URL specific to the cache. - typeURL string - // --- XDS Watcher --- - // Watchers collection by node id - watchClients map[string]*watchClient - // Continuously incremented counter used to index delta watches. - deltaWatchCount int64 - // --- common xds resource for target type_url --- - // Collection of resources indexed by name. - resources map[string]types.Resource - // versionMap holds the current hash map of all resources in the cache when delta watches are present. - // versionMap is only to be used with delta xDS. - versionMap map[string]string - // Versions for each resource by name. - versionVector map[string]uint64 - // --- node xds resource for target type_url --- - // Collection of resource indexed by node-id -> name - nodeResources map[string]map[string]types.Resource - // nodeVersionMap holds the current hash map of all resources in the cache when delta watches are present. - // nodeVersionMap is only to be used with delta xDS. - nodeVersionMap map[string]map[string]string - // nodeVersionVector for each resource by name. - nodeVersionVector map[string]map[string]uint64 - - versionPrefix string - - // Continuously incremented version. - version uint64 - // Continuously incremented version for per envoy node. - nodeVersion map[string]uint64 - mu sync.RWMutex -} - -var _ cachev3.Cache = &LinearCache{} - -// NewLinearCache creates a new cache. See the comments on the struct definition. -func NewLinearCache(typeURL string) *LinearCache { - out := &LinearCache{ - typeURL: typeURL, - resources: make(map[string]types.Resource), - watchClients: make(map[string]*watchClient), - versionMap: nil, - version: 0, - versionVector: make(map[string]uint64), - nodeResources: map[string]map[string]types.Resource{}, - nodeVersionMap: map[string]map[string]string{}, - nodeVersion: map[string]uint64{}, - nodeVersionVector: map[string]map[string]uint64{}, - } - return out -} - -func (cache *LinearCache) respond(value chan<- cachev3.Response, staleResources []string) { - var resources []types.ResourceWithTTL - // TODO: optimize the resources slice creations across different clients - if len(staleResources) == 0 { - resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) - for _, resItem := range cache.resources { - resources = append(resources, types.ResourceWithTTL{Resource: resItem}) - } - } else { - resources = make([]types.ResourceWithTTL, 0, len(staleResources)) - for _, name := range staleResources { - resItem := cache.resources[name] - if resItem != nil { - resources = append(resources, types.ResourceWithTTL{Resource: resItem}) - } - } - } - value <- &cachev3.RawResponse{ - Request: &cachev3.Request{TypeUrl: cache.typeURL}, - Resources: resources, - // Version: cache.getVersion(), - Ctx: context.Background(), - } -} - -func (cache *LinearCache) notifyAll(watchClients map[string]*watchClient, modified map[string]struct{}) { - // de-duplicate watches that need to be responded - notifyList := make(map[chan<- cachev3.Response][]string) - for name := range modified { - for _, client := range watchClients { - watchChs := client.popResourceWatchChans(name) - for _, ch := range watchChs { - notifyList[ch] = append(notifyList[ch], name) - } - } - } - for value, stale := range notifyList { - cache.respond(value, stale) - } - for _, client := range watchClients { - if !client.hasWatchAll() { - continue - } - client.foreachWatchAll(func(resp chan<- cachev3.Response) { - cache.respond(resp, nil) - }) - client.cleanWatchAll() - } - - // Building the version map has a very high cost when using SetResources to do full updates. - // As it is only used with delta watches, it is only maintained when applicable. - if cache.versionMap != nil || len(cache.nodeVersionMap) > 0 { - if cache.versionMap != nil { - if err := cache.updateVersionMap(modified); err != nil { - log.Errorf("failed to update version map: %v", err) - } - } - if len(cache.nodeVersionMap) > 0 { - for _, client := range watchClients { - if err := cache.updateNodeVersionMap(client.client.GetNodeID(), modified); err != nil { - log.Errorf("failed to update node version map: %v", err) - } - } - } - - for _, client := range watchClients { - watchs, ok := client.listDeltaWatchs(modified) - if !ok { - continue - } - for id, watch := range watchs { - if res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState); res != nil { - cache.cancelDeltaWatch(client.client.GetNodeID(), id)() - } - } - } - } -} - -func (cache *LinearCache) notifyClient(node *resource.XDSClient, modified map[string]struct{}) { - watchC, ok := cache.watchClients[node.GetNodeID()] - if !ok { - return - } - waitNotifier := map[string]*watchClient{ - node.GetNodeID(): watchC, - } - cache.notifyAll(waitNotifier, modified) -} - -func (cache *LinearCache) respondDelta(request *cachev3.DeltaRequest, value chan cachev3.DeltaResponse, state stream.StreamState) *cachev3.RawDeltaResponse { - resp := createDeltaResponse(context.Background(), request, state, resourceContainer{ - resourceMap: cache.resources, - nodeResourceMap: cache.nodeResources[request.Node.Id], - versionMap: cache.versionMap, - nodeVersionMap: cache.nodeVersionMap[request.Node.Id], - systemVersion: cache.getVersion(request.Node.Id), - }) - - // Only send a response if there were changes - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { - log.Infof("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.TypeUrl, cachev3.GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) - value <- resp - return resp - } - return nil -} - -// DeleteNodeResources clean target node link all resource in the collection. -func (cache *LinearCache) DeleteNodeResources(client *resource.XDSClient) error { - if client == nil { - return errors.New("nil resource") - } - cache.mu.Lock() - defer cache.mu.Unlock() - - delete(cache.watchClients, client.GetNodeID()) - delete(cache.nodeVersion, client.GetNodeID()) - delete(cache.nodeResources, client.GetNodeID()) - delete(cache.nodeVersionMap, client.GetNodeID()) - delete(cache.nodeVersionVector, client.GetNodeID()) - return nil -} - -// UpdateResource updates a resource in the collection. -func (cache *LinearCache) UpdateNodeResource(client *resource.XDSClient, toUpdate map[string]types.Resource) error { - waitNotify := map[string]struct{}{} - for name, res := range toUpdate { - if res == nil { - return errors.New("nil resource") - } - cache.mu.Lock() - defer cache.mu.Unlock() - - if _, ok := cache.nodeVersion[client.GetNodeID()]; !ok { - cache.nodeVersion[client.GetNodeID()] = 0 - } - cache.nodeVersion[client.GetNodeID()] = cache.nodeVersion[client.GetNodeID()] + 1 - - if _, ok := cache.nodeVersionVector[client.GetNodeID()]; !ok { - cache.nodeVersionVector[client.GetNodeID()] = map[string]uint64{} - } - cache.nodeVersionVector[client.GetNodeID()][name] = cache.nodeVersion[client.GetNodeID()] - - if _, ok := cache.nodeResources[client.GetNodeID()]; !ok { - cache.nodeResources[client.GetNodeID()] = map[string]types.Resource{} - } - cache.nodeResources[client.GetNodeID()][name] = res - waitNotify[name] = struct{}{} - } - // TODO: batch watch closures to prevent rapid updates - cache.notifyClient(client, waitNotify) - return nil -} - -// DeleteResource removes a resource in the collection. -func (cache *LinearCache) DeleteResource(name string) error { - cache.mu.Lock() - defer cache.mu.Unlock() - - cache.version++ - delete(cache.versionVector, name) - delete(cache.resources, name) - - // TODO: batch watch closures to prevent rapid updates - cache.notifyAll(cache.watchClients, map[string]struct{}{name: {}}) - return nil -} - -// UpdateResources updates/deletes a list of resources in the cache. -// Calling UpdateResources instead of iterating on UpdateResource and DeleteResource -// is significantly more efficient when using delta or wildcard watches. -func (cache *LinearCache) UpdateResources(toUpdate map[string]types.Resource, toDelete []string) error { - cache.mu.Lock() - defer cache.mu.Unlock() - - cache.version++ - - modified := make(map[string]struct{}, len(toUpdate)+len(toDelete)) - for name, resItem := range toUpdate { - cache.versionVector[name] = cache.version - cache.resources[name] = resItem - modified[name] = struct{}{} - } - for _, name := range toDelete { - delete(cache.versionVector, name) - delete(cache.resources, name) - modified[name] = struct{}{} - } - - cache.notifyAll(cache.watchClients, modified) - return nil -} - -// GetResources returns current resources stored in the cache -func (cache *LinearCache) GetResources() map[string]types.Resource { - cache.mu.RLock() - defer cache.mu.RUnlock() - - // create a copy of our internal storage to avoid data races - // involving mutations of our backing map - resources := make(map[string]types.Resource, len(cache.resources)) - for k, v := range cache.resources { - resources[k] = v - } - return resources -} - -// GetResources returns current resources stored in the cache -func (cache *LinearCache) GetNodeResources() map[string]map[string]types.Resource { - cache.mu.RLock() - defer cache.mu.RUnlock() - - // create a copy of our internal storage to avoid data races - // involving mutations of our backing map - resources := make(map[string]map[string]types.Resource, len(cache.resources)) - for k, v := range cache.nodeResources { - resources[k] = make(map[string]types.Resource) - for n, r := range v { - resources[k][n] = r - } - } - return resources -} - -func parseReceiveVersion(version string) (string, string) { - ret := strings.Split(version, "~") - resVer, nodeVer := ret[0], ret[1] - return resVer, nodeVer -} - -func (cache *LinearCache) CreateWatch(request *cachev3.Request, _ stream.StreamState, value chan cachev3.Response) func() { - if request.TypeUrl != cache.typeURL { - value <- nil - return nil - } - - nodeId := request.Node.Id - watchClient, ok := cache.watchClients[nodeId] - if !ok { - watchClient = newWatchClient(request.Node) - cache.watchClients[nodeId] = watchClient - } - - // If the version is not up to date, check whether any requested resource has - // been updated between the last version and the current version. This avoids the problem - // of sending empty updates whenever an irrelevant resource changes. - stale := false - staleResources := []string{} // empty means all - - // strip version prefix if it is present - var ( - lastVersion, lastNodeVersion uint64 - err error - ) - if strings.HasPrefix(request.VersionInfo, cache.versionPrefix) { - var lastVerErr, lastNodeVerErr error - resVer, nodeVer := parseReceiveVersion(request.VersionInfo[len(cache.versionPrefix):]) - lastVersion, lastVerErr = strconv.ParseUint(resVer, 0, 64) - lastNodeVersion, lastNodeVerErr = strconv.ParseUint(nodeVer, 0, 64) - err = errors.Join(lastVerErr, lastNodeVerErr) - } else { - err = errors.New("mis-matched version prefix") - } - - cache.mu.Lock() - defer cache.mu.Unlock() - - if err != nil { - stale = true - staleResources = request.ResourceNames - } else if len(request.ResourceNames) == 0 { - stale = lastVersion != cache.version - } else { - exist := map[string]struct{}{} - for _, name := range request.ResourceNames { - if saveVer, ok := cache.nodeVersionVector[nodeId][name]; ok { - exist[name] = struct{}{} - // When a resource is removed, its version defaults 0 and it is not considered stale. - if lastNodeVersion < saveVer { - stale = true - staleResources = append(staleResources, name) - } - } - if _, ok := exist[name]; ok { - continue - } - if saveVer, ok := cache.versionVector[name]; ok { - // When a resource is removed, its version defaults 0 and it is not considered stale. - if lastVersion < saveVer { - stale = true - staleResources = append(staleResources, name) - } - } - } - } - if stale { - cache.respond(value, staleResources) - return nil - } - // Create open watches since versions are up to date. - if len(request.ResourceNames) == 0 { - watchClient.setWatchAll(value) - return func() { - watchClient.removeWatchAll(value) - } - } - watchClient.addResourcesWatch(request.GetResourceNames(), value) - return func() { - watchClient.removeResourcesWatch(request.GetResourceNames(), value) - } -} - -func (cache *LinearCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state stream.StreamState, value chan cachev3.DeltaResponse) func() { - cache.mu.Lock() - defer cache.mu.Unlock() - - nodeId := request.Node.Id - watchClient, ok := cache.watchClients[nodeId] - if !ok { - watchClient = newWatchClient(request.Node) - cache.watchClients[nodeId] = watchClient - } - // update last watch request time - watchClient.setLastDeltaWatchRequestTime(time.Now()) - - if cache.versionMap == nil { - // If we had no previously open delta watches, we need to build the version map for the first time. - // The version map will not be destroyed when the last delta watch is removed. - // This avoids constantly rebuilding when only a few delta watches are open. - modified := map[string]struct{}{} - for name := range cache.resources { - modified[name] = struct{}{} - } - err := cache.updateVersionMap(modified) - if err != nil { - log.Errorf("failed to update version map: %v", err) - } - } - if len(cache.nodeVersionMap[nodeId]) == 0 { - // If we had no previously open delta watches, we need to build the version map for the first time. - // The version map will not be destroyed when the last delta watch is removed. - // This avoids constantly rebuilding when only a few delta watches are open. - modified := map[string]struct{}{} - for name := range cache.nodeResources[nodeId] { - modified[name] = struct{}{} - } - err := cache.updateNodeVersionMap(nodeId, modified) - if err != nil { - log.Errorf("failed to update node version map: %v", err) - } - } - response := cache.respondDelta(request, value, state) - - // if respondDelta returns nil this means that there is no change in any resource version - // create a new watch accordingly - if response == nil { - watchID := cache.nextDeltaWatchID() - log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, - cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion(nodeId)) - - watchClient.setDeltaResponseWatch(watchID, cachev3.DeltaResponseWatch{Request: request, Response: value, StreamState: state}) - return cache.cancelDeltaWatch(nodeId, watchID) - } - return nil -} - -func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { - if cache.versionMap == nil { - cache.versionMap = make(map[string]string, len(modified)) - } - for name := range modified { - r, ok := cache.resources[name] - if !ok { - // The resource was deleted - delete(cache.versionMap, name) - continue - } - // hash our version in here and build the version map - marshaledResource, err := cachev3.MarshalResource(r) - if err != nil { - return err - } - v := cachev3.HashResource(marshaledResource) - if v == "" { - return errors.New("failed to build resource version") - } - - cache.versionMap[name] = v - } - return nil -} - -func (cache *LinearCache) updateNodeVersionMap(id string, modified map[string]struct{}) error { - if _, ok := cache.nodeVersionMap[id]; !ok { - cache.nodeVersionMap[id] = make(map[string]string, len(modified)) - } - for name := range modified { - r, ok := cache.nodeResources[id][name] - if !ok { - // The resource was deleted - delete(cache.nodeVersionMap[id], name) - continue - } - // hash our version in here and build the version map - marshaledResource, err := cachev3.MarshalResource(r) - if err != nil { - return err - } - v := cachev3.HashResource(marshaledResource) - if v == "" { - return errors.New("failed to build resource version") - } - - cache.nodeVersionMap[id][name] = v - } - return nil -} - -func (cache *LinearCache) getVersion(nodeId string) string { - return cache.versionPrefix + strconv.FormatUint(cache.version, 10) + "~" + strconv.FormatUint(cache.nodeVersion[nodeId], 10) -} - -// cancellation function for cleaning stale watches -func (cache *LinearCache) cancelDeltaWatch(nodeID string, watchID int64) func() { - return func() { - cache.mu.RLock() - defer cache.mu.RUnlock() - if info, ok := cache.watchClients[nodeID]; ok { - info.mu.Lock() - delete(info.deltaWatches, watchID) - info.mu.Unlock() - } - } -} - -func (cache *LinearCache) nextDeltaWatchID() int64 { - return atomic.AddInt64(&cache.deltaWatchCount, 1) -} - -func (cache *LinearCache) Fetch(context.Context, *cachev3.Request) (cachev3.Response, error) { - return nil, errors.New("not implemented") -} - -// Number of resources currently on the cache. -// As GetResources is building a clone it is expensive to get metrics otherwise. -func (cache *LinearCache) NumResources() int { - cache.mu.RLock() - defer cache.mu.RUnlock() - return len(cache.resources) -} - -// groups together resource-related arguments for the createDeltaResponse function -type resourceContainer struct { - resourceMap map[string]types.Resource - nodeResourceMap map[string]types.Resource - versionMap map[string]string - nodeVersionMap map[string]string - systemVersion string -} - -// newWatchClient initializes a status info data structure. -func newWatchClient(node *corev3.Node) *watchClient { - out := watchClient{ - watchAll: map[chan<- cachev3.Response]struct{}{}, - client: resource.ParseXDSClient(node), - watches: make(map[string]map[chan<- cachev3.Response]struct{}), - deltaWatches: make(map[int64]cachev3.DeltaResponseWatch), - } - return &out -} - -// watchClient tracks the server state for the remote Envoy node. -type watchClient struct { - watchAll map[chan<- cachev3.Response]struct{} - // node is the constant Envoy node metadata. - client *resource.XDSClient - // watches are indexed channels for the response watches and the original requests. - watches map[string]map[chan<- cachev3.Response]struct{} - // deltaWatches are indexed channels for the delta response watches and the original requests - deltaWatches map[int64]cachev3.DeltaResponseWatch - // the timestamp of the last watch request - lastWatchRequestTime time.Time - // the timestamp of the last delta watch request - lastDeltaWatchRequestTime time.Time - // mutex to protect the status fields. - // should not acquire mutex of the parent cache after acquiring this mutex. - mu sync.RWMutex -} - -func (info *watchClient) GetNumWatches() int { - info.mu.RLock() - defer info.mu.RUnlock() - return len(info.watches) -} - -func (info *watchClient) GetNumDeltaWatches() int { - info.mu.RLock() - defer info.mu.RUnlock() - return len(info.deltaWatches) -} - -func (info *watchClient) GetLastWatchRequestTime() time.Time { - info.mu.RLock() - defer info.mu.RUnlock() - return info.lastWatchRequestTime -} - -func (info *watchClient) GetLastDeltaWatchRequestTime() time.Time { - info.mu.RLock() - defer info.mu.RUnlock() - return info.lastDeltaWatchRequestTime -} - -// setLastDeltaWatchRequestTime will set the current time of the last delta discovery watch request. -func (info *watchClient) setLastDeltaWatchRequestTime(t time.Time) { - info.mu.Lock() - defer info.mu.Unlock() - info.lastDeltaWatchRequestTime = t -} - -// setDeltaResponseWatch will set the provided delta response watch for the associated watch ID. -func (info *watchClient) setDeltaResponseWatch(id int64, drw cachev3.DeltaResponseWatch) { - info.mu.Lock() - defer info.mu.Unlock() - info.deltaWatches[id] = drw -} - -func (info *watchClient) listDeltaWatchs(modified map[string]struct{}) (map[int64]cachev3.DeltaResponseWatch, bool) { - return nil, true -} - -// setDeltaResponseWatch will set the provided delta response watch for the associated watch ID. -func (info *watchClient) setWatchAll(respCh chan<- cachev3.Response) { - info.mu.Lock() - defer info.mu.Unlock() - info.watchAll[respCh] = struct{}{} -} - -func (info *watchClient) removeWatchAll(respCh chan<- cachev3.Response) { - info.mu.Lock() - defer info.mu.Unlock() - delete(info.watchAll, respCh) -} - -func (info *watchClient) hasWatchAll() bool { - info.mu.RLock() - defer info.mu.RUnlock() - return len(info.watchAll) > 0 -} - -func (info *watchClient) foreachWatchAll(consumer func(resp chan<- cachev3.Response)) { - info.mu.RLock() - defer info.mu.RUnlock() - for v := range info.watchAll { - consumer(v) - } -} - -func (info *watchClient) cleanWatchAll() { - info.mu.Lock() - defer info.mu.Unlock() - info.watchAll = make(map[chan<- cachev3.Response]struct{}) -} - -func (info *watchClient) addResourcesWatch(resources []string, resp chan<- cachev3.Response) { - info.mu.Lock() - defer info.mu.Unlock() - - for _, res := range resources { - if _, ok := info.watches[res]; !ok { - info.watches[res] = make(map[chan<- cachev3.Response]struct{}) - } - info.watches[res][resp] = struct{}{} - } -} - -func (info *watchClient) popResourceWatchChans(resourceName string) []chan<- cachev3.Response { - info.mu.Lock() - defer info.mu.Unlock() - - ret := make([]chan<- cachev3.Response, 0, 4) - resps, ok := info.watches[resourceName] - if !ok { - return ret - } - for r := range resps { - ret = append(ret, r) - } - return ret -} - -func (info *watchClient) removeResourcesWatch(resources []string, resp chan<- cachev3.Response) { - info.mu.Lock() - defer info.mu.Unlock() - - for _, res := range resources { - if _, ok := info.watches[res]; !ok { - continue - } - delete(info.watches[res], resp) - if len(info.watches[res]) == 0 { - delete(info.watches, res) - } - } -} diff --git a/apiserver/xdsserverv3/cache/response.go b/apiserver/xdsserverv3/cache/response.go index 38c0ba1b9..eec1c21b6 100644 --- a/apiserver/xdsserverv3/cache/response.go +++ b/apiserver/xdsserverv3/cache/response.go @@ -18,104 +18,12 @@ package cache import ( - "context" "errors" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) -func createDeltaResponse(ctx context.Context, req *cachev3.DeltaRequest, state stream.StreamState, - resources resourceContainer) *cachev3.RawDeltaResponse { - // variables to build our response with - var nextVersionMap map[string]string - var filtered []types.Resource - var toRemove []string - - // If we are handling a wildcard request, we want to respond with all resources - switch { - case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { - filtered = make([]types.Resource, 0, len(resources.resourceMap)) - } - nextVersionMap = make(map[string]string, len(resources.resourceMap)) - for name, r := range resources.nodeResourceMap { - // Since we've already precomputed the version hashes of the new snapshot, - // we can just set it here to be used for comparison later - version := resources.nodeVersionMap[name] - nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] - if !found || (prevVersion != version) { - filtered = append(filtered, r) - } - } - - for name, r := range resources.resourceMap { - if _, exist := nextVersionMap[name]; exist { - continue - } - // Since we've already precomputed the version hashes of the new snapshot, - // we can just set it here to be used for comparison later - version := resources.versionMap[name] - nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] - if !found || (prevVersion != version) { - filtered = append(filtered, r) - } - } - - // Compute resources for removal - // The resource version can be set to "" here to trigger a removal even if never returned before - for name := range state.GetResourceVersions() { - _, commonOk := resources.resourceMap[name] - _, nodeOk := resources.nodeResourceMap[name] - if !commonOk && !nodeOk { - toRemove = append(toRemove, name) - } - } - default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) - // state.GetResourceVersions() may include resources no longer subscribed - // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] - var commonOk, nodeOk bool - if r, ok := resources.nodeResourceMap[name]; ok { - nodeOk = true - nextVersion := resources.nodeVersionMap[name] - if prevVersion != nextVersion { - filtered = append(filtered, r) - } - nextVersionMap[name] = nextVersion - } - if r, ok := resources.resourceMap[name]; ok { - if _, exist := nextVersionMap[name]; !exist { - commonOk = true - nextVersion := resources.versionMap[name] - if prevVersion != nextVersion { - filtered = append(filtered, r) - } - nextVersionMap[name] = nextVersion - } - } - if found && (!commonOk && !nodeOk) { - toRemove = append(toRemove, name) - } - } - } - - return &cachev3.RawDeltaResponse{ - DeltaRequest: req, - Resources: filtered, - RemovedResources: toRemove, - NextVersionMap: nextVersionMap, - SystemVersionInfo: resources.systemVersion, - Ctx: ctx, - } -} - type NoReadyXdsResponse struct { cachev3.DeltaResponse } diff --git a/apiserver/xdsserverv3/debug.go b/apiserver/xdsserverv3/debug.go index 4d1748940..c50068848 100644 --- a/apiserver/xdsserverv3/debug.go +++ b/apiserver/xdsserverv3/debug.go @@ -19,29 +19,21 @@ package xdsserverv3 import ( "net/http" + "strings" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" apimodel "github.com/polarismesh/specification/source/go/api/v1/model" - "github.com/polarismesh/polaris/apiserver/xdsserverv3/cache" + "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" "github.com/polarismesh/polaris/common/utils" ) func (x *XDSServer) listXDSNodes(resp http.ResponseWriter, req *http.Request) { cType := req.URL.Query().Get("type") - var nodes interface{} - - switch cType { - case "sidecar": - nodes = x.nodeMgr.ListSidecarNodes() - case "gateway": - nodes = x.nodeMgr.ListGatewayNodes() - } - data := map[string]interface{}{ "code": apimodel.Code_ExecuteSuccess, "info": "execute success", - "data": nodes, + "data": x.nodeMgr.ListEnvoyNodesView(resource.RunType(cType)), } ret := utils.MustJson(data) @@ -54,17 +46,20 @@ func (x *XDSServer) listXDSResources(resp http.ResponseWriter, req *http.Request resources := map[string]interface{}{} x.cache.Caches.ReadRange(func(key string, val cachev3.Cache) { - linearCache := val.(*cache.LinearCache) + linearCache := val.(*cachev3.LinearCache) - var retVal interface{} if cType == "node" { - retVal = linearCache.GetNodeResources() + if strings.Contains(key, resource.LDS.ResourceType()) { + resources[key] = map[string]interface{}{ + "resources": linearCache.GetResources(), + } + } } else { - retVal = linearCache.GetResources() - } - - resources[key] = map[string]interface{}{ - "resources": retVal, + if !strings.Contains(key, resource.LDS.ResourceType()) { + resources[key] = map[string]interface{}{ + "resources": linearCache.GetResources(), + } + } } }) diff --git a/apiserver/xdsserverv3/generate.go b/apiserver/xdsserverv3/generate.go index fd29098ff..95b1e4f33 100644 --- a/apiserver/xdsserverv3/generate.go +++ b/apiserver/xdsserverv3/generate.go @@ -43,7 +43,6 @@ var ( type ( ServiceInfos map[string]map[model.ServiceKey]*resource.ServiceInfo - DemandConfs map[resource.RunType]map[string]map[string]struct{} XDSGenerate func(xdsType resource.XDSType, opt *resource.BuildOption) ) @@ -53,21 +52,11 @@ type XdsResourceGenerator struct { cache *cache.XDSCache versionNum *atomic.Uint64 xdsNodesMgr *resource.XDSNodeManager - - beforeDemandConfs DemandConfs } func (x *XdsResourceGenerator) Generate(versionLocal string, needUpdate, needRemove ServiceInfos) { - // 如果没有任何一个 XDS Node 接入则不会生成与 Node 有关的 XDS Resource - if x.xdsNodesMgr.HasEnvoyNodes() { - // 只构建 OnDemand 场景涉及的 XDS 数据 - _ = x.buildOnDemandXDSCache(needUpdate) - // 只构建 Envoy Node 特有的 XDS 数据 - _ = x.buildMoreEnvoyXDSCache(needUpdate, needRemove) - } - deltaOp := func(runType resource.RunType, infos ServiceInfos, f XDSGenerate) { direction := corev3.TrafficDirection_OUTBOUND if runType == resource.RunTypeGateway { @@ -85,15 +74,40 @@ func (x *XdsResourceGenerator) Generate(versionLocal string, f(resource.RDS, opt) f(resource.EDS, opt) f(resource.VHDS, opt) + // 默认构建没有设置 TLS 的 CDS 资源 f(resource.CDS, opt) - // 构建设置了 TLS Mode == Strict 的 CDS 资源 opt.TLSMode = resource.TLSModeStrict f(resource.CDS, opt) // 构建设置了 TLS Mode == Permissive 的 CDS 资源 opt.TLSMode = resource.TLSModePermissive f(resource.CDS, opt) + // 构建支持按需加载 + opt.OpenOnDemand = true + f(resource.RDS, opt) + + if runType == resource.RunTypeSidecar { + for svcKey := range services { + // 换成 INBOUND 构建 CDS、EDS、RDS + opt.SelfService = svcKey + opt.TrafficDirection = corev3.TrafficDirection_INBOUND + opt.TLSMode = resource.TLSModeNone + f(resource.EDS, opt) + f(resource.RDS, opt) + // 默认构建没有设置 TLS 的 CDS 资源 + f(resource.CDS, opt) + // 构建设置了 TLS Mode == Strict 的 CDS 资源 + opt.TLSMode = resource.TLSModeStrict + f(resource.CDS, opt) + // 构建设置了 TLS Mode == Permissive 的 CDS 资源 + opt.TLSMode = resource.TLSModePermissive + f(resource.CDS, opt) + // 构建支持按需加载 + opt.OpenOnDemand = true + f(resource.RDS, opt) + } + } } } @@ -118,6 +132,28 @@ func (x *XdsResourceGenerator) Generate(versionLocal string, wg.Wait() } +func (x *XdsResourceGenerator) buildOneEnvoyXDSCache(node *resource.XDSClient) error { + opt := &resource.BuildOption{ + RunType: node.RunType, + Client: node, + TLSMode: node.TLSMode, + Namespace: node.GetSelfNamespace(), + OpenOnDemand: node.OpenOnDemand, + SelfService: model.ServiceKey{ + Namespace: node.GetSelfNamespace(), + Name: node.GetSelfService(), + }, + } + + opt.TrafficDirection = corev3.TrafficDirection_OUTBOUND + // 构建 OUTBOUND LDS 资源 + x.buildAndDeltaUpdate(resource.LDS, opt) + opt.TrafficDirection = corev3.TrafficDirection_INBOUND + // 构建 INBOUND LDS 资源 + x.buildAndDeltaUpdate(resource.LDS, opt) + return nil +} + func (x *XdsResourceGenerator) buildAndDeltaRemove(xdsType resource.XDSType, opt *resource.BuildOption) { opt.ForceDelete = true xxds, err := x.generateXDSResource(xdsType, opt) @@ -127,13 +163,14 @@ func (x *XdsResourceGenerator) buildAndDeltaRemove(xdsType resource.XDSType, opt } typeUrl := xdsType.ResourceType() - cacheKey := xdsType.ResourceType() + "~" + opt.Namespace - if opt.TLSMode != resource.TLSModeNone { - cacheKey = cacheKey + "~" + string(opt.TLSMode) - } - if opt.OpenOnDemand { - cacheKey = cacheKey + "~" + opt.OnDemandServer + client := opt.Client + if client == nil { + client = &resource.XDSClient{ + TLSMode: opt.TLSMode, + Namespace: opt.Namespace, + } } + cacheKey := cache.BuildCacheKey(typeUrl, opt.TLSMode, client) if err := x.cache.DeltaRemoveResource(cacheKey, typeUrl, cachev3.IndexRawResourcesByName(xxds)); err != nil { log.Error("[XDS][Envoy] delta update fail", zap.String("cache-key", cacheKey), zap.String("type", xdsType.String()), zap.Error(err)) @@ -150,114 +187,20 @@ func (x *XdsResourceGenerator) buildAndDeltaUpdate(xdsType resource.XDSType, opt } typeUrl := xdsType.ResourceType() - cacheKey := xdsType.ResourceType() + "~" + opt.Namespace - if opt.TLSMode != resource.TLSModeNone { - cacheKey = cacheKey + "~" + string(opt.TLSMode) - } - // 与 XDS Node 有关的全部都有单独的 Cache 缓存处理 - if opt.Client != nil { - cacheKey = xdsType.ResourceType() + "~" + opt.Client.Node.Id - err = x.cache.DeltaUpdateNodeResource(opt.Client, cacheKey, typeUrl, cachev3.IndexRawResourcesByName(xxds)) - } else { - if opt.OpenOnDemand { - cacheKey = cacheKey + "~" + opt.OnDemandServer + client := opt.Client + if client == nil { + client = &resource.XDSClient{ + TLSMode: opt.TLSMode, + Namespace: opt.Namespace, } - err = x.cache.DeltaUpdateResource(cacheKey, typeUrl, cachev3.IndexRawResourcesByName(xxds)) } - if err != nil { + cacheKey := cache.BuildCacheKey(typeUrl, opt.TLSMode, client) + if err = x.cache.DeltaUpdateResource(cacheKey, typeUrl, cachev3.IndexRawResourcesByName(xxds)); err != nil { log.Error("[XDS][Envoy] delta update fail", zap.String("cache-key", cacheKey), zap.String("type", xdsType.String()), zap.Error(err)) } } -// buildOnDemandXDSCache 只构建 OnDemand XDS Resource -func (x *XdsResourceGenerator) buildOnDemandXDSCache(needUpdate ServiceInfos) error { - // runType -> []ns -> []demand-server - demandConfs := x.xdsNodesMgr.ListDemandConfs() - - deltaOp := func(demandConfs DemandConfs, needUpdate ServiceInfos, f XDSGenerate) { - for runType := range demandConfs { - for ns, svrs := range demandConfs[runType] { - svcInfos := needUpdate[ns] - for demandSvr := range svrs { - opt := &resource.BuildOption{ - RunType: resource.RunType(runType), - Namespace: ns, - Services: svcInfos, - OpenOnDemand: true, - OnDemandServer: demandSvr, - } - - opt.TrafficDirection = corev3.TrafficDirection_OUTBOUND - // 构建 OUTBOUND RDS 资源 - f(resource.RDS, opt) - } - } - } - - } - - waitUpdate := demandConfs - waitRemove := findWaitRemoveDemandConfs(x.beforeDemandConfs, demandConfs) - - deltaOp(DemandConfs(waitUpdate), needUpdate, x.buildAndDeltaUpdate) - deltaOp(DemandConfs(waitRemove), needUpdate, x.buildAndDeltaUpdate) - - x.beforeDemandConfs = demandConfs - return nil -} - -func (x *XdsResourceGenerator) buildMoreEnvoyXDSCache(needUpdate, needRemove ServiceInfos) error { - nodes := x.xdsNodesMgr.ListEnvoyNodes() - if len(nodes) == 0 || len(needUpdate) == 0 { - // 如果没有任何一个 XDS Sidecar Node 客户端,不做任何操作 - log.Info("[XDS][Envoy] xds nodes or update info is empty", zap.Int("nodes", len(nodes)), - zap.Int("need-update", len(needUpdate))) - return nil - } - - for i := range nodes { - if err := x.buildOneEnvoyXDSCache(nodes[i], needUpdate, needRemove); err != nil { - return err - } - } - return nil -} - -func (x *XdsResourceGenerator) buildOneEnvoyXDSCache(node *resource.XDSClient, needUpdate, needRemove ServiceInfos) error { - deltaOp := func(infos ServiceInfos, f XDSGenerate) { - opt := &resource.BuildOption{ - RunType: node.RunType, - Client: node, - TLSMode: node.TLSMode, - Namespace: node.GetSelfNamespace(), - Services: infos[node.GetSelfNamespace()], - OpenOnDemand: node.OpenOnDemand, - OnDemandServer: node.OnDemandServer, - SelfService: model.ServiceKey{ - Namespace: node.GetSelfNamespace(), - Name: node.GetSelfService(), - }, - } - - opt.TrafficDirection = corev3.TrafficDirection_OUTBOUND - // 构建 OUTBOUND LDS 资源 - f(resource.LDS, opt) - // 构建 OUTBOUND RDS 资源 - f(resource.RDS, opt) - opt.TrafficDirection = corev3.TrafficDirection_INBOUND - // 构建 INBOUND LDS 资源 - f(resource.LDS, opt) - // 构建 INBOUND EDS 资源 - f(resource.EDS, opt) - // 构建 INBOUND RDS 资源 - f(resource.RDS, opt) - } - - deltaOp(needUpdate, x.buildAndDeltaUpdate) - return nil -} - func (x *XdsResourceGenerator) generateXDSResource(xdsType resource.XDSType, opt *resource.BuildOption) ([]types.Resource, error) { @@ -303,26 +246,3 @@ func (x *XdsResourceGenerator) generateXDSResource(xdsType resource.XDSType, } return resources.([]types.Resource), nil } - -func findWaitRemoveDemandConfs(before, after DemandConfs) DemandConfs { - ret := map[resource.RunType]map[string]map[string]struct{}{ - resource.RunTypeSidecar: {}, - resource.RunTypeGateway: {}, - } - - for runT, nsSvrs := range before { - for ns, svrs := range nsSvrs { - if _, ok := after[runT][ns]; !ok { - ret[runT][ns] = svrs - continue - } - ret[runT][ns] = map[string]struct{}{} - for svr := range svrs { - if _, ok := after[runT][ns][svr]; !ok { - ret[runT][ns][svr] = struct{}{} - } - } - } - } - return ret -} diff --git a/apiserver/xdsserverv3/hook.go b/apiserver/xdsserverv3/hook.go index 4a0e240a9..c03d6d681 100644 --- a/apiserver/xdsserverv3/hook.go +++ b/apiserver/xdsserverv3/hook.go @@ -33,7 +33,7 @@ func (x *XDSServer) OnCreateWatch(request *cachev3.Request, streamState stream.S if client == nil { return } - _ = x.resourceGenerator.buildOneEnvoyXDSCache(client, x.registryInfo, nil) + _ = x.resourceGenerator.buildOneEnvoyXDSCache(client) } // OnCreateDeltaWatch before call cachev3.SnapshotCache OnCreateDeltaWatch @@ -44,7 +44,7 @@ func (x *XDSServer) OnCreateDeltaWatch(request *cachev3.DeltaRequest, state stre if client == nil { return } - _ = x.resourceGenerator.buildOneEnvoyXDSCache(client, x.registryInfo, nil) + _ = x.resourceGenerator.buildOneEnvoyXDSCache(client) } // OnFetch before call cachev3.SnapshotCache OnFetch @@ -54,5 +54,5 @@ func (x *XDSServer) OnFetch(ctx context.Context, request *cachev3.Request) { if client == nil { return } - _ = x.resourceGenerator.buildOneEnvoyXDSCache(client, x.registryInfo, nil) + _ = x.resourceGenerator.buildOneEnvoyXDSCache(client) } diff --git a/apiserver/xdsserverv3/lds.go b/apiserver/xdsserverv3/lds.go index f9b9d8fc1..6caaab399 100644 --- a/apiserver/xdsserverv3/lds.go +++ b/apiserver/xdsserverv3/lds.go @@ -92,17 +92,6 @@ func (lds *LDSBuilder) Generate(option *resource.BuildOption) (interface{}, erro resources = ret case resource.RunTypeSidecar: switch option.TrafficDirection { - case core.TrafficDirection_UNSPECIFIED: - inBoundListener, err := lds.makeListener(option, corev3.TrafficDirection_INBOUND) - if err != nil { - return nil, err - } - resources = append(resources, inBoundListener...) - outBoundListener, err := lds.makeListener(option, corev3.TrafficDirection_OUTBOUND) - if err != nil { - return nil, err - } - resources = append(resources, outBoundListener...) case core.TrafficDirection_INBOUND: inBoundListener, err := lds.makeListener(option, corev3.TrafficDirection_INBOUND) if err != nil { @@ -140,7 +129,7 @@ func (lds *LDSBuilder) makeListener(option *resource.BuildOption, listener := makeDefaultListener(direction, boundHCM, option, dstPorts) listener.ListenerFilters = append(listener.ListenerFilters, defaultListenerFilters...) - if option.TLSMode != resource.TLSModeNone { + if resource.EnableTLS(option.TLSMode) { listener.FilterChains = []*listenerv3.FilterChain{ { FilterChainMatch: &listenerv3.FilterChainMatch{ @@ -248,11 +237,9 @@ func makeDefaultListenerFilterChain(trafficDirection corev3.TrafficDirection, }, }) } - } else { - filterChain = append(filterChain, &listenerv3.FilterChain{ - Filters: defaultHttpFilter, - }) } - + filterChain = append(filterChain, &listenerv3.FilterChain{ + Filters: defaultHttpFilter, + }) return filterChain } diff --git a/apiserver/xdsserverv3/rds.go b/apiserver/xdsserverv3/rds.go index fea4f6313..a54cf8b1e 100644 --- a/apiserver/xdsserverv3/rds.go +++ b/apiserver/xdsserverv3/rds.go @@ -77,7 +77,7 @@ func (rds *RDSBuilder) makeSidecarInBoundRouteConfiguration(option *resource.Bui return []types.Resource{} } routeConf := &route.RouteConfiguration{ - Name: resource.MakeInBoundRouteConfigName(selfService), + Name: resource.MakeInBoundRouteConfigName(selfService, option.OpenOnDemand), ValidateClusters: wrapperspb.Bool(false), } @@ -116,7 +116,7 @@ func (rds *RDSBuilder) makeSidecarOutBoundRouteConfiguration(option *resource.Bu } hosts = append(hosts, resource.BuildAllowAnyVHost()) if option.OpenOnDemand { - baseRouteName = fmt.Sprintf("%s|%s|Demand|%s", resource.OutBoundRouteConfigName, option.Namespace, option.OnDemandServer) + baseRouteName = fmt.Sprintf("%s|%s|DEMAND", resource.OutBoundRouteConfigName, option.Namespace) // routeConfiguration.Vhds = &route.Vhds{ // ConfigSource: &corev3.ConfigSource{ // ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{ @@ -174,6 +174,8 @@ func (rds *RDSBuilder) makeSidecarInBoundRoutes(selfService model.ServiceKey, limits, typedPerFilterConfig, err := resource.MakeSidecarLocalRateLimit(seacher, selfService) if err == nil { currentRoute.TypedPerFilterConfig = typedPerFilterConfig + currentRoute.TypedPerFilterConfig[resource.EnvoyHttpFilter_OnDemand] = + resource.BuildOnDemandRouteTypedPerFilterConfig() currentRoute.GetRoute().RateLimits = limits } return []*route.Route{ diff --git a/apiserver/xdsserverv3/resource/api.go b/apiserver/xdsserverv3/resource/api.go index 74928b90f..bd5fb8464 100644 --- a/apiserver/xdsserverv3/resource/api.go +++ b/apiserver/xdsserverv3/resource/api.go @@ -33,13 +33,12 @@ type XDSBuilder interface { } type BuildOption struct { - RunType RunType - Namespace string - TLSMode TLSMode - Services map[model.ServiceKey]*ServiceInfo - OpenOnDemand bool - OnDemandServer string - SelfService model.ServiceKey + RunType RunType + Namespace string + TLSMode TLSMode + Services map[model.ServiceKey]*ServiceInfo + OpenOnDemand bool + SelfService model.ServiceKey // 不是比带,只有在 EDS 生成,并且是处理 INBOUND 的时候才会设置 Client *XDSClient TrafficDirection corev3.TrafficDirection @@ -47,6 +46,10 @@ type BuildOption struct { ForceDelete bool } +func (opt *BuildOption) HasTls() bool { + return opt.TLSMode == TLSModeStrict || opt.TLSMode == TLSModePermissive +} + func (opt *BuildOption) Clone() *BuildOption { return &BuildOption{ Namespace: opt.Namespace, diff --git a/apiserver/xdsserverv3/resource/help.go b/apiserver/xdsserverv3/resource/help.go index 1c0d85b67..3d2e5b81a 100644 --- a/apiserver/xdsserverv3/resource/help.go +++ b/apiserver/xdsserverv3/resource/help.go @@ -592,28 +592,7 @@ func MakeDefaultRoute(trafficDirection corev3.TrafficDirection, svcKey model.Ser } if opt.OpenOnDemand { routeConf.TypedPerFilterConfig = map[string]*anypb.Any{ - "envoy.filters.http.on_demand": MustNewAny(&on_demandv3.PerRouteConfig{ - Odcds: &on_demandv3.OnDemandCds{ - Source: &corev3.ConfigSource{ - ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{ - ApiConfigSource: &corev3.ApiConfigSource{ - ApiType: corev3.ApiConfigSource_DELTA_GRPC, - TransportApiVersion: corev3.ApiVersion_V3, - GrpcServices: []*corev3.GrpcService{ - { - TargetSpecifier: &corev3.GrpcService_GoogleGrpc_{ - GoogleGrpc: &corev3.GrpcService_GoogleGrpc{ - TargetUri: opt.OnDemandServer, - StatPrefix: "polaris_odcds", - }, - }, - }, - }, - }, - }, - }, - }, - }), + EnvoyHttpFilter_OnDemand: BuildOnDemandRouteTypedPerFilterConfig(), } } return routeConf @@ -637,31 +616,34 @@ func MakeSidecarRoute(trafficDirection corev3.TrafficDirection, routeMatch *rout } if opt.OpenOnDemand { currentRoute.TypedPerFilterConfig = map[string]*anypb.Any{ - "envoy.filters.http.on_demand": MustNewAny(&on_demandv3.PerRouteConfig{ - Odcds: &on_demandv3.OnDemandCds{ - Source: &corev3.ConfigSource{ - ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{ - ApiConfigSource: &corev3.ApiConfigSource{ - ApiType: corev3.ApiConfigSource_DELTA_GRPC, - TransportApiVersion: corev3.ApiVersion_V3, - GrpcServices: []*corev3.GrpcService{ - { - TargetSpecifier: &corev3.GrpcService_GoogleGrpc_{ - GoogleGrpc: &corev3.GrpcService_GoogleGrpc{ - TargetUri: opt.OnDemandServer, - StatPrefix: "polaris_odcds", - }, - }, + EnvoyHttpFilter_OnDemand: BuildOnDemandRouteTypedPerFilterConfig(), + } + } + return currentRoute +} + +func BuildOnDemandRouteTypedPerFilterConfig() *anypb.Any { + return MustNewAny(&on_demandv3.PerRouteConfig{ + Odcds: &on_demandv3.OnDemandCds{ + Source: &corev3.ConfigSource{ + ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{ + ApiConfigSource: &corev3.ApiConfigSource{ + ApiType: corev3.ApiConfigSource_DELTA_GRPC, + TransportApiVersion: corev3.ApiVersion_V3, + GrpcServices: []*corev3.GrpcService{ + { + TargetSpecifier: &corev3.GrpcService_EnvoyGrpc_{ + EnvoyGrpc: &corev3.GrpcService_EnvoyGrpc{ + ClusterName: "polaris_xds_server", }, }, }, }, }, }, - }), - } - } - return currentRoute + }, + }, + }) } var PassthroughCluster = &cluster.Cluster{ @@ -682,8 +664,11 @@ var PassthroughCluster = &cluster.Cluster{ } // MakeInBoundRouteConfigName . -func MakeInBoundRouteConfigName(svcKey model.ServiceKey) string { - return InBoundRouteConfigName + "/" + svcKey.Domain() +func MakeInBoundRouteConfigName(svcKey model.ServiceKey, demand bool) string { + if demand { + return InBoundRouteConfigName + "|" + svcKey.Domain() + "|DEMAND" + } + return InBoundRouteConfigName + "|" + svcKey.Domain() } // MakeServiceName . @@ -760,21 +745,8 @@ func makeRateLimitHCMFilter(svcKey model.ServiceKey) []*hcm.HttpFilter { func makeSidecarOnDemandHCMFilter(option *BuildOption) []*hcm.HttpFilter { return []*hcm.HttpFilter{ - // { - // // 这个插件用于改写所有的 envoy 请求,手动添加一个内置的专门用于 ODCDS 的简单 Lua 脚本 - // Name: wellknown.Lua, - // ConfigType: &hcm.HttpFilter_TypedConfig{ - // TypedConfig: MustNewAny(&luav3.Lua{ - // DefaultSourceCode: &corev3.DataSource{ - // Specifier: &corev3.DataSource_InlineString{ - // InlineString: odcdsLuaCode, - // }, - // }, - // }), - // }, - // }, { - Name: "envoy.filters.http.on_demand", + Name: EnvoyHttpFilter_OnDemand, ConfigType: &hcm.HttpFilter_TypedConfig{ TypedConfig: MustNewAny(&on_demandv3.OnDemand{}), }, @@ -804,21 +776,31 @@ func MakeSidecarOnDemandOutBoundHCM(svcKey model.ServiceKey, option *BuildOption } func MakeSidecarBoundHCM(svcKey model.ServiceKey, trafficDirection corev3.TrafficDirection, opt *BuildOption) *hcm.HttpConnectionManager { - hcmFilters := []*hcm.HttpFilter{} - hcmFilters = append(hcmFilters, &hcm.HttpFilter{ - Name: wellknown.Router, - ConfigType: &hcm.HttpFilter_TypedConfig{ - TypedConfig: MustNewAny(&routerv3.Router{}), + hcmFilters := []*hcm.HttpFilter{ + { + Name: wellknown.Router, + ConfigType: &hcm.HttpFilter_TypedConfig{ + TypedConfig: MustNewAny(&routerv3.Router{}), + }, }, - }) + } if trafficDirection == corev3.TrafficDirection_INBOUND { hcmFilters = append(makeRateLimitHCMFilter(svcKey), hcmFilters...) } + if opt.OpenOnDemand { + hcmFilters = append([]*hcm.HttpFilter{ + { + Name: EnvoyHttpFilter_OnDemand, + ConfigType: &hcm.HttpFilter_TypedConfig{ + TypedConfig: MustNewAny(&on_demandv3.OnDemand{}), + }, + }, + }, hcmFilters...) + } - trafficDirectionName := corev3.TrafficDirection_name[int32(trafficDirection)] manager := &hcm.HttpConnectionManager{ CodecType: hcm.HttpConnectionManager_AUTO, - StatPrefix: trafficDirectionName + "_HTTP", + StatPrefix: corev3.TrafficDirection_name[int32(trafficDirection)] + "_HTTP", RouteSpecifier: routeSpecifier(trafficDirection, opt), AccessLog: accessLog(), HttpFilters: hcmFilters, @@ -829,7 +811,7 @@ func MakeSidecarBoundHCM(svcKey model.ServiceKey, trafficDirection corev3.Traffi // 重写 RouteSpecifier 的路由规则数据信息 if trafficDirection == core.TrafficDirection_INBOUND { - manager.GetRds().RouteConfigName = MakeInBoundRouteConfigName(svcKey) + manager.GetRds().RouteConfigName = MakeInBoundRouteConfigName(svcKey, opt.OpenOnDemand) } return manager @@ -858,7 +840,7 @@ func MakeGatewayBoundHCM(svcKey model.ServiceKey, opt *BuildOption) *hcm.HttpCon func routeSpecifier(trafficDirection corev3.TrafficDirection, opt *BuildOption) *hcm.HttpConnectionManager_Rds { baseRouteName := TrafficBoundRoute[trafficDirection] if opt.OpenOnDemand { - baseRouteName = fmt.Sprintf("%s|%s|Demand|%s", TrafficBoundRoute[trafficDirection], opt.Namespace, opt.OnDemandServer) + baseRouteName = fmt.Sprintf("%s|%s|DEMAND", TrafficBoundRoute[trafficDirection], opt.Namespace) } return &hcm.HttpConnectionManager_Rds{ Rds: &hcm.Rds{ @@ -933,7 +915,7 @@ func MakeSidecarLocalRateLimit(rateLimitCache types.RateLimitCache, svcKey model.ServiceKey) ([]*route.RateLimit, map[string]*anypb.Any, error) { conf, _ := rateLimitCache.GetRateLimitRules(svcKey) if conf == nil { - return nil, nil, nil + return nil, map[string]*anypb.Any{}, nil } confKey := fmt.Sprintf("INBOUND|SIDECAR|%s|%s", svcKey.Namespace, svcKey.Name) rateLimitConf := BuildRateLimitConf(confKey) @@ -1121,3 +1103,12 @@ func FormatEndpointHealth(ins *apiservice.Instance) core.HealthStatus { } return core.HealthStatus_UNHEALTHY } + +func SupportTLS(x XDSType) bool { + switch x { + case CDS, LDS: + return true + default: + return false + } +} diff --git a/apiserver/xdsserverv3/resource/model.go b/apiserver/xdsserverv3/resource/model.go index f7e46537d..34e3efedf 100644 --- a/apiserver/xdsserverv3/resource/model.go +++ b/apiserver/xdsserverv3/resource/model.go @@ -28,6 +28,10 @@ import ( "github.com/polarismesh/polaris/common/model" ) +const ( + EnvoyHttpFilter_OnDemand = "envoy.filters.http.on_demand" +) + const ( PassthroughClusterName = "PassthroughCluster" RouteConfigName = "polaris-router" @@ -79,8 +83,28 @@ const ( RLS SDS VHDS + UnknownXDS ) +func FormatTypeUrl(typeUrl string) XDSType { + switch typeUrl { + case resourcev3.ListenerType: + return LDS + case resourcev3.RouteType: + return RDS + case resourcev3.EndpointType: + return EDS + case resourcev3.ClusterType: + return CDS + case resourcev3.RateLimitConfigType: + return RLS + case resourcev3.VirtualHostType: + return VHDS + default: + return UnknownXDS + } +} + func (x XDSType) ResourceType() resourcev3.Type { if x == LDS { return resourcev3.ListenerType @@ -140,6 +164,10 @@ const ( TLSModePermissive TLSMode = "permissive" ) +func EnableTLS(t TLSMode) bool { + return t == TLSModePermissive || t == TLSModeStrict +} + const ( // 这个是特殊指定的 prefix MatchString_Prefix = apimodel.MatchString_MatchStringType(-1) diff --git a/apiserver/xdsserverv3/resource/node.go b/apiserver/xdsserverv3/resource/node.go index 440f155b7..750370233 100644 --- a/apiserver/xdsserverv3/resource/node.go +++ b/apiserver/xdsserverv3/resource/node.go @@ -70,6 +70,19 @@ const ( SidecarODCDSServerEndpoint = "sidecar.polarismesh.cn/odcdsServerEndpoint" ) +type EnvoyNodeView struct { + ID string + RunType RunType + User string + Namespace string + IPAddr string + PodIP string + Metadata map[string]string + Version string + TLSMode TLSMode + OpenOnDemand bool +} + func NewXDSNodeManager() *XDSNodeManager { return &XDSNodeManager{ nodes: map[string]*XDSClient{}, @@ -87,8 +100,6 @@ type XDSNodeManager struct { sidecarNodes map[string]*XDSClient // gatewayNodes The XDS client is the node list of the Gateway run mode gatewayNodes map[string]*XDSClient - // demandConfs . - demandConfs map[RunType]map[string]map[string]struct{} } func (x *XDSNodeManager) AddNodeIfAbsent(streamId int64, node *core.Node) { @@ -192,8 +203,22 @@ func (x *XDSNodeManager) ListSidecarNodes() []*XDSClient { return ret } -func (x *XDSNodeManager) ListDemandConfs() map[RunType]map[string]map[string]struct{} { - return x.demandConfs +func (x *XDSNodeManager) ListEnvoyNodesView(run RunType) []*EnvoyNodeView { + x.lock.RLock() + defer x.lock.RUnlock() + + if run == RunTypeSidecar { + ret := make([]*EnvoyNodeView, 0, len(x.sidecarNodes)) + for i := range x.sidecarNodes { + ret = append(ret, x.sidecarNodes[i].toView()) + } + return ret + } + ret := make([]*EnvoyNodeView, 0, len(x.gatewayNodes)) + for i := range x.gatewayNodes { + ret = append(ret, x.gatewayNodes[i].toView()) + } + return ret } // ID id 的格式是 ${sidecar|gateway}~namespace/uuid~hostIp @@ -291,21 +316,36 @@ type RegisterService struct { // XDSClient 客户端代码结构体 type XDSClient struct { - RunType RunType - User string - Namespace string - IPAddr string - PodIP string - Metadata map[string]string - Version string - Node *core.Node - TLSMode TLSMode - OpenOnDemand bool - OnDemandServer string + ID string + RunType RunType + User string + Namespace string + IPAddr string + PodIP string + Metadata map[string]string + Version string + Node *core.Node + TLSMode TLSMode + OpenOnDemand bool +} + +func (n *XDSClient) toView() *EnvoyNodeView { + return &EnvoyNodeView{ + ID: n.ID, + RunType: n.RunType, + User: n.User, + Namespace: n.Namespace, + IPAddr: n.IPAddr, + PodIP: n.PodIP, + Metadata: n.Metadata, + Version: n.Version, + TLSMode: n.TLSMode, + OpenOnDemand: n.OpenOnDemand, + } } func (n *XDSClient) GetNodeID() string { - return n.Node.Id + return n.ID } func (n *XDSClient) ResourceKey() string { @@ -388,6 +428,7 @@ func ParseXDSClient(node *core.Node) *XDSClient { func parseNodeProxy(node *core.Node) *XDSClient { runType, polarisNamespace, _, hostIP := ParseNodeID(node.Id) proxy := &XDSClient{ + ID: node.Id, IPAddr: hostIP, PodIP: hostIP, RunType: RunType(runType), @@ -405,14 +446,8 @@ func parseNodeProxy(node *core.Node) *XDSClient { proxy.TLSMode = TLSModeStrict } } - if onDemand, ok := getEnvoyMetaField(node.Metadata, SidecarOpenOnDemandFeature, true); ok { - proxy.OpenOnDemand = onDemand - odcdsSvr, ok := getEnvoyMetaField(node.Metadata, SidecarODCDSServerEndpoint, "") - if !ok { - proxy.OpenOnDemand = false - } else { - proxy.OnDemandServer = odcdsSvr - } + if onDemand, ok := getEnvoyMetaField(node.Metadata, SidecarOpenOnDemandFeature, ""); ok { + proxy.OpenOnDemand = onDemand == "true" } } diff --git a/apiserver/xdsserverv3/server.go b/apiserver/xdsserverv3/server.go index c2c09ab3c..5f0e4a2f7 100644 --- a/apiserver/xdsserverv3/server.go +++ b/apiserver/xdsserverv3/server.go @@ -45,7 +45,6 @@ import ( "github.com/polarismesh/polaris/cache" api "github.com/polarismesh/polaris/common/api/v1" connlimit "github.com/polarismesh/polaris/common/conn/limit" - commonlog "github.com/polarismesh/polaris/common/log" "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/common/utils" "github.com/polarismesh/polaris/service" @@ -128,7 +127,7 @@ func (x *XDSServer) Initialize(ctx context.Context, option map[string]interface{ func (x *XDSServer) Run(errCh chan error) { // 启动 grpc server ctx := context.Background() - cb := xdscache.NewCallback(commonlog.GetScopeOrDefaultByName(commonlog.XDSLoggerName), x.nodeMgr) + cb := xdscache.NewCallback(x.cache, x.nodeMgr) srv := serverv3.NewServer(ctx, x.cache, cb) var grpcOptions []grpc.ServerOption grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(1000)) @@ -223,6 +222,7 @@ func (x *XDSServer) activeUpdateTask() { <-x.activeNotifier.Done() return } + defer x.activeFinish() log.Info("active update xds resource snapshot task") if err := x.initRegistryInfo(); err != nil { @@ -236,7 +236,6 @@ func (x *XDSServer) activeUpdateTask() { } // 首次更新没有需要移除的 XDS 资源信息 x.Generate(x.registryInfo, nil) - x.activeFinish() go x.startSynTask(x.ctx) } @@ -443,7 +442,6 @@ func (x *XDSServer) getRegistryInfoWithCache(ctx context.Context, } func (x *XDSServer) Generate(needPush, needRemove map[string]map[model.ServiceKey]*resource.ServiceInfo) { - defer x.activeFinish() versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(x.versionNum.Inc(), 10) x.resourceGenerator.Generate(versionLocal, needPush, needRemove) } diff --git a/cache/api/types.go b/cache/api/types.go index 99cdbe0a6..3253e2c5c 100644 --- a/cache/api/types.go +++ b/cache/api/types.go @@ -492,8 +492,8 @@ type ( GetGroupActiveReleases(namespace, group string) ([]*model.ConfigFileRelease, string) // GetActiveRelease GetActiveRelease(namespace, group, fileName string) *model.ConfigFileRelease - // GetGrayRelease - GetGrayRelease(namespace, group, fileName string) *model.ConfigFileRelease + // GetActiveGrayRelease + GetActiveGrayRelease(namespace, group, fileName string) *model.ConfigFileRelease // GetRelease GetRelease(key model.ConfigFileReleaseKey) *model.ConfigFileRelease // QueryReleases diff --git a/cache/config/config_file.go b/cache/config/config_file.go index 01b5b8253..5fd1034e4 100644 --- a/cache/config/config_file.go +++ b/cache/config/config_file.go @@ -233,31 +233,13 @@ func (fc *fileCache) handleUpdateRelease(oldVal *model.SimpleConfigFileRelease, }() if !item.Active { + if oldVal != nil && oldVal.Active { + return fc.cleanActiveRelease(oldVal) + } return nil } - // 保存 active 状态的所有发布 release 信息 - if _, ok := fc.activeReleases.Load(item.Namespace); !ok { - fc.activeReleases.Store(item.Namespace, utils.NewSyncMap[string, - *utils.SyncMap[string, *model.SimpleConfigFileRelease]]()) - } - namespace, _ := fc.activeReleases.Load(item.Namespace) - if _, ok := namespace.Load(item.Group); !ok { - namespace.Store(item.Group, utils.NewSyncMap[string, *model.SimpleConfigFileRelease]()) - } - group, _ := namespace.Load(item.Group) - group.Store(item.ActiveKey(), item.SimpleConfigFileRelease) - - if err := fc.valueCache.Update(func(tx *bbolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists([]byte(item.OwnerKey())) - if err != nil { - return err - } - return bucket.Put([]byte(item.ActiveKey()), []byte(item.Content)) - }); err != nil { - return errors.Join(err, errors.New("persistent config_file content fail")) - } - return nil + return fc.saveActiveRelease(item) } // handleDeleteRelease @@ -291,6 +273,35 @@ func (fc *fileCache) handleDeleteRelease(release *model.SimpleConfigFileRelease) if !release.Active { return nil } + return fc.cleanActiveRelease(release) +} + +func (fc *fileCache) saveActiveRelease(item *model.ConfigFileRelease) error { + // 保存 active 状态的所有发布 release 信息 + if _, ok := fc.activeReleases.Load(item.Namespace); !ok { + fc.activeReleases.Store(item.Namespace, utils.NewSyncMap[string, + *utils.SyncMap[string, *model.SimpleConfigFileRelease]]()) + } + namespace, _ := fc.activeReleases.Load(item.Namespace) + if _, ok := namespace.Load(item.Group); !ok { + namespace.Store(item.Group, utils.NewSyncMap[string, *model.SimpleConfigFileRelease]()) + } + group, _ := namespace.Load(item.Group) + group.Store(item.ActiveKey(), item.SimpleConfigFileRelease) + + if err := fc.valueCache.Update(func(tx *bbolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists([]byte(item.OwnerKey())) + if err != nil { + return err + } + return bucket.Put([]byte(item.ActiveKey()), []byte(item.Content)) + }); err != nil { + return errors.Join(err, errors.New("persistent active config_file content fail")) + } + return nil +} + +func (fc *fileCache) cleanActiveRelease(release *model.SimpleConfigFileRelease) error { if namespace, ok := fc.activeReleases.Load(release.Namespace); ok { if group, ok := namespace.Load(release.Group); ok { group.Delete(release.ActiveKey()) @@ -303,7 +314,7 @@ func (fc *fileCache) handleDeleteRelease(release *model.SimpleConfigFileRelease) } return bucket.Delete([]byte(release.ActiveKey())) }); err != nil { - return errors.Join(err, errors.New("remove config_file content fail")) + return errors.Join(err, errors.New("remove active config_file content fail")) } return nil } @@ -409,8 +420,8 @@ func (fc *fileCache) GetActiveRelease(namespace, group, fileName string) *model. return fc.handleGetActiveRelease(namespace, group, fileName, model.ReleaseTypeFull) } -// GetGrayRelease -func (fc *fileCache) GetGrayRelease(namespace, group, fileName string) *model.ConfigFileRelease { +// GetActiveGrayRelease +func (fc *fileCache) GetActiveGrayRelease(namespace, group, fileName string) *model.ConfigFileRelease { return fc.handleGetActiveRelease(namespace, group, fileName, model.ReleaseTypeGray) } diff --git a/cache/mock/cache_mock.go b/cache/mock/cache_mock.go index 4617428cf..0775b9686 100644 --- a/cache/mock/cache_mock.go +++ b/cache/mock/cache_mock.go @@ -2228,18 +2228,18 @@ func (mr *MockConfigFileCacheMockRecorder) GetActiveRelease(namespace, group, fi return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActiveRelease", reflect.TypeOf((*MockConfigFileCache)(nil).GetActiveRelease), namespace, group, fileName) } -// GetGrayRelease mocks base method. -func (m *MockConfigFileCache) GetGrayRelease(namespace, group, fileName string) *model.ConfigFileRelease { +// GetActiveGrayRelease mocks base method. +func (m *MockConfigFileCache) GetActiveGrayRelease(namespace, group, fileName string) *model.ConfigFileRelease { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetGrayRelease", namespace, group, fileName) + ret := m.ctrl.Call(m, "GetActiveGrayRelease", namespace, group, fileName) ret0, _ := ret[0].(*model.ConfigFileRelease) return ret0 } -// GetGrayRelease indicates an expected call of GetGrayRelease. -func (mr *MockConfigFileCacheMockRecorder) GetGrayRelease(namespace, group, fileName interface{}) *gomock.Call { +// GetActiveGrayRelease indicates an expected call of GetActiveGrayRelease. +func (mr *MockConfigFileCacheMockRecorder) GetActiveGrayRelease(namespace, group, fileName interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGrayRelease", reflect.TypeOf((*MockConfigFileCache)(nil).GetGrayRelease), namespace, group, fileName) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActiveGrayRelease", reflect.TypeOf((*MockConfigFileCache)(nil).GetActiveGrayRelease), namespace, group, fileName) } // GetGroupActiveReleases mocks base method. diff --git a/common/model/config_file.go b/common/model/config_file.go index b82de08d4..f894901a0 100644 --- a/common/model/config_file.go +++ b/common/model/config_file.go @@ -366,6 +366,7 @@ func ToConfiogFileReleaseApi(release *ConfigFileRelease) *config_manage.ConfigFi Tags: FromTagMap(release.Metadata), Active: utils.NewBoolValue(release.Active), ReleaseType: utils.NewStringValue(string(release.ReleaseType)), + BetaLabels: release.BetaLabels, } } diff --git a/config/client.go b/config/client.go index dd294247f..62b37f06c 100644 --- a/config/client.go +++ b/config/client.go @@ -54,7 +54,7 @@ func (s *Server) GetConfigFileWithCache(ctx context.Context, var release *model.ConfigFileRelease var match = false if len(client.GetTags()) > 0 { - if release = s.fileCache.GetGrayRelease(namespace, group, fileName); release != nil { + if release = s.fileCache.GetActiveGrayRelease(namespace, group, fileName); release != nil { key := model.GetGrayConfigRealseKey(release.SimpleConfigFileRelease) match = s.grayCache.HitGrayRule(key, model.ToTagMap(client.GetTags())) } diff --git a/config/config_chain.go b/config/config_chain.go index 7478169b8..ff1f5c8d5 100644 --- a/config/config_chain.go +++ b/config/config_chain.go @@ -273,7 +273,7 @@ func (chain *ReleaseConfigFileChain) AfterGetFile(ctx context.Context, group := file.Group name := file.Name // 首先检测灰度版本 - if grayFile := chain.svr.fileCache.GetGrayRelease(namespace, group, name); grayFile != nil { + if grayFile := chain.svr.fileCache.GetActiveGrayRelease(namespace, group, name); grayFile != nil { if grayFile.Content == file.OriginContent { file.Status = utils.ReleaseTypeGray file.ReleaseBy = grayFile.ModifyBy diff --git a/config/config_file.go b/config/config_file.go index 9b542c050..f2f69768e 100644 --- a/config/config_file.go +++ b/config/config_file.go @@ -341,6 +341,7 @@ func (s *Server) SearchConfigFile(ctx context.Context, filter map[string]string) } _ = s.caches.ConfigFile().Update() + _ = s.caches.Gray().Update() ret := make([]*apiconfig.ConfigFile, 0, len(files)) for _, file := range files { file, err := s.chains.AfterGetFile(ctx, file) diff --git a/config/config_file_group_test.go b/config/config_file_group_test.go index 9abfb3cad..0ca05373a 100644 --- a/config/config_file_group_test.go +++ b/config/config_file_group_test.go @@ -18,11 +18,15 @@ package config_test import ( + "fmt" "testing" + apimodel "github.com/polarismesh/specification/source/go/api/v1/model" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/wrapperspb" api "github.com/polarismesh/polaris/common/api/v1" + "github.com/polarismesh/polaris/common/utils" ) var ( @@ -30,19 +34,39 @@ var ( randomGroupSize = uint32(7) ) -// TestConfigFileGroupCRUD 测试配置文件组增删改查 -func TestConfigFileGroupCRUD(t *testing.T) { - testSuit := &ConfigCenterTest{} - if err := testSuit.Initialize(); err != nil { - t.Fatal(err) - } - - t.Cleanup(func() { - if err := testSuit.clearTestData(); err != nil { - t.Fatal(err) +func Test_CheckCreateFileGroupParam(t *testing.T) { + testSuit := newConfigCenterTestSuit(t) + + t.Run("invalid_nil_param", func(t *testing.T) { + rsp := testSuit.ConfigServer().CreateConfigFileGroup(testSuit.DefaultCtx, nil) + assert.Equal(t, uint32(apimodel.Code_InvalidParameter), rsp.Code.GetValue()) + }) + t.Run("invalid_name", func(t *testing.T) { + mockGroup := assembleConfigFileGroup() + mockGroup.Name = wrapperspb.String("") + rsp := testSuit.ConfigServer().CreateConfigFileGroup(testSuit.DefaultCtx, mockGroup) + assert.Equal(t, uint32(apimodel.Code_InvalidConfigFileGroupName), rsp.Code.GetValue()) + }) + t.Run("invalid_namespace", func(t *testing.T) { + mockGroup := assembleConfigFileGroup() + mockGroup.Namespace = wrapperspb.String("") + rsp := testSuit.ConfigServer().CreateConfigFileGroup(testSuit.DefaultCtx, mockGroup) + assert.Equal(t, uint32(apimodel.Code_InvalidNamespaceName), rsp.Code.GetValue()) + }) + t.Run("invalid_metadata_len", func(t *testing.T) { + mockGroup := assembleConfigFileGroup() + mockGroup.Metadata = map[string]string{} + for i := 0; i < utils.MaxMetadataLength+10; i++ { + mockGroup.Metadata[fmt.Sprintf("Key_%d", i)] = fmt.Sprintf("Value_%d", i) } - testSuit.Destroy() + rsp := testSuit.ConfigServer().CreateConfigFileGroup(testSuit.DefaultCtx, mockGroup) + assert.Equal(t, uint32(apimodel.Code_InvalidMetadata), rsp.Code.GetValue()) }) +} + +// TestConfigFileGroupCRUD 测试配置文件组增删改查 +func TestConfigFileGroupCRUD(t *testing.T) { + testSuit := newConfigCenterTestSuit(t) // 查询不存在的 group t.Run("step1-query-none", func(t *testing.T) { @@ -58,11 +82,26 @@ func TestConfigFileGroupCRUD(t *testing.T) { // 创建 group t.Run("step2-create", func(t *testing.T) { - rsp := testSuit.ConfigServer().CreateConfigFileGroup(testSuit.DefaultCtx, assembleConfigFileGroup()) - assert.Equal(t, api.ExecuteSuccess, rsp.Code.GetValue()) + t.Run("normal", func(t *testing.T) { + rsp := testSuit.ConfigServer().CreateConfigFileGroup(testSuit.DefaultCtx, assembleConfigFileGroup()) + assert.Equal(t, api.ExecuteSuccess, rsp.Code.GetValue()) - rsp2 := testSuit.ConfigServer().CreateConfigFileGroup(testSuit.DefaultCtx, assembleConfigFileGroup()) - assert.Equal(t, uint32(api.ExistedResource), rsp2.Code.GetValue()) + rsp2 := testSuit.ConfigServer().CreateConfigFileGroup(testSuit.DefaultCtx, assembleConfigFileGroup()) + assert.Equal(t, uint32(api.ExistedResource), rsp2.Code.GetValue()) + }) + }) + + t.Run("step2-update", func(t *testing.T) { + t.Run("no_change", func(t *testing.T) { + rsp := testSuit.ConfigServer().UpdateConfigFileGroup(testSuit.DefaultCtx, assembleConfigFileGroup()) + assert.Equal(t, api.NoNeedUpdate, rsp.Code.GetValue(), rsp.GetInfo().GetValue()) + }) + t.Run("change", func(t *testing.T) { + mockGroup := assembleConfigFileGroup() + mockGroup.Comment = wrapperspb.String("v string changed") + rsp := testSuit.ConfigServer().UpdateConfigFileGroup(testSuit.DefaultCtx, mockGroup) + assert.Equal(t, api.ExecuteSuccess, rsp.Code.GetValue()) + }) }) // 再次查询 group,能查询到上一步创建的 group @@ -94,20 +133,30 @@ func TestConfigFileGroupCRUD(t *testing.T) { // 删除 group t.Run("step4-delete", func(t *testing.T) { - rsp := testSuit.ConfigServer().DeleteConfigFile(testSuit.DefaultCtx, assembleConfigFile()) - assert.Equal(t, api.ExecuteSuccess, rsp.Code.GetValue(), rsp.GetInfo().GetValue()) - - rsp = testSuit.ConfigServer().DeleteConfigFileGroup(testSuit.DefaultCtx, testNamespace, testGroup) - assert.Equal(t, api.ExecuteSuccess, rsp.Code.GetValue(), rsp.GetInfo().GetValue()) - - rsp2 := testSuit.ConfigServer().SearchConfigFile(testSuit.DefaultCtx, map[string]string{ - "namespace": testNamespace, - "group": testGroup, - "offset": "0", - "limit": "10", + t.Run("delete-noexist", func(t *testing.T) { + rsp := testSuit.ConfigServer().DeleteConfigFileGroup(testSuit.DefaultCtx, testNamespace, "testGroup_noExist") + assert.Equal(t, uint32(apimodel.Code_NotFoundResource), rsp.Code.GetValue(), rsp.GetInfo().GetValue()) + }) + t.Run("exist-subresources", func(t *testing.T) { + rsp := testSuit.ConfigServer().DeleteConfigFileGroup(testSuit.DefaultCtx, testNamespace, testGroup) + assert.Equal(t, uint32(apimodel.Code_ExistedResource), rsp.Code.GetValue(), rsp.GetInfo().GetValue()) + }) + t.Run("normal", func(t *testing.T) { + rsp := testSuit.ConfigServer().DeleteConfigFile(testSuit.DefaultCtx, assembleConfigFile()) + assert.Equal(t, api.ExecuteSuccess, rsp.Code.GetValue(), rsp.GetInfo().GetValue()) + + rsp = testSuit.ConfigServer().DeleteConfigFileGroup(testSuit.DefaultCtx, testNamespace, testGroup) + assert.Equal(t, api.ExecuteSuccess, rsp.Code.GetValue(), rsp.GetInfo().GetValue()) + + rsp2 := testSuit.ConfigServer().SearchConfigFile(testSuit.DefaultCtx, map[string]string{ + "namespace": testNamespace, + "group": testGroup, + "offset": "0", + "limit": "10", + }) + assert.Equal(t, api.ExecuteSuccess, rsp2.Code.GetValue()) + assert.Equal(t, uint32(0), rsp2.GetTotal().GetValue()) }) - assert.Equal(t, api.ExecuteSuccess, rsp2.Code.GetValue()) - assert.Equal(t, uint32(0), rsp2.GetTotal().GetValue()) }) // 再次查询group,由于被删除,所以查不到 diff --git a/config/config_file_release.go b/config/config_file_release.go index 77cbf1daa..3a9361110 100644 --- a/config/config_file_release.go +++ b/config/config_file_release.go @@ -601,7 +601,6 @@ func (s *Server) CasUpsertAndReleaseConfigFile(ctx context.Context, if req.GetMd5().GetValue() != CalMd5(saveFile.Content) { return api.NewConfigResponse(apimodel.Code_DataConflict) } - // 补充针对 Version、MD5 的比对逻辑,如果不满足,快速结束 upsertResp = s.handleUpdateConfigFile(ctx, tx, upsertFileReq) } if upsertResp.GetCode().GetValue() != uint32(apimodel.Code_ExecuteSuccess) { @@ -767,7 +766,7 @@ func (s *Server) StopGrayConfigFileRelease(ctx context.Context, req *apiconfig.C return api.NewConfigResponse(commonstore.StoreCode2APICode(err)) } - if err = s.storage.DeleteConfigFileReleaseTx(tx, betaRelease.ConfigFileReleaseKey); err != nil { + if err = s.storage.InactiveConfigFileReleaseTx(tx, betaRelease); err != nil { log.Error("[Config][File] stop beta config file release.", utils.RequestID(ctx), zap.Error(err)) return api.NewConfigResponse(commonstore.StoreCode2APICode(err)) } diff --git a/config/config_file_release_test.go b/config/config_file_release_test.go index 14d23a86a..451ac20e8 100644 --- a/config/config_file_release_test.go +++ b/config/config_file_release_test.go @@ -27,10 +27,11 @@ import ( "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/common/utils" + "github.com/polarismesh/polaris/config" ) // Test_PublishConfigFile 测试配置文件发布 -func Test_PublishConfigFile(t *testing.T) { +func Test_PublishConfigFile_Check(t *testing.T) { testSuit := newConfigCenterTestSuit(t) var ( @@ -38,6 +39,66 @@ func Test_PublishConfigFile(t *testing.T) { mockGroup = "mock_group" mockFileName = "mock_filename" mockReleaseName = "mock_release" + ) + + t.Run("参数检查", func(t *testing.T) { + testSuit.NamespaceServer().CreateNamespace(testSuit.DefaultCtx, &apimodel.Namespace{ + Name: utils.NewStringValue(mockNamespace), + }) + + t.Run("invalid_file_name", func(t *testing.T) { + pubResp := testSuit.ConfigServer().PublishConfigFile(testSuit.DefaultCtx, &config_manage.ConfigFileRelease{ + Name: utils.NewStringValue(mockReleaseName), + Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + // FileName: utils.NewStringValue(mockFileName), + }) + // 发布失败 + assert.Equal(t, uint32(apimodel.Code_InvalidConfigFileName), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + }) + t.Run("invalid_namespace", func(t *testing.T) { + pubResp := testSuit.ConfigServer().PublishConfigFile(testSuit.DefaultCtx, &config_manage.ConfigFileRelease{ + Name: utils.NewStringValue(mockReleaseName), + // Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + }) + // 发布失败 + assert.Equal(t, uint32(apimodel.Code_InvalidNamespaceName), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + }) + t.Run("invalid_group", func(t *testing.T) { + pubResp := testSuit.ConfigServer().PublishConfigFile(testSuit.DefaultCtx, &config_manage.ConfigFileRelease{ + Name: utils.NewStringValue(mockReleaseName), + Namespace: utils.NewStringValue(mockNamespace), + // Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + }) + // 发布失败 + assert.Equal(t, uint32(apimodel.Code_InvalidConfigFileGroupName), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + }) + t.Run("invalid_gray_publish", func(t *testing.T) { + pubResp := testSuit.ConfigServer().PublishConfigFile(testSuit.DefaultCtx, &config_manage.ConfigFileRelease{ + Name: utils.NewStringValue(mockReleaseName), + Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + ReleaseType: wrapperspb.String(model.ReleaseTypeGray), + }) + // 发布失败 + assert.Equal(t, uint32(apimodel.Code_InvalidMatchRule), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + }) + }) +} + +// Test_PublishConfigFile 测试配置文件发布 +func Test_PublishConfigFile(t *testing.T) { + testSuit := newConfigCenterTestSuit(t) + + var ( + mockNamespace = "mock_namespace_pub" + mockGroup = "mock_group" + mockFileName = "mock_filename" + mockReleaseName = "mock_release" mockContent = "mock_content" ) @@ -91,6 +152,66 @@ func Test_PublishConfigFile(t *testing.T) { assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) }) + // 重新发布 + t.Run("normal_republish", func(t *testing.T) { + pubResp := testSuit.ConfigServer().UpsertAndReleaseConfigFile(testSuit.DefaultCtx, &config_manage.ConfigFilePublishInfo{ + ReleaseName: utils.NewStringValue(mockReleaseName), + Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + Content: utils.NewStringValue(mockContent), + Comment: utils.NewStringValue("mock_comment"), + Format: utils.NewStringValue("yaml"), + ReleaseDescription: utils.NewStringValue("mock_releaseDescription"), + Tags: []*config_manage.ConfigFileTag{ + { + Key: utils.NewStringValue("mock_key"), + Value: utils.NewStringValue("mock_value"), + }, + }, + }) + + // 正常发布成功 + assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + }) + + t.Run("list_release_version", func(t *testing.T) { + t.Run("invalid_namespace", func(t *testing.T) { + queryRsp := testSuit.ConfigServer().GetConfigFileReleaseVersions(testSuit.DefaultCtx, map[string]string{ + "group": mockGroup, + "file_name": mockFileName, + }) + assert.Equal(t, uint32(apimodel.Code_BadRequest), queryRsp.GetCode().GetValue()) + assert.Equal(t, "invalid namespace", queryRsp.GetInfo().GetValue()) + }) + t.Run("invalid_group", func(t *testing.T) { + queryRsp := testSuit.ConfigServer().GetConfigFileReleaseVersions(testSuit.DefaultCtx, map[string]string{ + "namespace": mockNamespace, + "file_name": mockFileName, + }) + assert.Equal(t, uint32(apimodel.Code_BadRequest), queryRsp.GetCode().GetValue()) + assert.Equal(t, "invalid config group", queryRsp.GetInfo().GetValue()) + }) + t.Run("invalid_file_name", func(t *testing.T) { + queryRsp := testSuit.ConfigServer().GetConfigFileReleaseVersions(testSuit.DefaultCtx, map[string]string{ + "group": mockGroup, + "namespace": mockNamespace, + }) + assert.Equal(t, uint32(apimodel.Code_BadRequest), queryRsp.GetCode().GetValue()) + assert.Equal(t, "invalid config file name", queryRsp.GetInfo().GetValue()) + }) + t.Run("normal", func(t *testing.T) { + queryRsp := testSuit.ConfigServer().GetConfigFileReleaseVersions(testSuit.DefaultCtx, map[string]string{ + "namespace": mockNamespace, + "group": mockGroup, + "file_name": mockFileName, + }) + assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), queryRsp.GetCode().GetValue(), queryRsp.GetInfo().GetValue()) + assert.Equal(t, 1, len(queryRsp.ConfigFileReleases)) + assert.Equal(t, 1, int(queryRsp.GetTotal().GetValue())) + }) + }) + t.Run("get_config_file_release", func(t *testing.T) { resp := testSuit.ConfigServer().GetConfigFileRelease(testSuit.DefaultCtx, &config_manage.ConfigFileRelease{ Name: utils.NewStringValue(mockReleaseName), @@ -117,6 +238,7 @@ func Test_PublishConfigFile(t *testing.T) { }) // 获取配置发布成功 assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), resp.GetCode().GetValue(), resp.GetInfo().GetValue()) + _ = testSuit.CacheMgr().TestUpdate() }) t.Run("reget_config_file_release", func(t *testing.T) { @@ -150,7 +272,6 @@ func Test_PublishConfigFile(t *testing.T) { t.Run("client_get_configfile", func(t *testing.T) { // 客户端获取符合预期, 这里强制触发一次缓存数据同步 - _ = testSuit.CacheMgr().TestUpdate() clientResp := testSuit.ConfigServer().GetConfigFileWithCache(testSuit.DefaultCtx, &config_manage.ClientConfigFileInfo{ Namespace: utils.NewStringValue(mockNamespace), Group: utils.NewStringValue(mockGroup), @@ -161,6 +282,48 @@ func Test_PublishConfigFile(t *testing.T) { assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), clientResp.GetCode().GetValue(), clientResp.GetInfo().GetValue()) assert.Equal(t, mockContent+"Second", clientResp.GetConfigFile().GetContent().GetValue()) }) + + t.Run("normal_publish_fordelete", func(t *testing.T) { + releaseName := mockReleaseName + "_delete" + pubResp := testSuit.ConfigServer().UpsertAndReleaseConfigFile(testSuit.DefaultCtx, &config_manage.ConfigFilePublishInfo{ + ReleaseName: utils.NewStringValue(releaseName), + Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + Content: utils.NewStringValue(mockContent), + Comment: utils.NewStringValue("mock_comment"), + Format: utils.NewStringValue("yaml"), + ReleaseDescription: utils.NewStringValue("mock_releaseDescription"), + Tags: []*config_manage.ConfigFileTag{ + { + Key: utils.NewStringValue("mock_key"), + Value: utils.NewStringValue("mock_value"), + }, + }, + }) + + // 正常发布成功 + assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + + delResp := testSuit.ConfigServer().DeleteConfigFileReleases(testSuit.DefaultCtx, []*config_manage.ConfigFileRelease{ + { + Name: utils.NewStringValue(releaseName), + Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + }, + }) + // 删除成功 + assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), delResp.GetCode().GetValue(), delResp.GetInfo().GetValue()) + + // 查询不到 + queryRsp := testSuit.ConfigServer().GetConfigFileReleases(testSuit.DefaultCtx, map[string]string{ + "name": releaseName, + }) + assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), queryRsp.GetCode().GetValue(), queryRsp.GetInfo().GetValue()) + assert.Equal(t, 0, len(queryRsp.ConfigFileReleases)) + assert.Equal(t, 0, int(queryRsp.GetTotal().GetValue())) + }) } // Test_RollbackConfigFileRelease 测试配置发布回滚 @@ -416,7 +579,7 @@ func Test_GrayConfigFileRelease(t *testing.T) { // 删除已发布的灰度配置,获取不到 t.Run("delete_gray_release", func(t *testing.T) { resp := testSuit.ConfigServer().StopGrayConfigFileReleases(testSuit.DefaultCtx, []*config_manage.ConfigFileRelease{ - &config_manage.ConfigFileRelease{ + { Namespace: utils.NewStringValue(mockNamespace), Group: utils.NewStringValue(mockGroup), FileName: utils.NewStringValue(mockFileName), @@ -436,7 +599,7 @@ func Test_GrayConfigFileRelease(t *testing.T) { assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), resp.GetCode().GetValue(), resp.GetInfo().GetValue()) assert.Equal(t, mockContent, clientRsp.GetConfigFile().GetContent().GetValue()) - // 携带正确配置标签查询, 查到处于灰度发布的配置 + // 携带正确配置标签查询, 查不到处于灰度发布的配置 clientRsp = testSuit.ConfigServer().GetConfigFileWithCache(testSuit.DefaultCtx, &config_manage.ClientConfigFileInfo{ Namespace: utils.NewStringValue(mockNamespace), Group: utils.NewStringValue(mockGroup), @@ -463,3 +626,110 @@ func Test_GrayConfigFileRelease(t *testing.T) { assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) }) } + +func TestServer_CasUpsertAndReleaseConfigFile(t *testing.T) { + testSuit := newConfigCenterTestSuit(t) + _ = testSuit + + var ( + mockNamespace = "mock_namespace_cas" + mockGroup = "mock_group" + mockFileName = "mock_filename" + mockReleaseName = "mock_release" + mockContent = "mock_content" + ) + + nsRsp := testSuit.NamespaceServer().CreateNamespace(testSuit.DefaultCtx, &apimodel.Namespace{ + Name: utils.NewStringValue(mockNamespace), + }) + assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), nsRsp.GetCode().GetValue(), nsRsp.GetInfo().GetValue()) + + t.Run("param_check", func(t *testing.T) { + t.Run("invalid_namespace", func(t *testing.T) { + queryRsp := testSuit.ConfigServer().GetConfigFileReleaseVersions(testSuit.DefaultCtx, map[string]string{ + "group": mockGroup, + "file_name": mockFileName, + }) + assert.Equal(t, uint32(apimodel.Code_BadRequest), queryRsp.GetCode().GetValue()) + assert.Equal(t, "invalid namespace", queryRsp.GetInfo().GetValue()) + }) + t.Run("invalid_group", func(t *testing.T) { + queryRsp := testSuit.ConfigServer().GetConfigFileReleaseVersions(testSuit.DefaultCtx, map[string]string{ + "namespace": mockNamespace, + "file_name": mockFileName, + }) + assert.Equal(t, uint32(apimodel.Code_BadRequest), queryRsp.GetCode().GetValue()) + assert.Equal(t, "invalid config group", queryRsp.GetInfo().GetValue()) + }) + t.Run("invalid_file_name", func(t *testing.T) { + queryRsp := testSuit.ConfigServer().GetConfigFileReleaseVersions(testSuit.DefaultCtx, map[string]string{ + "group": mockGroup, + "namespace": mockNamespace, + }) + assert.Equal(t, uint32(apimodel.Code_BadRequest), queryRsp.GetCode().GetValue()) + assert.Equal(t, "invalid config file name", queryRsp.GetInfo().GetValue()) + }) + }) + + t.Run("publish_cas", func(t *testing.T) { + // 发布灰度配置 + pubResp := testSuit.ConfigServer().CasUpsertAndReleaseConfigFileFromClient(testSuit.DefaultCtx, &config_manage.ConfigFilePublishInfo{ + Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + ReleaseName: utils.NewStringValue(mockReleaseName), + Content: utils.NewStringValue(mockContent), + Md5: wrapperspb.String(config.CalMd5(mockContent)), + }) + // 正常发布失败,数据冲突无法处理 + assert.Equal(t, uint32(apimodel.Code_DataConflict), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + + // 正常发布一个配置 + pubResp = testSuit.ConfigServer().UpsertAndReleaseConfigFileFromClient(testSuit.DefaultCtx, &config_manage.ConfigFilePublishInfo{ + Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + ReleaseName: utils.NewStringValue(mockReleaseName), + Content: utils.NewStringValue(mockContent), + Md5: wrapperspb.String(config.CalMd5(mockContent)), + }) + // 正常发布成功 + assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + + // 获取下当前配置的 Release + queryRsp := testSuit.ConfigServer().GetConfigFileRelease(testSuit.DefaultCtx, &config_manage.ConfigFileRelease{ + Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + Name: wrapperspb.String(mockReleaseName), + }) + assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), queryRsp.GetCode().GetValue(), queryRsp.GetInfo().GetValue()) + assert.NotNil(t, queryRsp.GetConfigFileRelease()) + assert.Equal(t, config.CalMd5(mockContent), queryRsp.GetConfigFileRelease().GetMd5().GetValue()) + + t.Run("md5_不匹配", func(t *testing.T) { + pubResp := testSuit.ConfigServer().CasUpsertAndReleaseConfigFileFromClient(testSuit.DefaultCtx, &config_manage.ConfigFilePublishInfo{ + Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + ReleaseName: utils.NewStringValue(mockReleaseName), + Content: utils.NewStringValue(mockContent), + Md5: wrapperspb.String(utils.NewUUID()), + }) + // 正常发布失败,数据冲突无法处理 + assert.Equal(t, uint32(apimodel.Code_DataConflict), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + }) + + t.Run("md5_匹配", func(t *testing.T) { + pubResp := testSuit.ConfigServer().CasUpsertAndReleaseConfigFileFromClient(testSuit.DefaultCtx, &config_manage.ConfigFilePublishInfo{ + Namespace: utils.NewStringValue(mockNamespace), + Group: utils.NewStringValue(mockGroup), + FileName: utils.NewStringValue(mockFileName), + Content: utils.NewStringValue(mockContent), + Md5: wrapperspb.String(queryRsp.GetConfigFileRelease().GetMd5().GetValue()), + }) + // 正常发布失败,数据冲突无法处理 + assert.Equal(t, uint32(apimodel.Code_ExecuteSuccess), pubResp.GetCode().GetValue(), pubResp.GetInfo().GetValue()) + }) + }) +} diff --git a/config/config_file_template.go b/config/config_file_template.go index 2f96085e0..e903a983c 100644 --- a/config/config_file_template.go +++ b/config/config_file_template.go @@ -88,10 +88,6 @@ func (s *Server) GetAllConfigFileTemplates(ctx context.Context) *apiconfig.Confi return api.NewConfigBatchQueryResponse(commonstore.StoreCode2APICode(err)) } - if len(templates) == 0 { - return api.NewConfigBatchQueryResponse(apimodel.Code_ExecuteSuccess) - } - var apiTemplates []*apiconfig.ConfigFileTemplate for _, template := range templates { apiTemplates = append(apiTemplates, model.ToConfigFileTemplateAPI(template)) diff --git a/config/config_file_template_test.go b/config/config_file_template_test.go index 16edbf05f..a3524e364 100644 --- a/config/config_file_template_test.go +++ b/config/config_file_template_test.go @@ -21,7 +21,9 @@ import ( "testing" apiconfig "github.com/polarismesh/specification/source/go/api/v1/config_manage" + apimodel "github.com/polarismesh/specification/source/go/api/v1/model" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/wrapperspb" api "github.com/polarismesh/polaris/common/api/v1" "github.com/polarismesh/polaris/common/utils" @@ -94,3 +96,43 @@ func assembleConfigFileTemplate(name string) *apiconfig.ConfigFileTemplate { ModifyBy: utils.NewStringValue("testUser"), } } + +func TestServer_CreateConfigFileTemplateParam(t *testing.T) { + testSuit := newConfigCenterTestSuit(t) + _ = testSuit + + var ( + mockTemplName = "mock_templname" + mockContent = "mock_content" + ) + + t.Run("invalid_tpl_name", func(t *testing.T) { + rsp := testSuit.ConfigServer().CreateConfigFileTemplate(testSuit.DefaultCtx, &apiconfig.ConfigFileTemplate{ + Content: wrapperspb.String(mockContent), + }) + assert.Equal(t, uint32(apimodel.Code_InvalidConfigFileTemplateName), rsp.Code.GetValue()) + }) + + t.Run("content_too_long", func(t *testing.T) { + mockContentL := "mock_content" + for { + if len(mockContentL) > int(testSuit.GetBootstrapConfig().Config.ContentMaxLength) { + break + } + mockContentL += mockContentL + } + rsp := testSuit.ConfigServer().CreateConfigFileTemplate(testSuit.DefaultCtx, &apiconfig.ConfigFileTemplate{ + Name: wrapperspb.String(mockTemplName), + Content: wrapperspb.String(mockContentL), + }) + assert.Equal(t, uint32(apimodel.Code_InvalidConfigFileContentLength), rsp.Code.GetValue(), rsp.GetInfo().GetValue()) + }) + + t.Run("no_content", func(t *testing.T) { + rsp := testSuit.ConfigServer().CreateConfigFileTemplate(testSuit.DefaultCtx, &apiconfig.ConfigFileTemplate{ + Name: wrapperspb.String(mockTemplName), + Content: wrapperspb.String(""), + }) + assert.Equal(t, uint32(apimodel.Code_BadRequest), rsp.Code.GetValue()) + }) +} diff --git a/config/interceptor/auth/server_authability.go b/config/interceptor/auth/server_authability.go index 8629a5c97..c45d160e4 100644 --- a/config/interceptor/auth/server_authability.go +++ b/config/interceptor/auth/server_authability.go @@ -162,6 +162,9 @@ func (s *ServerAuthability) queryConfigGroupResource(ctx context.Context, names := utils.NewSet[string]() namespace := req[0].GetNamespace().GetValue() for index := range req { + if req[index] == nil { + continue + } names.Add(req[index].GetName().GetValue()) } entries, err := s.queryConfigGroupRsEntryByNames(ctx, namespace, names.ToSlice()) diff --git a/config/watcher.go b/config/watcher.go index 3f17eeeaa..c7f096a2d 100644 --- a/config/watcher.go +++ b/config/watcher.go @@ -243,7 +243,7 @@ func (wc *watchCenter) checkQuickResponseClient(watchCtx WatchContext) *apiconfi } // 从缓存中获取灰度文件 if len(watchCtx.ClientLabels()) > 0 { - if release := wc.fileCache.GetGrayRelease(namespace, group, fileName); release != nil { + if release := wc.fileCache.GetActiveGrayRelease(namespace, group, fileName); release != nil { if watchCtx.ShouldNotify(release.SimpleConfigFileRelease) { return buildRet(release) } diff --git a/release/cluster/helm/templates/config-polaris-server.yaml b/release/cluster/helm/templates/config-polaris-server.yaml index f8402398c..2e599bef0 100644 --- a/release/cluster/helm/templates/config-polaris-server.yaml +++ b/release/cluster/helm/templates/config-polaris-server.yaml @@ -275,6 +275,16 @@ data: openConnLimit: false maxConnPerHost: 128 maxConnLimit: 10240 + - name: service-nacos + option: + listenIP: "0.0.0.0" + listenPort: 8848 + # 设置 nacos 默认命名空间对应 Polaris 命名空间信息 + defaultNamespace: default + connLimit: + openConnLimit: false + maxConnPerHost: 128 + maxConnLimit: 10240 # - name: service-l5 # option: # listenIP: 0.0.0.0 @@ -285,8 +295,14 @@ data: name: defaultAuth option: salt: polarismesh@2021 + # Console auth switch, default true consoleOpen: {{ .Values.polaris.auth.consoleOpen }} + # Console Strict Model, default true + consoleStrict: true + # Customer auth switch, default false clientOpen: {{ .Values.polaris.auth.clientOpen }} + # Customer Strict Model, default close + clientStrict: false namespace: autoCreate: true naming: @@ -338,26 +354,7 @@ data: concurrency: 64 checkers: {{- if eq .Values.global.mode "cluster" }} - {{- if eq .Values.polaris.healthChecker "heartbeatRedis" }} - - name: heartbeatRedis - option: - kvAddr: {{ .Values.polaris.storage.redis.address }} - {{- if .Values.polaris.storage.redis.user }} - kvUser: {{ .Values.polaris.storage.redis.user }} - {{- end }} - {{- if .Values.polaris.storage.redis.password }} - kvPasswd: {{ .Values.polaris.storage.redis.password }} - {{- end }} - poolSize: 200 - minIdleConns: 30 - idleTimeout: 120s - connectTimeout: 200ms - msgTimeout: 200ms - concurrency: 200 - withTLS: {{ .Values.polaris.storage.redis.withTLS | default false }} - {{- else }} - name: heartbeatLeader - {{- end}} {{- else }} - name: heartbeatMemory {{- end }} diff --git a/release/cluster/helm/templates/polaris-server.yaml b/release/cluster/helm/templates/polaris-server.yaml index 927b563ef..731041364 100644 --- a/release/cluster/helm/templates/polaris-server.yaml +++ b/release/cluster/helm/templates/polaris-server.yaml @@ -27,6 +27,12 @@ spec: - port: {{ .Values.service.eurekaPort }} name: service-eureka targetPort: {{ .Values.service.eurekaPort }} + - port: {{ .Values.service.eurekaPort }} + name: service-nacos-http + targetPort: {{ .Values.service.nacosPort }} + - port: {{ .Values.service.nacosPort + 1000 }} + name: service-nacos-grpc + targetPort: {{ .Values.service.nacosPort + 1000 }} selector: app: polaris --- diff --git a/release/cluster/helm/values.yaml b/release/cluster/helm/values.yaml index 4505b92c6..fe614b946 100644 --- a/release/cluster/helm/values.yaml +++ b/release/cluster/helm/values.yaml @@ -43,12 +43,6 @@ polaris: name: polaris_server user: root password: polaris@123456 - redis: - address: localhost:6379 - # ACL user from redis v6.0, remove it if ACL is not available - # user: polaris - password: polaris@123456 # AUTH password below redis v6.0, or ACL password from redis v6.0 - withTLS: false prometheus: image: diff --git a/release/cluster/kubernetes/02-polaris-server-config.yaml b/release/cluster/kubernetes/02-polaris-server-config.yaml index 7a2f88742..be47a16ab 100644 --- a/release/cluster/kubernetes/02-polaris-server-config.yaml +++ b/release/cluster/kubernetes/02-polaris-server-config.yaml @@ -287,10 +287,14 @@ data: strategy: name: defaultStrategy option: - # Console power switch, open default + # Console auth switch, default true consoleOpen: true - # Customer inspection ability switch, default close + # Console Strict Model, default true + consoleStrict: true + # Customer auth switch, default false clientOpen: false + # Customer Strict Model, default close + clientStrict: false namespace: autoCreate: true naming: diff --git a/release/cluster/kubernetes/03-polaris-server.yaml b/release/cluster/kubernetes/03-polaris-server.yaml index c64617e0a..2bfb61cd0 100644 --- a/release/cluster/kubernetes/03-polaris-server.yaml +++ b/release/cluster/kubernetes/03-polaris-server.yaml @@ -27,6 +27,12 @@ spec: - port: 15010 name: xds-v3 targetPort: 15010 + - port: 8848 + name: nacos-http + targetPort: 8848 + - port: 9848 + name: nacos-grpc + targetPort: 9848 selector: app: polaris --- diff --git a/release/conf/xds/envoy_lua/odcds.lua b/release/conf/xds/envoy_lua/odcds.lua deleted file mode 100644 index 3b77a77e0..000000000 --- a/release/conf/xds/envoy_lua/odcds.lua +++ /dev/null @@ -1,13 +0,0 @@ --- Called on the request path. -function envoy_on_request(request_handle) - local service_name = request_handle:headers():get(":authority") - local service_namespace = os.getenv("SIDECAR_NAMESPACE") - local cluster_name = service_name .. "." .. service_namespace - request_handle:headers():remove(":authority") - request_handle:headers():add(":authority", cluster_name) -end - --- Called on the response path. -function envoy_on_response(response_handle) - -- Do something. -end \ No newline at end of file diff --git a/release/standalone/docker-compose/docker-compose.yaml b/release/standalone/docker-compose/docker-compose.yaml index 30c2f77cd..baea6a26b 100644 --- a/release/standalone/docker-compose/docker-compose.yaml +++ b/release/standalone/docker-compose/docker-compose.yaml @@ -36,12 +36,16 @@ services: - 8091 # service-grpc - 8093 # config-grpc - 8761 # service-eureka + - 8848 # nacos-http port + - 9848 # nacos-grpc port - 15010 # xds-v3 ports: # 用于其他服务访问北极星 - "8090:8090" # api-http - "8091:8091" # service-grpc - "8093:8093" # config-grpc - "8761:8761" # service-eureka + - "8848:8848" # nacos-http port + - "9848:9848" # nacos-grpc port - "15010:15010" # xds-v3 networks: - backend diff --git a/store/boltdb/config_file_release.go b/store/boltdb/config_file_release.go index 49632f508..d1a1a5272 100644 --- a/store/boltdb/config_file_release.go +++ b/store/boltdb/config_file_release.go @@ -338,7 +338,14 @@ func (cfr *configFileReleaseStore) ActiveConfigFileReleaseTx(tx store.Tx, releas properties[FileReleaseFieldVersion] = maxVersion + 1 properties[FileReleaseFieldActive] = true properties[FileReleaseFieldModifyTime] = time.Now() - properties[FileReleaseFieldType] = string(release.ReleaseType) + return updateValue(dbTx, tblConfigFileRelease, release.ReleaseKey(), properties) +} + +func (cfr *configFileReleaseStore) InactiveConfigFileReleaseTx(tx store.Tx, release *model.ConfigFileRelease) error { + dbTx := tx.GetDelegateTx().(*bolt.Tx) + properties := make(map[string]interface{}) + properties[FileReleaseFieldActive] = false + properties[FileReleaseFieldModifyTime] = time.Now() return updateValue(dbTx, tblConfigFileRelease, release.ReleaseKey(), properties) } diff --git a/store/config_file_api.go b/store/config_file_api.go index ef5d47b94..a4c881ced 100644 --- a/store/config_file_api.go +++ b/store/config_file_api.go @@ -86,6 +86,8 @@ type ConfigFileReleaseStore interface { DeleteConfigFileReleaseTx(tx Tx, data *model.ConfigFileReleaseKey) error // ActiveConfigFileReleaseTx 指定激活发布的配置文件(激活具有排他性,同一个配置文件的所有 release 中只能有一个处于 active == true 状态) ActiveConfigFileReleaseTx(tx Tx, release *model.ConfigFileRelease) error + // InactiveConfigFileReleaseTx 指定失效发布的配置文件(失效具有排他性,同一个配置文件的所有 release 中能有多个处于 active == false 状态) + InactiveConfigFileReleaseTx(tx Tx, release *model.ConfigFileRelease) error // CleanConfigFileReleasesTx 清空配置文件发布 CleanConfigFileReleasesTx(tx Tx, namespace, group, fileName string) error // GetMoreReleaseFile 获取最近更新的配置文件发布, 此方法用于 cache 增量更新,需要注意 modifyTime 应为数据库时间戳 diff --git a/store/mock/api_mock.go b/store/mock/api_mock.go index 2e4f07c02..2e9ad538f 100644 --- a/store/mock/api_mock.go +++ b/store/mock/api_mock.go @@ -2109,6 +2109,20 @@ func (mr *MockStoreMockRecorder) HasFaultDetectRuleByNameExcludeId(name, namespa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasFaultDetectRuleByNameExcludeId", reflect.TypeOf((*MockStore)(nil).HasFaultDetectRuleByNameExcludeId), name, namespace, id) } +// InactiveConfigFileReleaseTx mocks base method. +func (m *MockStore) InactiveConfigFileReleaseTx(tx store.Tx, release *model.ConfigFileRelease) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InactiveConfigFileReleaseTx", tx, release) + ret0, _ := ret[0].(error) + return ret0 +} + +// InactiveConfigFileReleaseTx indicates an expected call of InactiveConfigFileReleaseTx. +func (mr *MockStoreMockRecorder) InactiveConfigFileReleaseTx(tx, release interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InactiveConfigFileReleaseTx", reflect.TypeOf((*MockStore)(nil).InactiveConfigFileReleaseTx), tx, release) +} + // Initialize mocks base method. func (m *MockStore) Initialize(c *store.Config) error { m.ctrl.T.Helper() diff --git a/store/mysql/config_file_release.go b/store/mysql/config_file_release.go index dd3659023..41ec79501 100644 --- a/store/mysql/config_file_release.go +++ b/store/mysql/config_file_release.go @@ -255,6 +255,21 @@ func (cfr *configFileReleaseStore) ActiveConfigFileReleaseTx(tx store.Tx, releas return nil } +func (cfr *configFileReleaseStore) InactiveConfigFileReleaseTx(tx store.Tx, release *model.ConfigFileRelease) error { + if tx == nil { + return ErrTxIsNil + } + dbTx := tx.GetDelegateTx().(*BaseTx) + + args := []interface{}{release.Namespace, release.Group, release.FileName, release.Name, release.ReleaseType} + // 取消对应发布版本的 active 状态 + if _, err := dbTx.Exec("UPDATE config_file_release SET active = 0, modify_time = sysdate() "+ + " WHERE namespace = ? AND `group` = ? AND file_name = ? AND name = ? AND release_type = ?", args...); err != nil { + return store.Error(err) + } + return nil +} + func (cfr *configFileReleaseStore) inactiveConfigFileRelease(tx *BaseTx, release *model.ConfigFileRelease) (uint64, error) { if tx == nil { diff --git a/test/codecov.sh b/test/codecov.sh index 3f94a1d47..5756db88e 100644 --- a/test/codecov.sh +++ b/test/codecov.sh @@ -18,11 +18,13 @@ set -ex # Exit on error; debugging enabled. cur_dir=$(pwd) +coverpkg="github.com/polarismesh/polaris/apiserver,github.com/polarismesh/polaris/apiserver/eurekaserver,github.com/polarismesh/polaris/auth/defaultauth,github.com/polarismesh/polaris/service,github.com/polarismesh/polaris/service/batch,github.com/polarismesh/polaris/service/healthcheck,github.com/polarismesh/polaris/cache,github.com/polarismesh/polaris/cache/service,github.com/polarismesh/polaris/cache/config,github.com/polarismesh/polaris/cache/gray,github.com/polarismesh/polaris/cache/auth,github.com/polarismesh/polaris/cache/config,github.com/polarismesh/polaris/store/boltdb,github.com/polarismesh/polaris/store/mysql,github.com/polarismesh/polaris/plugin,github.com/polarismesh/polaris/config,github.com/polarismesh/polaris/plugin/healthchecker/leader,github.com/polarismesh/polaris/plugin/healthchecker/memory,github.com/polarismesh/polaris/plugin/healthchecker/redis,github.com/polarismesh/polaris/common/batchjob,github.com/polarismesh/polaris/common/eventhub,github.com/polarismesh/polaris/common/redispool,github.com/polarismesh/polaris/common/timewheel" + function test_standalone() { cd ${cur_dir} export STORE_MODE="" go mod vendor - go test -timeout 40m ./... -v -covermode=count -coverprofile=coverage_1.cover -coverpkg=github.com/polarismesh/polaris/apiserver,github.com/polarismesh/polaris/apiserver/eurekaserver,github.com/polarismesh/polaris/auth/defaultauth,github.com/polarismesh/polaris/service,github.com/polarismesh/polaris/service/batch,github.com/polarismesh/polaris/service/healthcheck,github.com/polarismesh/polaris/cache,github.com/polarismesh/polaris/store/boltdb,github.com/polarismesh/polaris/store/mysql,github.com/polarismesh/polaris/plugin,github.com/polarismesh/polaris/config,github.com/polarismesh/polaris/plugin/healthchecker/leader,github.com/polarismesh/polaris/plugin/healthchecker/memory,github.com/polarismesh/polaris/plugin/healthchecker/redis,github.com/polarismesh/polaris/common/batchjob,github.com/polarismesh/polaris/common/eventhub,github.com/polarismesh/polaris/common/redispool,github.com/polarismesh/polaris/common/timewheel + go test -timeout 40m ./... -v -covermode=count -coverprofile=coverage_1.cover -coverpkg=${coverpkg} } function prepare_cluster_env() { @@ -44,7 +46,7 @@ function test_cluster_auth() { export STORE_MODE=sqldb echo "cur STORE MODE=${STORE_MODE}, MYSQL_DB_USER=${MYSQL_DB_USER}, MYSQL_DB_PWD=${MYSQL_DB_PWD}" pushd ./auth/defaultauth - go mod vendor && go test -v -timeout 40m -v -covermode=count -coverprofile=coverage_sqldb_1.cover -coverpkg=github.com/polarismesh/polaris/apiserver,github.com/polarismesh/polaris/apiserver/eurekaserver,github.com/polarismesh/polaris/auth/defaultauth,github.com/polarismesh/polaris/service,github.com/polarismesh/polaris/service/batch,github.com/polarismesh/polaris/service/healthcheck,github.com/polarismesh/polaris/cache,github.com/polarismesh/polaris/store/boltdb,github.com/polarismesh/polaris/store/mysql,github.com/polarismesh/polaris/plugin,github.com/polarismesh/polaris/config,github.com/polarismesh/polaris/plugin/healthchecker/leader,github.com/polarismesh/polaris/plugin/healthchecker/memory,github.com/polarismesh/polaris/plugin/healthchecker/redis,github.com/polarismesh/polaris/common/batchjob,github.com/polarismesh/polaris/common/eventhub,github.com/polarismesh/polaris/common/redispool,github.com/polarismesh/polaris/common/timewheel + go mod vendor && go test -v -timeout 40m -v -covermode=count -coverprofile=coverage_sqldb_1.cover -coverpkg=${coverpkg} mv coverage_sqldb_1.cover ../../ } @@ -54,7 +56,7 @@ function test_cluster_config() { export STORE_MODE=sqldb echo "cur STORE MODE=${STORE_MODE}, MYSQL_DB_USER=${MYSQL_DB_USER}, MYSQL_DB_PWD=${MYSQL_DB_PWD}" pushd ./config - go mod vendor && go test -v -timeout 40m -v -covermode=count -coverprofile=coverage_sqldb_2.cover -coverpkg=github.com/polarismesh/polaris/apiserver,github.com/polarismesh/polaris/apiserver/eurekaserver,github.com/polarismesh/polaris/auth/defaultauth,github.com/polarismesh/polaris/service,github.com/polarismesh/polaris/service/batch,github.com/polarismesh/polaris/service/healthcheck,github.com/polarismesh/polaris/cache,github.com/polarismesh/polaris/store/boltdb,github.com/polarismesh/polaris/store/mysql,github.com/polarismesh/polaris/plugin,github.com/polarismesh/polaris/config,github.com/polarismesh/polaris/plugin/healthchecker/leader,github.com/polarismesh/polaris/plugin/healthchecker/memory,github.com/polarismesh/polaris/plugin/healthchecker/redis,github.com/polarismesh/polaris/common/batchjob,github.com/polarismesh/polaris/common/eventhub,github.com/polarismesh/polaris/common/redispool,github.com/polarismesh/polaris/common/timewheel + go mod vendor && go test -v -timeout 40m -v -covermode=count -coverprofile=coverage_sqldb_2.cover -coverpkg=${coverpkg} mv coverage_sqldb_2.cover ../ } diff --git a/test/data/polaris-server.yaml b/test/data/polaris-server.yaml index 02d181671..a2423e701 100644 --- a/test/data/polaris-server.yaml +++ b/test/data/polaris-server.yaml @@ -369,6 +369,7 @@ healthcheck: config: # 是否启动配置模块 open: true + contentMaxLength: 20000 # 缓存配置 cache: open: true diff --git a/test/data/service_test.yaml b/test/data/service_test.yaml index fa755a720..bbe182cb4 100644 --- a/test/data/service_test.yaml +++ b/test/data/service_test.yaml @@ -246,9 +246,7 @@ naming: config: # 是否启动配置模块 open: true - cache: - #配置文件缓存过期时间,单位s - expireTimeAfterWrite: 3600 + contentMaxLength: 20000 # 健康检查的配置 healthcheck: open: true diff --git a/test/data/service_test_sqldb.yaml b/test/data/service_test_sqldb.yaml index a813f7793..c76965c90 100644 --- a/test/data/service_test_sqldb.yaml +++ b/test/data/service_test_sqldb.yaml @@ -241,6 +241,7 @@ naming: config: # 是否启动配置模块 open: true + contentMaxLength: 20000 # 健康检查的配置 healthcheck: open: true diff --git a/test/suit/test_suit.go b/test/suit/test_suit.go index 18428e8ef..e19126515 100644 --- a/test/suit/test_suit.go +++ b/test/suit/test_suit.go @@ -156,6 +156,10 @@ func (d *DiscoverTestSuit) InjectSuit(*DiscoverTestSuit) { } +func (d *DiscoverTestSuit) GetBootstrapConfig() *TestConfig { + return d.cfg +} + func (d *DiscoverTestSuit) CacheMgr() *cache.CacheManager { return d.cacheMgr }