Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:support dubbo3 & fix config change can't notify client #1325

Merged
merged 14 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 0 additions & 57 deletions apiserver/grpcserver/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (

"google.golang.org/grpc/metadata"

"github.com/polarismesh/polaris/apiserver"
grpchelp "github.com/polarismesh/polaris/apiserver/grpcserver/utils"
"github.com/polarismesh/polaris/common/utils"
)

Expand Down Expand Up @@ -122,58 +120,3 @@ func TestConvertContext(t *testing.T) {
})
}
}

func TestGetClientOpenMethod(t *testing.T) {
type args struct {
include []string
protocol string
}
tests := []struct {
name string
args args
want map[string]bool
wantErr bool
}{
{
name: "case=1",
args: args{
include: []string{
apiserver.RegisterAccess,
},
protocol: "grpc",
},
want: map[string]bool{
"/v1.PolarisGRPC/RegisterInstance": true,
"/v1.PolarisGRPC/DeregisterInstance": true,
},
wantErr: false,
},
{
name: "case=2",
args: args{
include: []string{
apiserver.DiscoverAccess,
},
protocol: "grpc",
},
want: map[string]bool{
"/v1.PolarisGRPC/Discover": true,
"/v1.PolarisGRPC/ReportClient": true,
"/v1.PolarisServiceContractGRPC/ReportServiceContract": true,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := grpchelp.GetClientOpenMethod(tt.args.include, tt.args.protocol)
if (err != nil) != tt.wantErr {
t.Errorf("GetClientOpenMethod() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetClientOpenMethod() = %v, want %v", got, tt.want)
}
})
}
}
15 changes: 15 additions & 0 deletions apiserver/grpcserver/config/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ func (g *ConfigGRPCServer) PublishConfigFile(ctx context.Context,
return response, nil
}

func (g *ConfigGRPCServer) UpsertAndPublishConfigFile(ctx context.Context,
req *apiconfig.ConfigFilePublishInfo) (*apiconfig.ConfigClientResponse, error) {
ctx = utils.ConvertGRPCContext(ctx)
response := g.configServer.CasUpsertAndReleaseConfigFileFromClient(ctx, req)
return &apiconfig.ConfigClientResponse{
Code: response.Code,
Info: response.Info,
ConfigFile: &apiconfig.ClientConfigFileInfo{
Namespace: req.Namespace,
Group: req.Group,
FileName: req.FileName,
},
}, nil
}

