Skip to content

Commit

Permalink
feat:support dubbo3 & fix config change can't notify client (#1325)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Jan 29, 2024
1 parent 7b40a69 commit ea1f4f7
Show file tree
Hide file tree
Showing 46 changed files with 664 additions and 463 deletions.
57 changes: 0 additions & 57 deletions apiserver/grpcserver/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (

"google.golang.org/grpc/metadata"

"github.com/polarismesh/polaris/apiserver"
grpchelp "github.com/polarismesh/polaris/apiserver/grpcserver/utils"
"github.com/polarismesh/polaris/common/utils"
)

Expand Down Expand Up @@ -122,58 +120,3 @@ func TestConvertContext(t *testing.T) {
})
}
}

func TestGetClientOpenMethod(t *testing.T) {
type args struct {
include []string
protocol string
}
tests := []struct {
name string
args args
want map[string]bool
wantErr bool
}{
{
name: "case=1",
args: args{
include: []string{
apiserver.RegisterAccess,
},
protocol: "grpc",
},
want: map[string]bool{
"/v1.PolarisGRPC/RegisterInstance": true,
"/v1.PolarisGRPC/DeregisterInstance": true,
},
wantErr: false,
},
{
name: "case=2",
args: args{
include: []string{
apiserver.DiscoverAccess,
},
protocol: "grpc",
},
want: map[string]bool{
"/v1.PolarisGRPC/Discover": true,
"/v1.PolarisGRPC/ReportClient": true,
"/v1.PolarisServiceContractGRPC/ReportServiceContract": true,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := grpchelp.GetClientOpenMethod(tt.args.include, tt.args.protocol)
if (err != nil) != tt.wantErr {
t.Errorf("GetClientOpenMethod() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetClientOpenMethod() = %v, want %v", got, tt.want)
}
})
}
}
15 changes: 15 additions & 0 deletions apiserver/grpcserver/config/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ func (g *ConfigGRPCServer) PublishConfigFile(ctx context.Context,
return response, nil
}

func (g *ConfigGRPCServer) UpsertAndPublishConfigFile(ctx context.Context,
req *apiconfig.ConfigFilePublishInfo) (*apiconfig.ConfigClientResponse, error) {
ctx = utils.ConvertGRPCContext(ctx)
response := g.configServer.CasUpsertAndReleaseConfigFileFromClient(ctx, req)
return &apiconfig.ConfigClientResponse{
Code: response.Code,
Info: response.Info,
ConfigFile: &apiconfig.ClientConfigFileInfo{
Namespace: req.Namespace,
Group: req.Group,
FileName: req.FileName,
},
}, nil
}

