Skip to content

Commit

Permalink
feat:支持初始化管理员帐户
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Nov 27, 2024
1 parent e633487 commit 83500dc
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 11 deletions.
20 changes: 20 additions & 0 deletions apiserver/nacosserver/v1/config/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
)

type LongPollWatchContext struct {
lock sync.RWMutex
clientId string
labels map[string]string
once sync.Once
Expand Down Expand Up @@ -76,6 +77,9 @@ func (c *LongPollWatchContext) ClientID() string {

// ShouldNotify .
func (c *LongPollWatchContext) ShouldNotify(event *model.SimpleConfigFileRelease) bool {
c.lock.RLock()
defer c.lock.RUnlock()

key := event.FileKey()
watchFile, ok := c.watchConfigFiles[key]
if !ok {
Expand All @@ -90,21 +94,37 @@ func (c *LongPollWatchContext) ShouldNotify(event *model.SimpleConfigFileRelease
}

func (c *LongPollWatchContext) ListWatchFiles() []*config_manage.ClientConfigFileInfo {
c.lock.RLock()
defer c.lock.RUnlock()

ret := make([]*config_manage.ClientConfigFileInfo, 0, len(c.watchConfigFiles))
for _, v := range c.watchConfigFiles {
ret = append(ret, v)
}
return ret
}

func (c *LongPollWatchContext) CurWatchVersion(k string) uint64 {
c.lock.RLock()
defer c.lock.RUnlock()

return c.watchConfigFiles[k].GetVersion().GetValue()
}

// AppendInterest .
func (c *LongPollWatchContext) AppendInterest(item *config_manage.ClientConfigFileInfo) {
c.lock.Lock()
defer c.lock.Unlock()

key := model.BuildKeyForClientConfigFileInfo(item)
c.watchConfigFiles[key] = item
}

// RemoveInterest .
func (c *LongPollWatchContext) RemoveInterest(item *config_manage.ClientConfigFileInfo) {
c.lock.Lock()
defer c.lock.Unlock()

key := model.BuildKeyForClientConfigFileInfo(item)
delete(c.watchConfigFiles, key)
}
Expand Down
8 changes: 8 additions & 0 deletions apiserver/nacosserver/v2/config/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ func (c *StreamWatchContext) ListWatchFiles() []*apiconfig.ClientConfigFileInfo
return c.watchConfigFiles.Values()
}

func (c *StreamWatchContext) CurWatchVersion(k string) uint64 {
val, ok := c.watchConfigFiles.Load(k)
if !ok {
return 0
}
return val.GetVersion().GetValue()
}

// AppendInterest .
func (c *StreamWatchContext) AppendInterest(item *apiconfig.ClientConfigFileInfo) {
key := model.BuildKeyForClientConfigFileInfo(item)
Expand Down
2 changes: 1 addition & 1 deletion cache/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ type (
QueryInstances(filter, metaFilter map[string]string, offset, limit uint32) (uint32, []*model.Instance, error)
// DiscoverServiceInstances 服务发现获取实例
DiscoverServiceInstances(serviceID string, onlyHealthy bool) []*model.Instance
// RemoveService
// RemoveService
RemoveService(serviceID string)
}
)
Expand Down
2 changes: 2 additions & 0 deletions common/model/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,6 @@ const (
ClientLabel_Version = "CLIENT_VERSION"
// ClientLabel_Language 客户端语言
ClientLabel_Language = "CLIENT_LANGUAGE"
// ClientLabel_Host 客户端主机名
ClientLabel_Host = "CLIENT_HOST"
)
13 changes: 7 additions & 6 deletions common/model/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,17 +565,18 @@ type Subscriber struct {
// ConfigSubscribers 以文件视角的监听数据
type ConfigSubscribers struct {
// key
key ConfigFileKey
Key ConfigFileKey
// VersionClients 版本对应的客户端
VersionClients []*struct {
Versoin uint64 `json:"versoin"`
Subscribers []Subscriber `json:"subscribers"`
} `json:"clients"`
VersionClients []*VersionClient `json:"clients"`
}

type VersionClient struct {
Versoin uint64 `json:"versoin"`
Subscribers []*Subscriber `json:"subscribers"`
}

// FileReleaseSubscribeInfo 文件订阅信息
type FileReleaseSubscribeInfo struct {
Id uint64 `json:"id"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Group string `json:"group"`
Expand Down
13 changes: 12 additions & 1 deletion common/model/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package model

import "net/http"
import (
"net/http"

api "github.com/polarismesh/polaris/common/api/v1"
)

type DebugHandlerGroup struct {
Name string
Expand All @@ -35,3 +39,10 @@ type CommonResponse struct {
Info string `json:"info"`
Data interface{} `json:"data"`
}

func NewCommonResponse(code uint32) *CommonResponse {
return &CommonResponse{
Code: code,
Info: api.Code2Info(code),
}
}
4 changes: 4 additions & 0 deletions config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ type ConfigFileOperate interface {
configFiles []*apiconfig.ConfigFile, conflictHandling string) *apiconfig.ConfigImportResponse
// GetAllConfigEncryptAlgorithms 获取配置加密算法
GetAllConfigEncryptAlgorithms(ctx context.Context) *apiconfig.ConfigEncryptAlgorithmResponse
// GetClientSubscribers 获取客户端订阅者
GetClientSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse
// GetConfigSubscribers 获取配置订阅者
GetConfigSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse
}

// ConfigFileReleaseOperate 配置文件发布接口
Expand Down
99 changes: 96 additions & 3 deletions config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,103 @@ func (s *Server) PublishConfigFileFromClient(ctx context.Context,

// GetConfigSubscribers 根据配置视角获取订阅者列表
func (s *Server) GetConfigSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse {
return nil
namespace := filter["namespace"]
group := filter["group"]
fileName := filter["file_name"]

key := utils.GenFileId(namespace, group, fileName)
clientIds, _ := s.watchCenter.watchers.Load(key)
if clientIds == nil {
return model.NewCommonResponse(uint32(apimodel.Code_NotFoundResource))
}

versionClients := map[uint64][]*model.Subscriber{}
clientIds.Range(func(val string) {
watchCtx, ok := s.watchCenter.clients.Load(val)
if !ok {
return
}
curVer := watchCtx.CurWatchVersion(key)
if _, ok := versionClients[curVer]; !ok {
versionClients[curVer] = []*model.Subscriber{}
}

watchCtx.ClientLabels()

versionClients[curVer] = append(versionClients[curVer], &model.Subscriber{
ID: watchCtx.ClientID(),
Host: watchCtx.ClientLabels()[model.ClientLabel_Host],
Version: watchCtx.ClientLabels()[model.ClientLabel_Version],
ClientType: watchCtx.ClientLabels()[model.ClientLabel_Language],
})
})

rsp := model.NewCommonResponse(uint32(apimodel.Code_ExecuteSuccess))
rsp.Data = &model.ConfigSubscribers{
Key: model.ConfigFileKey{
Namespace: namespace,
Group: group,
Name: fileName,
},
VersionClients: func() []*model.VersionClient {
ret := make([]*model.VersionClient, 0, len(versionClients))
for ver, clients := range versionClients {
ret = append(ret, &model.VersionClient{
Versoin: ver,
Subscribers: clients,
})
}
return ret
}(),
}
return rsp
}

// GetConfigSubscribers 根据客户端视角获取订阅的配置文件列表
// GetClientSubscribers 根据客户端视角获取订阅的配置文件列表
func (s *Server) GetClientSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse {
return nil
clientId := filter["client_id"]
watchCtx, ok := s.watchCenter.clients.Load(clientId)
if !ok {
return model.NewCommonResponse(uint32(apimodel.Code_NotFoundResource))
}

watchFiles := watchCtx.ListWatchFiles()
data := &model.ClientSubscriber{
Subscriber: model.Subscriber{
ID: watchCtx.ClientID(),
Host: watchCtx.ClientLabels()[model.ClientLabel_Host],
Version: watchCtx.ClientLabels()[model.ClientLabel_Version],
ClientType: watchCtx.ClientLabels()[model.ClientLabel_Language],
},
Files: []model.FileReleaseSubscribeInfo{},
}

for _, file := range watchFiles {
key := model.BuildKeyForClientConfigFileInfo(file)
curVer := watchCtx.CurWatchVersion(key)

ns := file.GetNamespace().GetValue()
group := file.GetGroup().GetValue()
filename := file.GetFileName().GetValue()

data.Files = append(data.Files, model.FileReleaseSubscribeInfo{
Name: file.GetName().GetValue(),
Namespace: ns,
Group: group,
FileName: filename,
ReleaseType: func() model.ReleaseType {
if gray := s.fileCache.GetActiveGrayRelease(ns, group, filename); gray != nil {
if gray.Version == curVer {
return model.ReleaseTypeGray
}
}
return model.ReleaseTypeFull
}(),
Version: curVer,
})
}

rsp := model.NewCommonResponse(uint32(apimodel.Code_ExecuteSuccess))
rsp.Data = data
return rsp
}
11 changes: 11 additions & 0 deletions config/interceptor/auth/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apiconfig "github.com/polarismesh/specification/source/go/api/v1/config_manage"

api "github.com/polarismesh/polaris/common/api/v1"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/model/auth"
"github.com/polarismesh/polaris/common/utils"
)
Expand Down Expand Up @@ -152,3 +153,13 @@ func (s *Server) GetAllConfigEncryptAlgorithms(
ctx context.Context) *apiconfig.ConfigEncryptAlgorithmResponse {
return s.nextServer.GetAllConfigEncryptAlgorithms(ctx)
}

// GetClientSubscribers 获取客户端订阅者
func (s *Server) GetClientSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse {
return s.nextServer.GetClientSubscribers(ctx, filter)
}

// GetConfigSubscribers 获取配置订阅者
func (s *Server) GetConfigSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse {
return s.nextServer.GetConfigSubscribers(ctx, filter)
}
30 changes: 30 additions & 0 deletions config/interceptor/paramcheck/config_file_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (

apiconfig "github.com/polarismesh/specification/source/go/api/v1/config_manage"
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
"google.golang.org/protobuf/types/known/wrapperspb"

api "github.com/polarismesh/polaris/common/api/v1"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/utils"
)

Expand Down Expand Up @@ -115,3 +117,31 @@ func (s *Server) GetAllConfigEncryptAlgorithms(
ctx context.Context) *apiconfig.ConfigEncryptAlgorithmResponse {
return s.nextServer.GetAllConfigEncryptAlgorithms(ctx)
}

// GetClientSubscribers 获取客户端订阅者
func (s *Server) GetClientSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse {
clientId := filter["client_id"]
if clientId == "" {
return model.NewCommonResponse(uint32(apimodel.Code_BadRequest))
}
return s.nextServer.GetClientSubscribers(ctx, filter)
}

// GetConfigSubscribers 获取配置订阅者
func (s *Server) GetConfigSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse {
namespace := filter["namespace"]
group := filter["group"]
fileName := filter["file_name"]

if err := CheckFileName(wrapperspb.String(fileName)); err != nil {
return model.NewCommonResponse(uint32(apimodel.Code_InvalidConfigFileName))
}
if err := utils.CheckResourceName(wrapperspb.String(group)); err != nil {
return model.NewCommonResponse(uint32(apimodel.Code_InvalidConfigFileGroupName))
}
if err := utils.CheckResourceName(wrapperspb.String(namespace)); err != nil {
return model.NewCommonResponse(uint32(apimodel.Code_InvalidNamespaceName))
}

return s.nextServer.GetConfigSubscribers(ctx, filter)
}
22 changes: 22 additions & 0 deletions config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,15 @@ type (
ShouldExpire(now time.Time) bool
// ListWatchFiles 列举出当前订阅的所有配置文件
ListWatchFiles() []*apiconfig.ClientConfigFileInfo
// CurWatchVersion 获取当前订阅的配置文件的版本
CurWatchVersion(k string) uint64
// IsOnce 是不是只能被通知一次
IsOnce() bool
}
)

type LongPollWatchContext struct {
lock sync.RWMutex
clientId string
labels map[string]string
once sync.Once
Expand Down Expand Up @@ -120,6 +123,9 @@ func (c *LongPollWatchContext) ClientID() string {
}

func (c *LongPollWatchContext) ShouldNotify(event *model.SimpleConfigFileRelease) bool {
c.lock.RLock()
defer c.lock.RUnlock()

if event.ReleaseType == model.ReleaseTypeGray && !c.betaMatcher(c.ClientLabels(), event) {
return false
}
Expand All @@ -133,21 +139,37 @@ func (c *LongPollWatchContext) ShouldNotify(event *model.SimpleConfigFileRelease
}

func (c *LongPollWatchContext) ListWatchFiles() []*apiconfig.ClientConfigFileInfo {
c.lock.RLock()
defer c.lock.RUnlock()

ret := make([]*apiconfig.ClientConfigFileInfo, 0, len(c.watchConfigFiles))
for _, v := range c.watchConfigFiles {
ret = append(ret, v)
}
return ret
}

func (c *LongPollWatchContext) CurWatchVersion(k string) uint64 {
c.lock.RLock()
defer c.lock.RUnlock()

return c.watchConfigFiles[k].GetVersion().GetValue()
}

// AppendInterest .
func (c *LongPollWatchContext) AppendInterest(item *apiconfig.ClientConfigFileInfo) {
c.lock.Lock()
defer c.lock.Unlock()

key := model.BuildKeyForClientConfigFileInfo(item)
c.watchConfigFiles[key] = item
}

// RemoveInterest .
func (c *LongPollWatchContext) RemoveInterest(item *apiconfig.ClientConfigFileInfo) {
c.lock.Lock()
defer c.lock.Unlock()

key := model.BuildKeyForClientConfigFileInfo(item)
delete(c.watchConfigFiles, key)
}
Expand Down

0 comments on commit 83500dc

Please sign in to comment.