// WatchConfigFiles 订阅配置变更
func (g *ConfigGRPCServer) WatchConfigFiles(ctx context.Context,
request *apiconfig.ClientWatchConfigFileRequest) (*apiconfig.ConfigClientResponse, error) {
Expand Down
12 changes: 10 additions & 2 deletions apiserver/grpcserver/config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,16 @@ func (g *ConfigGRPCServer) allowAccess(method string) bool {

// GetClientOpenMethod .
func GetClientOpenMethod(protocol string) (map[string]bool, error) {
openMethods := []string{"GetConfigFile", "CreateConfigFile",
"UpdateConfigFile", "PublishConfigFile", "WatchConfigFiles", "GetConfigFileMetadataList", "Discover"}
openMethods := []string{
"GetConfigFile",
"CreateConfigFile",
"UpdateConfigFile",
"PublishConfigFile",
"WatchConfigFiles",
"GetConfigFileMetadataList",
"UpsertAndPublishConfigFile",
"Discover",
}

openMethod := make(map[string]bool)

Expand Down
13 changes: 10 additions & 3 deletions apiserver/grpcserver/discover/v1/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,6 @@ func (g *DiscoverServer) Discover(server apiservice.PolarisGRPC_DiscoverServer)
case apiservice.DiscoverRequest_FAULT_DETECTOR:
action = metrics.ActionDiscoverFaultDetect
out = g.namingServer.GetFaultDetectWithCache(ctx, in.Service)
case apiservice.DiscoverRequest_SERVICE_CONTRACT:
action = metrics.ActionDiscoverServiceContract
out = g.namingServer.GetServiceContractWithCache(ctx, in.ServiceContract)
default:
out = api.NewDiscoverRoutingResponse(apimodel.Code_InvalidDiscoverResource, in.Service)
}
Expand All @@ -186,6 +183,16 @@ func (g *DiscoverServer) ReportServiceContract(ctx context.Context, in *apiservi
return out, nil
}

// 查询服务契约
func (g *DiscoverServer) GetServiceContract(ctx context.Context, req *apiservice.ServiceContract) (*apiservice.Response, error) {
// 需要记录操作来源,提高效率,只针对特殊接口添加operator
rCtx := utils.ConvertGRPCContext(ctx)
rCtx = context.WithValue(rCtx, utils.StringContext("operator"), ParseGrpcOperator(ctx))

out := g.namingServer.GetServiceContractWithCache(rCtx, req)
return out, nil
}

// ParseGrpcOperator 构造请求源
func ParseGrpcOperator(ctx context.Context) string {
// 获取请求源
Expand Down
4 changes: 2 additions & 2 deletions apiserver/grpcserver/utils/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// GetClientOpenMethod 获取客户端openMethod
func GetClientOpenMethod(include []string, protocol string) (map[string]bool, error) {
clientAccess := make(map[string][]string)
clientAccess[apiserver.DiscoverAccess] = []string{"Discover", "ReportClient", "ReportServiceContract"}
clientAccess[apiserver.DiscoverAccess] = []string{"Discover", "ReportClient", "ReportServiceContract", "GetServiceContract"}
clientAccess[apiserver.RegisterAccess] = []string{"RegisterInstance", "DeregisterInstance"}
clientAccess[apiserver.HealthcheckAccess] = []string{"Heartbeat", "BatchHeartbeat", "BatchGetHeartbeat", "BatchDelHeartbeat"}

Expand All @@ -49,7 +49,7 @@ func GetClientOpenMethod(include []string, protocol string) (map[string]bool, er
if item == apiserver.HealthcheckAccess && method != "Heartbeat" {
recordMethod = "/v1.PolarisHeartbeat" + strings.ToUpper(protocol) + "/" + method
}
if method == "ReportServiceContract" {
if method == "ReportServiceContract" || method == "GetServiceContract" {
recordMethod = "/v1.PolarisServiceContract" + strings.ToUpper(protocol) + "/" + method
}
openMethod[recordMethod] = true
Expand Down
1 change: 1 addition & 0 deletions apiserver/grpcserver/utils/help_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func TestGetClientOpenMethod(t *testing.T) {
"/v1.PolarisGRPC/Discover": true,
"/v1.PolarisGRPC/ReportClient": true,
"/v1.PolarisServiceContractGRPC/ReportServiceContract": true,
"/v1.PolarisServiceContractGRPC/GetServiceContract": true,
},
wantErr: false,
},
Expand Down
3 changes: 3 additions & 0 deletions apiserver/nacosserver/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ type ServiceData struct {
}

func (s *ServiceData) loadInstances(svcIns *model.ServiceInstances) {
if svcIns == nil {
return
}
var (
finalInstances = map[string]*nacosmodel.Instance{}
)
Expand Down
13 changes: 8 additions & 5 deletions apiserver/xdsserverv3/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ func (cds *CDSBuilder) makeCluster(svcInfo *resource.ServiceInfo,
trafficDirection corev3.TrafficDirection, opt *resource.BuildOption) *cluster.Cluster {

name := resource.MakeServiceName(svcInfo.ServiceKey, trafficDirection, opt)

return &cluster.Cluster{
c := &cluster.Cluster{
Name: name,
ConnectTimeout: durationpb.New(5 * time.Second),
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
Expand All @@ -161,8 +160,12 @@ func (cds *CDSBuilder) makeCluster(svcInfo *resource.ServiceInfo,
},
},
},
LbSubsetConfig: resource.MakeLbSubsetConfig(svcInfo),
OutlierDetection: resource.MakeOutlierDetection(svcInfo),
HealthChecks: resource.MakeHealthCheck(svcInfo),
}
// 只有针对出流量场景才能设置 Cluster 的相关信息
if opt.TrafficDirection == corev3.TrafficDirection_OUTBOUND {
c.LbSubsetConfig = resource.MakeLbSubsetConfig(svcInfo)
c.OutlierDetection = resource.MakeOutlierDetection(svcInfo)
c.HealthChecks = resource.MakeHealthCheck(svcInfo)
}
return c
}
11 changes: 2 additions & 9 deletions apiserver/xdsserverv3/rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,8 @@ func (rds *RDSBuilder) makeSidecarInBoundRoutes(selfService model.ServiceKey,
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_WeightedClusters{
WeightedClusters: &route.WeightedCluster{
Clusters: []*route.WeightedCluster_ClusterWeight{
{
Name: resource.MakeServiceName(selfService, trafficDirection, opt),
Weight: wrapperspb.UInt32(100),
},
},
},
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: resource.MakeServiceName(selfService, trafficDirection, opt),
},
},
},
Expand Down
14 changes: 6 additions & 8 deletions apiserver/xdsserverv3/resource/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func BuildWeightClustersV2(trafficDirection corev3.TrafficDirection,
weightedClusters = append(weightedClusters, weightCluster)
totalWeight += destination.Weight
}

return &route.WeightedCluster{
TotalWeight: &wrappers.UInt32Value{Value: totalWeight},
Clusters: weightedClusters,
Expand Down Expand Up @@ -1045,10 +1044,7 @@ func MakeLbSubsetConfig(serviceInfo *ServiceInfo) *cluster.Cluster_LbSubsetConfi
return nil
}

lbSubsetConfig := &cluster.Cluster_LbSubsetConfig{}
var subsetSelectors []*cluster.Cluster_LbSubsetConfig_LbSubsetSelector
lbSubsetConfig.FallbackPolicy = cluster.Cluster_LbSubsetConfig_ANY_ENDPOINT

for _, rule := range rules {
// 对每一个 destination 产生一个 subset
for _, destination := range rule.GetDestinations() {
Expand All @@ -1058,13 +1054,15 @@ func MakeLbSubsetConfig(serviceInfo *ServiceInfo) *cluster.Cluster_LbSubsetConfi
}
subsetSelectors = append(subsetSelectors, &cluster.Cluster_LbSubsetConfig_LbSubsetSelector{
Keys: keys,
FallbackPolicy: cluster.Cluster_LbSubsetConfig_LbSubsetSelector_NO_FALLBACK,
FallbackPolicy: cluster.Cluster_LbSubsetConfig_LbSubsetSelector_ANY_ENDPOINT,
})
}
}

lbSubsetConfig.SubsetSelectors = subsetSelectors
return lbSubsetConfig
return &cluster.Cluster_LbSubsetConfig{
SubsetSelectors: subsetSelectors,
FallbackPolicy: cluster.Cluster_LbSubsetConfig_ANY_ENDPOINT,
}
}

func GenEndpointMetaFromPolarisIns(ins *apiservice.Instance) *core.Metadata {
Expand All @@ -1082,7 +1080,7 @@ func GenEndpointMetaFromPolarisIns(ins *apiservice.Instance) *core.Metadata {
meta.FilterMetadata["envoy.lb"] = &_struct.Struct{
Fields: fields,
}
if ins.Metadata != nil && ins.Metadata[TLSModeTag] != "" {
if ins.Metadata != nil && EnableTLS(TLSMode(ins.Metadata[TLSModeTag])) {
meta.FilterMetadata["envoy.transport_socket_match"] = MTLSTransportSocketMatch
}
return meta
Expand Down
33 changes: 11 additions & 22 deletions apiserver/xdsserverv3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,36 +242,28 @@ func (x *XDSServer) activeUpdateTask() {
func (x *XDSServer) startSynTask(ctx context.Context) {
// 读取 polaris 缓存数据
synXdsConfFunc := func() {

registryInfo := make(map[string]map[model.ServiceKey]*resource.ServiceInfo)

err := x.getRegistryInfoWithCache(ctx, registryInfo)
if err != nil {
curRegistryInfo := make(map[string]map[model.ServiceKey]*resource.ServiceInfo)
if err := x.getRegistryInfoWithCache(ctx, curRegistryInfo); err != nil {
log.Error("get registry info from cache", zap.Error(err))
return
}

needPush := make(map[string]map[model.ServiceKey]*resource.ServiceInfo)
needRemove := make(map[string]map[model.ServiceKey]*resource.ServiceInfo)

// 处理删除 ns 中最后一个 service
for ns, infos := range x.registryInfo {
_, ok := registryInfo[ns]
if !ok && len(infos) > 0 {
// 这一次轮询时,该命名空间下的最后一个服务已经被删除了,此时,当前的命名空间需要处理
needPush[ns] = map[model.ServiceKey]*resource.ServiceInfo{}
x.registryInfo[ns] = map[model.ServiceKey]*resource.ServiceInfo{}
}
}
// 与本地缓存对比,是否发生了变化,对发生变化的命名空间,推送配置

// step 1: 这里先生成需要删除 XDS 资源数据的资源信息
for ns, infos := range x.registryInfo {
if _, exist := registryInfo[ns]; !exist {
// 如果当前整个命名空间都不存在,直接按照整个 namespace 级别进行数据删除
if _, exist := curRegistryInfo[ns]; !exist {
needRemove[ns] = infos
continue
}

// 命名空间存在,但是命名空间下的服务有删除情况,需要找出来
for _, info := range infos {
cacheServiceInfos := registryInfo[ns]
cacheServiceInfos := curRegistryInfo[ns]
if _, ok := cacheServiceInfos[info.ServiceKey]; !ok {
if _, ok := needRemove[ns]; !ok {
needRemove[ns] = make(map[model.ServiceKey]*resource.ServiceInfo)
Expand All @@ -282,21 +274,17 @@ func (x *XDSServer) startSynTask(ctx context.Context) {
}
}

// 与本地缓存对比,是否发生了变化,对发生变化的命名空间,推送配置
for ns, infos := range registryInfo {
for ns, infos := range curRegistryInfo {
cacheServiceInfos, ok := x.registryInfo[ns]
if !ok {
// 新命名空间,需要处理
needPush[ns] = infos
x.registryInfo[ns] = infos
continue
}

// todo 不考虑命名空间删除的情况
// 判断当前这个空间,是否需要更新配置
if x.checkUpdate(infos, cacheServiceInfos) {
needPush[ns] = infos
x.registryInfo[ns] = infos
}
}

Expand All @@ -305,6 +293,7 @@ func (x *XDSServer) startSynTask(ctx context.Context) {
zap.Int("need-remove", len(needRemove)))
x.Generate(needPush, needRemove)
}
x.registryInfo = curRegistryInfo
}

ticker := time.NewTicker(5 * cache.UpdateCacheInterval)
Expand Down Expand Up @@ -357,7 +346,7 @@ func (x *XDSServer) getRegistryInfoWithCache(ctx context.Context,
}

if err := x.namingServer.Cache().Service().IteratorServices(serviceIterProc); err != nil {
log.Errorf("syn polaris services error %v", err)
log.Errorf("sync polaris services error %v", err)
return err
}

Expand Down
34 changes: 0 additions & 34 deletions auth/defaultauth/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,40 +108,6 @@ func checkOwner(owner *wrappers.StringValue) error {
return nil
}

// checkMobile 检查用户的 mobile 信息
func checkMobile(mobile *wrappers.StringValue) error {
if mobile == nil {
return nil
}

if mobile.GetValue() == "" {
return nil
}

if utf8.RuneCountInString(mobile.GetValue()) != 11 {
return errors.New("invalid mobile")
}

return nil
}

// checkEmail 检查用户的 email 信息
func checkEmail(email *wrappers.StringValue) error {
if email == nil {
return nil
}

if email.GetValue() == "" {
return nil
}

if ok := regEmail.MatchString(email.GetValue()); !ok {
return errors.New("invalid email")
}

return nil
}

// verifyAuth 用于 user、group 以及 strategy 模块的鉴权工作检查
func verifyAuth(ctx context.Context, isWrite bool,
needOwner bool, authMgn *DefaultAuthChecker) (context.Context, *apiservice.Response) {
Expand Down
1 change: 0 additions & 1 deletion cache/service/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,6 @@ func (b *instancePorts) listPort(serviceID string) []*model.ServicePort {
ret := make([]*model.ServicePort, 0, 4)

val, ok := b.ports[serviceID]

if !ok {
return ret
}
Expand Down
Loading
Loading