// WatchConfigFiles 订阅配置变更
func (g *ConfigGRPCServer) WatchConfigFiles(ctx context.Context,
request *apiconfig.ClientWatchConfigFileRequest) (*apiconfig.ConfigClientResponse, error) {
Expand Down
12 changes: 10 additions & 2 deletions apiserver/grpcserver/config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,16 @@ func (g *ConfigGRPCServer) allowAccess(method string) bool {

// GetClientOpenMethod .
func GetClientOpenMethod(protocol string) (map[string]bool, error) {
openMethods := []string{"GetConfigFile", "CreateConfigFile",
"UpdateConfigFile", "PublishConfigFile", "WatchConfigFiles", "GetConfigFileMetadataList", "Discover"}
openMethods := []string{
"GetConfigFile",
"CreateConfigFile",
"UpdateConfigFile",
"PublishConfigFile",
"WatchConfigFiles",
"GetConfigFileMetadataList",
"UpsertAndPublishConfigFile",
"Discover",
}

openMethod := make(map[string]bool)

Expand Down
13 changes: 10 additions & 3 deletions apiserver/grpcserver/discover/v1/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,6 @@ func (g *DiscoverServer) Discover(server apiservice.PolarisGRPC_DiscoverServer)
case apiservice.DiscoverRequest_FAULT_DETECTOR:
action = metrics.ActionDiscoverFaultDetect
out = g.namingServer.GetFaultDetectWithCache(ctx, in.Service)
case apiservice.DiscoverRequest_SERVICE_CONTRACT:
action = metrics.ActionDiscoverServiceContract
out = g.namingServer.GetServiceContractWithCache(ctx, in.ServiceContract)
default:
out = api.NewDiscoverRoutingResponse(apimodel.Code_InvalidDiscoverResource, in.Service)
}
Expand All @@ -186,6 +183,16 @@ func (g *DiscoverServer) ReportServiceContract(ctx context.Context, in *apiservi
return out, nil
}

// 查询服务契约
func (g *DiscoverServer) GetServiceContract(ctx context.Context, req *apiservice.ServiceContract) (*apiservice.Response, error) {
// 需要记录操作来源,提高效率,只针对特殊接口添加operator
rCtx := utils.ConvertGRPCContext(ctx)
rCtx = context.WithValue(rCtx, utils.StringContext("operator"), ParseGrpcOperator(ctx))

out := g.namingServer.GetServiceContractWithCache(rCtx, req)
return out, nil
}

// ParseGrpcOperator 构造请求源
func ParseGrpcOperator(ctx context.Context) string {
// 获取请求源
Expand Down
4 changes: 2 additions & 2 deletions apiserver/grpcserver/utils/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// GetClientOpenMethod 获取客户端openMethod
func GetClientOpenMethod(include []string, protocol string) (map[string]bool, error) {
clientAccess := make(map[string][]string)
clientAccess[apiserver.DiscoverAccess] = []string{"Discover", "ReportClient", "ReportServiceContract"}
clientAccess[apiserver.DiscoverAccess] = []string{"Discover", "ReportClient", "ReportServiceContract", "GetServiceContract"}
clientAccess[apiserver.RegisterAccess] = []string{"RegisterInstance", "DeregisterInstance"}
clientAccess[apiserver.HealthcheckAccess] = []string{"Heartbeat", "BatchHeartbeat", "BatchGetHeartbeat", "BatchDelHeartbeat"}

Expand All @@ -49,7 +49,7 @@ func GetClientOpenMethod(include []string, protocol string) (map[string]bool, er
if item == apiserver.HealthcheckAccess && method != "Heartbeat" {
recordMethod = "/v1.PolarisHeartbeat" + strings.ToUpper(protocol) + "/" + method
}
if method == "ReportServiceContract" {
if method == "ReportServiceContract" || method == "GetServiceContract" {
recordMethod = "/v1.PolarisServiceContract" + strings.ToUpper(protocol) + "/" + method
}
openMethod[recordMethod] = true
Expand Down
1 change: 1 addition & 0 deletions apiserver/grpcserver/utils/help_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func TestGetClientOpenMethod(t *testing.T) {
"/v1.PolarisGRPC/Discover": true,
"/v1.PolarisGRPC/ReportClient": true,
"/v1.PolarisServiceContractGRPC/ReportServiceContract": true,
"/v1.PolarisServiceContractGRPC/GetServiceContract": true,
},
wantErr: false,
},
Expand Down
3 changes: 3 additions & 0 deletions apiserver/nacosserver/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ type ServiceData struct {
}

func (s *ServiceData) loadInstances(svcIns *model.ServiceInstances) {
if svcIns == nil {
return
}
var (
finalInstances = map[string]*nacosmodel.Instance{}
)
Expand Down
13 changes: 8 additions & 5 deletions apiserver/xdsserverv3/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ func (cds *CDSBuilder) makeCluster(svcInfo *resource.ServiceInfo,
trafficDirection corev3.TrafficDirection, opt *resource.BuildOption) *cluster.Cluster {

name := resource.MakeServiceName(svcInfo.ServiceKey, trafficDirection, opt)

return &cluster.Cluster{
c := &cluster.Cluster{
Name: name,
ConnectTimeout: durationpb.New(5 * time.Second),
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
Expand All @@ -161,8 +160,12 @@ func (cds *CDSBuilder) makeCluster(svcInfo *resource.ServiceInfo,
},
},
},
LbSubsetConfig: resource.MakeLbSubsetConfig(svcInfo),
OutlierDetection: resource.MakeOutlierDetection(svcInfo),
HealthChecks: resource.MakeHealthCheck(svcInfo),
}
// 只有针对出流量场景才能设置 Cluster 的相关信息
if opt.TrafficDirection == corev3.TrafficDirection_OUTBOUND {
c.LbSubsetConfig = resource.MakeLbSubsetConfig(svcInfo)
c.OutlierDetection = resource.MakeOutlierDetection(svcInfo)
c.HealthChecks = resource.MakeHealthCheck(svcInfo)
}
return c
}
11 changes: 2 additions & 9 deletions apiserver/xdsserverv3/rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,8 @@ func (rds *RDSBuilder) makeSidecarInBoundRoutes(selfService model.ServiceKey,
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_WeightedClusters{
WeightedClusters: &route.WeightedCluster{
Clusters: []*route.WeightedCluster_ClusterWeight{
{
Name: resource.MakeServiceName(selfService, trafficDirection, opt),
Weight: wrapperspb.UInt32(100),
},
},
},
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: resource.MakeServiceName(selfService, trafficDirection, opt),
},
},
},
Expand Down
14 changes: 6 additions & 8 deletions apiserver/xdsserverv3/resource/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func BuildWeightClustersV2(trafficDirection corev3.TrafficDirection,
weightedClusters = append(weightedClusters, weightCluster)
totalWeight += destination.Weight
}

return &route.WeightedCluster{
TotalWeight: &wrappers.UInt32Value{Value: totalWeight},
Clusters: weightedClusters,
Expand Down Expand Up @@ -1045,10 +1044,7 @@ func MakeLbSubsetConfig(serviceInfo *ServiceInfo) *cluster.Cluster_LbSubsetConfi
return nil
}

lbSubsetConfig := &cluster.Cluster_LbSubsetConfig{}
var subsetSelectors []*cluster.Cluster_LbSubsetConfig_LbSubsetSelector
lbSubsetConfig.FallbackPolicy = cluster.Cluster_LbSubsetConfig_ANY_ENDPOINT

for _, rule := range rules {
// 对每一个 destination 产生一个 subset
for _, destination := range rule.GetDestinations() {
Expand All @@ -1058,13 +1054,15 @@ func MakeLbSubsetConfig(serviceInfo *ServiceInfo) *cluster.Cluster_LbSubsetConfi
}
subsetSelectors = append(subsetSelectors, &cluster.Cluster_LbSubsetConfig_LbSubsetSelector{
Keys: keys,
FallbackPolicy: cluster.Cluster_LbSubsetConfig_LbSubsetSelector_NO_FALLBACK,
FallbackPolicy: cluster.Cluster_LbSubsetConfig_LbSubsetSelector_ANY_ENDPOINT,
})
}
}

lbSubsetConfig.SubsetSelectors = subsetSelectors
return lbSubsetConfig
return &cluster.Cluster_LbSubsetConfig{
SubsetSelectors: subsetSelectors,
FallbackPolicy: cluster.Cluster_LbSubsetConfig_ANY_ENDPOINT,
}
}

func GenEndpointMetaFromPolarisIns(ins *apiservice.Instance) *core.Metadata {
Expand All @@ -1082,7 +1080,7 @@ func GenEndpointMetaFromPolarisIns(ins *apiservice.Instance) *core.Metadata {
meta.FilterMetadata["envoy.lb"] = &_struct.Struct{
Fields: fields,
}
if ins.Metadata != nil && ins.Metadata[TLSModeTag] != "" {
if ins.Metadata != nil && EnableTLS(TLSMode(ins.Metadata[TLSModeTag])) {
meta.FilterMetadata["envoy.transport_socket_match"] = MTLSTransportSocketMatch
}
return meta
Expand Down
33 changes: 11 additions & 22 deletions apiserver/xdsserverv3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,36 +242,28 @@ func (x *XDSServer) activeUpdateTask() {
func (x *XDSServer) startSynTask(ctx context.Context) {
// 读取 polaris 缓存数据
synXdsConfFunc := func() {

registryInfo := make(map[string]map[model.ServiceKey]*resource.ServiceInfo)

err := x.getRegistryInfoWithCache(ctx, registryInfo)
if err != nil {
curRegistryInfo := make(map[string]map[model.ServiceKey]*resource.ServiceInfo)
if err := x.getRegistryInfoWithCache(ctx, curRegistryInfo); err != nil {
log.Error("get registry info from cache", zap.Error(err))
return
}

needPush := make(map[string]map[model.ServiceKey]*resource.ServiceInfo)
needRemove := make(map[string]map[model.ServiceKey]*resource.ServiceInfo)

// 处理删除 ns 中最后一个 service
for ns, infos := range x.registryInfo {
_, ok := registryInfo[ns]
if !ok && len(infos) > 0 {
// 这一次轮询时,该命名空间下的最后一个服务已经被删除了,此时,当前的命名空间需要处理
needPush[ns] = map[model.ServiceKey]*resource.ServiceInfo{}
x.registryInfo[ns] = map[model.ServiceKey]*resource.ServiceInfo{}
}
}
// 与本地缓存对比,是否发生了变化,对发生变化的命名空间,推送配置

// step 1: 这里先生成需要删除 XDS 资源数据的资源信息
for ns, infos := range x.registryInfo {
if _, exist := registryInfo[ns]; !exist {
// 如果当前整个命名空间都不存在,直接按照整个 namespace 级别进行数据删除
if _, exist := curRegistryInfo[ns]; !exist {
needRemove[ns] = infos
continue
}

// 命名空间存在,但是命名空间下的服务有删除情况,需要找出来
for _, info := range infos {
cacheServiceInfos := registryInfo[ns]
cacheServiceInfos := curRegistryInfo[ns]
if _, ok := cacheServiceInfos[info.ServiceKey]; !ok {
if _, ok := needRemove[ns]; !ok {
needRemove[ns] = make(map[model.ServiceKey]*resource.ServiceInfo)
Expand All @@ -282,21 +274,17 @@ func (x *XDSServer) startSynTask(ctx context.Context) {
}
}

// 与本地缓存对比,是否发生了变化,对发生变化的命名空间,推送配置
for ns, infos := range registryInfo {
for ns, infos := range curRegistryInfo {
cacheServiceInfos, ok := x.registryInfo[ns]
if !ok {
// 新命名空间,需要处理
needPush[ns] = infos
x.registryInfo[ns] = infos
continue
}

// todo 不考虑命名空间删除的情况
// 判断当前这个空间,是否需要更新配置
if x.checkUpdate(infos, cacheServiceInfos) {
needPush[ns] = infos
x.registryInfo[ns] = infos
}
}

Expand All @@ -305,6 +293,7 @@ func (x *XDSServer) startSynTask(ctx context.Context) {
zap.Int("need-remove", len(needRemove)))
x.Generate(needPush, needRemove)
}
x.registryInfo = curRegistryInfo
}

ticker := time.NewTicker(5 * cache.UpdateCacheInterval)
Expand Down Expand Up @@ -357,7 +346,7 @@ func (x *XDSServer) getRegistryInfoWithCache(ctx context.Context,
}

if err := x.namingServer.Cache().Service().IteratorServices(serviceIterProc); err != nil {
log.Errorf("syn polaris services error %v", err)
log.Errorf("sync polaris services error %v", err)
return err
}

Expand Down
34 changes: 0 additions & 34 deletions auth/defaultauth/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,40 +108,6 @@ func checkOwner(owner *wrappers.StringValue) error {
return nil
}

// checkMobile 检查用户的 mobile 信息
func checkMobile(mobile *wrappers.StringValue) error {
if mobile == nil {
return nil
}

if mobile.GetValue() == "" {
return nil
}

if utf8.RuneCountInString(mobile.GetValue()) != 11 {
return errors.New("invalid mobile")
}

return nil
}

// checkEmail 检查用户的 email 信息
func checkEmail(email *wrappers.StringValue) error {
if email == nil {
return nil
}

if email.GetValue() == "" {
return nil
}

if ok := regEmail.MatchString(email.GetValue()); !ok {
return errors.New("invalid email")
}

return nil
}

// verifyAuth 用于 user、group 以及 strategy 模块的鉴权工作检查
func verifyAuth(ctx context.Context, isWrite bool,
needOwner bool, authMgn *DefaultAuthChecker) (context.Context, *apiservice.Response) {
Expand Down
1 change: 0 additions & 1 deletion cache/service/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,6 @@ func (b *instancePorts) listPort(serviceID string) []*model.ServicePort {
ret := make([]*model.ServicePort, 0, 4)

val, ok := b.ports[serviceID]

if !ok {
return ret
}
Expand Down
Loading

0 comments on commit ea1f4f7

Please sign in to comment.