Skip to content

Commit

Permalink
fix:service_contract and xdsv3 bug (#1313)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Jan 2, 2024
1 parent f4f35c6 commit 5c329b0
Show file tree
Hide file tree
Showing 54 changed files with 841 additions and 1,321 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version:
go-version: "1.21.5"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codecov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
golangci:
strategy:
matrix:
go-version: [ "1.20" ]
go-version: [ "1.21.5" ]
name: golangci-lint
runs-on: ubuntu-latest
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/integration-testing-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/integration-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"

- name: Get version
id: get_version
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/standalone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.21.5"

- name: Build
id: build
Expand Down
2 changes: 1 addition & 1 deletion apiserver/nacosserver/v1/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (n *ConfigServer) diffChangeFiles(ctx context.Context,
dataId := item.GetFileName().GetValue()
mdval := item.GetMd5().GetValue()

if beta := n.cacheSvr.ConfigFile().GetGrayRelease(namespace, group, dataId); beta != nil {
if beta := n.cacheSvr.ConfigFile().GetActiveGrayRelease(namespace, group, dataId); beta != nil {
if n.cacheSvr.Gray().HitGrayRule(beta.FileKey(), clientLabels) {
changeKeys = append(changeKeys, &model.ConfigListenItem{
Tenant: model.ToNacosConfigNamespace(beta.Namespace),
Expand Down
2 changes: 1 addition & 1 deletion apiserver/nacosserver/v2/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (h *ConfigServer) handleWatchConfigRequest(ctx context.Context, req nacospb

var active *model.ConfigFileRelease
var match bool
if betaActive := h.cacheSvr.ConfigFile().GetGrayRelease(namespace, group, dataId); betaActive != nil {
if betaActive := h.cacheSvr.ConfigFile().GetActiveGrayRelease(namespace, group, dataId); betaActive != nil {
match = h.cacheSvr.Gray().HitGrayRule(model.GetGrayConfigRealseKey(betaActive.SimpleConfigFileRelease), watchCtx.ClientLabels())
active = betaActive
}
Expand Down
137 changes: 38 additions & 99 deletions apiserver/xdsserverv3/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ package cache
import (
"context"
"errors"
"fmt"

corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
"go.uber.org/zap"

Expand All @@ -39,7 +37,7 @@ type (
// hash is the hashing function for Envoy nodes
hash cachev3.NodeHash
// Muxed caches.
Caches *utils.SyncMap[string, cache.Cache]
Caches *utils.SyncMap[string, cachev3.Cache]
}

// CacheHook
Expand All @@ -64,13 +62,19 @@ func NewCache(hook CacheHook) *XDSCache {
return sc
}

// CleanEnvoyNodeCache 清理和 Envoy Node 强相关的缓存数据
func (sc *XDSCache) CleanEnvoyNodeCache(node *corev3.Node) {
cacheKey := resource.LDS.ResourceType() + "~" + node.Id
sc.Caches.Delete(cacheKey)
}

// CreateWatch returns a watch for an xDS request.
func (sc *XDSCache) CreateWatch(request *cachev3.Request, streamState stream.StreamState,
value chan cachev3.Response) func() {
if sc.hook != nil {
sc.hook.OnCreateWatch(request, streamState, value)
}
item := sc.loadCache(request)
item := sc.loadCache(request, streamState)
if item == nil {
value <- nil
return func() {}
Expand All @@ -84,7 +88,7 @@ func (sc *XDSCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state stream
if sc.hook != nil {
sc.hook.OnCreateDeltaWatch(request, state, value)
}
item := sc.loadCache(request)
item := sc.loadCache(request, state)
if item == nil {
value <- &NoReadyXdsResponse{}
return func() {}
Expand All @@ -98,30 +102,21 @@ func (sc *XDSCache) Fetch(ctx context.Context, request *cachev3.Request) (cachev
return nil, errors.New("not implemented")
}

// DeltaUpdateNodeResource .
func (sc *XDSCache) DeltaUpdateNodeResource(client *resource.XDSClient, key, typeUrl string, current map[string]types.Resource) error {
val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache {
return NewLinearCache(typeUrl)
})
linearCache, _ := val.(*LinearCache)
return linearCache.UpdateNodeResource(client, current)
}

// DeltaUpdateResource .
func (sc *XDSCache) DeltaUpdateResource(key, typeUrl string, current map[string]types.Resource) error {
val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache {
return NewLinearCache(typeUrl)
return cachev3.NewLinearCache(typeUrl, cachev3.WithLogger(log))
})
linearCache, _ := val.(*LinearCache)
linearCache, _ := val.(*cachev3.LinearCache)
return linearCache.UpdateResources(current, []string{})
}

// DeltaRemoveResource .
func (sc *XDSCache) DeltaRemoveResource(key, typeUrl string, current map[string]types.Resource) error {
val, _ := sc.Caches.ComputeIfAbsent(key, func(_ string) cachev3.Cache {
return NewLinearCache(typeUrl)
return cachev3.NewLinearCache(typeUrl, cachev3.WithLogger(log))
})
linearCache, _ := val.(*LinearCache)
linearCache, _ := val.(*cachev3.LinearCache)

waitRemove := make([]string, 0, len(current))
for k := range current {
Expand All @@ -130,99 +125,43 @@ func (sc *XDSCache) DeltaRemoveResource(key, typeUrl string, current map[string]
return linearCache.UpdateResources(nil, waitRemove)
}

func classify(typeUrl string, resources []string, client *resource.XDSClient) []string {
isAllowNode := false
_, isAllowTls := allowTlsResource[typeUrl]
if isAllowNodeFunc, exist := allowEachNodeResource[typeUrl]; exist {
isAllowNode = isAllowNodeFunc(typeUrl, resources, client)
}
first := typeUrl + "~" + client.Node.GetId()
second := typeUrl + "~" + client.GetSelfNamespace()
tlsMode, exist := client.Metadata[resource.TLSModeTag]

// 没有设置 TLS 开关
if !exist || tlsMode == string(resource.TLSModeNone) {
if isAllowNode {
return []string{first, second}
}
return []string{second}
}
if isAllowNode {
if isAllowTls {
return []string{first, second, second + "~" + tlsMode}
}
return []string{first, second}
}
if isAllowTls {
return []string{first, second, second + "~" + tlsMode}
}
return []string{second}
}

type PredicateNodeResource func(typeUrl string, resources []string, client *resource.XDSClient) bool

var (
allowEachNodeResource = map[string]PredicateNodeResource{
resourcev3.ListenerType: func(typeUrl string, resources []string, client *resource.XDSClient) bool {
return true
},
resourcev3.EndpointType: func(typeUrl string, resources []string, client *resource.XDSClient) bool {
selfSvc := fmt.Sprintf("INBOUND|%s|%s", client.GetSelfNamespace(), client.GetSelfService())
for i := range resources {
if resources[i] == selfSvc {
return true
}
}
return false
},
resourcev3.RouteType: func(typeUrl string, resources []string, client *resource.XDSClient) bool {
selfSvc := resource.MakeInBoundRouteConfigName(client.GetSelfServiceKey())
for i := range resources {
if resources[i] == selfSvc {
return true
}
}
return false
},
}
allowTlsResource = map[string]struct{}{
resourcev3.ListenerType: {},
resourcev3.ClusterType: {},
}
)

func (sc *XDSCache) loadCache(req interface{}) cachev3.Cache {
func (sc *XDSCache) loadCache(req interface{}, streamState stream.StreamState) cachev3.Cache {
var (
keys []string
typeUrl string
client *resource.XDSClient
subscribeResources []string
typeUrl string
client *resource.XDSClient
)
switch args := req.(type) {
case *cache.Request:
case *cachev3.Request:
client = resource.ParseXDSClient(args.GetNode())
subscribeResources = args.GetResourceNames()
keys = classify(args.TypeUrl, subscribeResources, client)
typeUrl = args.TypeUrl
case *cache.DeltaRequest:
case *cachev3.DeltaRequest:
client = resource.ParseXDSClient(args.GetNode())
subscribeResources = args.GetResourceNamesSubscribe()
keys = classify(args.TypeUrl, subscribeResources, client)
typeUrl = args.TypeUrl
default:
log.Error("[XDS][V3] no support client request type", zap.Any("req", args))
return nil
}
for i := range keys {
val, ok := sc.Caches.Load(keys[i])
if ok {
log.Info("[XDS][V3] load cache to handle client request", zap.Strings("keys", keys),
zap.String("hit-key", keys[i]), zap.Strings("subscribe", subscribeResources),
zap.String("type", typeUrl), zap.String("client", client.Node.GetId()))
return val
}
cacheKey := BuildCacheKey(typeUrl, client.TLSMode, client)
val, ok := sc.Caches.Load(cacheKey)
if ok {
log.Info("[XDS][V3] load cache to handle client request",
zap.String("cache-key", cacheKey), zap.String("type", typeUrl),
zap.String("client", client.Node.GetId()), zap.Bool("wildcard", streamState.IsWildcard()))
return val
}
log.Error("[XDS][V3] cache not found to handle client request", zap.String("type", typeUrl),
zap.String("client", client.Node.GetId()))
return nil
}

func BuildCacheKey(typeUrl string, tlsMode resource.TLSMode, client *resource.XDSClient) string {
xdsType := resource.FormatTypeUrl(typeUrl)
if xdsType == resource.LDS {
return typeUrl + "~" + client.GetNodeID()
}
key := typeUrl + "~" + client.GetSelfNamespace()
if resource.SupportTLS(xdsType) && resource.EnableTLS(tlsMode) {
key = key + "~" + string(tlsMode)
}
return key
}
26 changes: 13 additions & 13 deletions apiserver/xdsserverv3/cache/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,38 @@ import (
"go.uber.org/zap"

"github.com/polarismesh/polaris/apiserver/xdsserverv3/resource"
commonlog "github.com/polarismesh/polaris/common/log"
)

func NewCallback(log *commonlog.Scope, nodeMgr *resource.XDSNodeManager) *Callbacks {
func NewCallback(cacheMgr *XDSCache, nodeMgr *resource.XDSNodeManager) *Callbacks {
return &Callbacks{
log: log,
nodeMgr: nodeMgr,
cacheMgr: cacheMgr,
nodeMgr: nodeMgr,
}
}

type Callbacks struct {
log *commonlog.Scope
nodeMgr *resource.XDSNodeManager
cacheMgr *XDSCache
nodeMgr *resource.XDSNodeManager
}

func (cb *Callbacks) Report() {

func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error {
return nil
}

func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error {
func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error {
return nil
}

func (cb *Callbacks) OnStreamClosed(id int64, node *corev3.Node) {
cb.nodeMgr.DelNode(id)
}

func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error {
return nil
// 清理 cache
cb.cacheMgr.CleanEnvoyNodeCache(node)
}

func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *corev3.Node) {
cb.nodeMgr.DelNode(id)
// 清理 cache
cb.cacheMgr.CleanEnvoyNodeCache(node)
}

func (cb *Callbacks) OnStreamRequest(id int64, req *discovery.DiscoveryRequest) error {
Expand Down
Loading

0 comments on commit 5c329b0

Please sign in to comment.