Skip to content

Commit

Permalink
fix:修复nacos协议兼容中的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Jan 11, 2024
1 parent 0a73518 commit 21a15e7
Show file tree
Hide file tree
Showing 18 changed files with 157 additions and 51 deletions.
6 changes: 5 additions & 1 deletion apiserver/nacosserver/model/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,9 @@ func ToNacosConfigNamespace(ns string) string {
}

const (
ActionGetConfigFile = "NACOS_GET_CONFIG"
ActionGetConfigFile = "NACOS_GET_CONFIG"
ActionPublishConfigFile = "NACOS_PUBLISH_CONFIG"
ActionGrpcGetConfigFile = "NACOS_GRPC_GET_CONFIG"
ActionGrpcPublishConfigFile = "NACOS_GRPC_PUBLISH_CONFIG"
ActionGrpcPushConfigFile = "NACOS_GRPC_PUSH_CONFIG"
)
8 changes: 7 additions & 1 deletion apiserver/nacosserver/v1/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@ import (
)

func (n *ConfigServer) handlePublishConfig(ctx context.Context, req *model.ConfigFile) (bool, error) {
resp := n.configSvr.UpsertAndReleaseConfigFileFromClient(ctx, req.ToSpecConfigFile())
var resp *config_manage.ConfigResponse
if req.CasMd5 != "" {
resp = n.configSvr.CasUpsertAndReleaseConfigFileFromClient(ctx, req.ToSpecConfigFile())
} else {
resp = n.configSvr.UpsertAndReleaseConfigFileFromClient(ctx, req.ToSpecConfigFile())
}

if resp.GetCode().GetValue() == uint32(apimodel.Code_ExecuteSuccess) {
return true, nil
}
Expand Down
24 changes: 24 additions & 0 deletions apiserver/nacosserver/v1/discover/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (

apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/polarismesh/polaris/apiserver/nacosserver/core"
"github.com/polarismesh/polaris/apiserver/nacosserver/model"
commonmodel "github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/utils"
)

Expand All @@ -45,6 +47,24 @@ func (n *DiscoverServer) handleRegister(ctx context.Context, namespace, serviceN

func (n *DiscoverServer) handleUpdate(ctx context.Context, namespace, serviceName string, ins *model.Instance) error {
specIns := model.PrepareSpecInstance(namespace, serviceName, ins)
if specIns.Id == nil || specIns.GetId().GetValue() == "" {
insId, errRsp := utils.CheckInstanceTetrad(specIns)
if errRsp != nil {
return &model.NacosError{
ErrCode: int32(model.ExceptionCode_ServerError),
ErrMsg: errRsp.GetInfo().GetValue(),
}
}
specIns.Id = wrapperspb.String(insId)
}
saveIns, err := n.discoverSvr.Cache().GetStore().GetInstance(specIns.GetId().GetValue())
if err != nil {
return &model.NacosError{
ErrCode: int32(model.ExceptionCode_ServerError),
ErrMsg: err.Error(),
}
}
specIns = mergeUpdateInstanceInfo(specIns, saveIns)
resp := n.discoverSvr.UpdateInstance(ctx, specIns)
if apimodel.Code(resp.GetCode().GetValue()) != apimodel.Code_ExecuteSuccess {
return &model.NacosError{
Expand Down Expand Up @@ -148,3 +168,7 @@ func (n *DiscoverServer) handleQueryInstances(ctx context.Context, params map[st
result.Namespace = model.ToNacosNamespace(namespace)
return result, nil
}

func mergeUpdateInstanceInfo(req *apiservice.Instance, saveVal *commonmodel.Instance) *apiservice.Instance {
return req
}
17 changes: 3 additions & 14 deletions apiserver/nacosserver/v2/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var (
)

func (h *NacosV2Server) Request(ctx context.Context, payload *nacospb.Payload) (*nacospb.Payload, error) {
ctx = h.ConvertContext(ctx)
h.connectionManager.RefreshClient(ctx)
ctx = injectPayloadHeader(ctx, payload)
handle, val, err := h.UnmarshalPayload(payload)
Expand All @@ -64,20 +65,8 @@ func (h *NacosV2Server) Request(ctx context.Context, payload *nacospb.Payload) (
if !ok {
return nil, ErrorInvalidRequestBodyType
}

if _, ok := debugLevel[msg.GetRequestType()]; !ok {
nacoslog.Info("[NACOS-V2] handler client request", zap.String("conn-id", remote.ValueConnID(ctx)),
utils.ZapRequestID(msg.GetRequestId()),
zap.String("type", msg.GetRequestType()),
)
} else {
if nacoslog.DebugEnabled() {
nacoslog.Debug("[NACOS-V2] handler client request", zap.String("conn-id", remote.ValueConnID(ctx)),
utils.ZapRequestID(msg.GetRequestId()),
zap.String("type", msg.GetRequestType()),
)
}
}
nacoslog.Debug("[NACOS-V2] handler client request", zap.String("conn-id", remote.ValueConnID(ctx)),
utils.ZapRequestID(msg.GetRequestId()), zap.String("type", msg.GetRequestType()))
connMeta := remote.ValueConnMeta(ctx)

startTime := time.Now()
Expand Down
24 changes: 21 additions & 3 deletions apiserver/nacosserver/v2/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,25 @@ func (h *ConfigServer) handlePublishConfigRequest(ctx context.Context, req nacos
return nil, remote.ErrorInvalidRequestBodyType
}

resp := h.configSvr.CasUpsertAndReleaseConfigFileFromClient(ctx, configReq.ToSpec())
var resp *config_manage.ConfigResponse
startTime := commontime.CurrentMillisecond()
defer func() {
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
Action: nacosmodel.ActionGrpcPublishConfigFile,
ClientIP: meta.ConnectionID,
Namespace: configReq.Tenant,
Resource: metrics.ResourceOfConfigFile(configReq.Group, configReq.DataId),
Timestamp: startTime,
CostTime: commontime.CurrentMillisecond() - startTime,
Success: resp.GetCode().GetValue() == uint32(apimodel.Code_ExecuteSuccess),
})
}()

if configReq.CasMd5 != "" {
resp = h.configSvr.CasUpsertAndReleaseConfigFileFromClient(ctx, configReq.ToSpec())
} else {
resp = h.configSvr.UpsertAndReleaseConfigFileFromClient(ctx, configReq.ToSpec())
}
if resp.GetCode().GetValue() != uint32(apimodel.Code_ExecuteSuccess) {
nacoslog.Error("[NACOS-V2][Config] publish config file fail", zap.String("tenant", configReq.Tenant),
utils.ZapGroup(configReq.Group), utils.ZapFileName(configReq.DataId),
Expand Down Expand Up @@ -83,8 +101,8 @@ func (h *ConfigServer) handleGetConfigRequest(ctx context.Context, req nacospb.B
startTime := commontime.CurrentMillisecond()
defer func() {
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
Action: nacosmodel.ActionGetConfigFile,
ClientIP: utils.ParseClientAddress(ctx),
Action: nacosmodel.ActionGrpcGetConfigFile,
ClientIP: meta.ConnectionID,
Namespace: configReq.Tenant,
Resource: metrics.ResourceOfConfigFile(configReq.Group, configReq.DataId),
Timestamp: startTime,
Expand Down
23 changes: 23 additions & 0 deletions apiserver/nacosserver/v2/config/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
nacospb "github.com/polarismesh/polaris/apiserver/nacosserver/v2/pb"
"github.com/polarismesh/polaris/apiserver/nacosserver/v2/remote"
"github.com/polarismesh/polaris/common/eventhub"
"github.com/polarismesh/polaris/common/metrics"
"github.com/polarismesh/polaris/common/model"
commontime "github.com/polarismesh/polaris/common/time"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/config"
"github.com/polarismesh/polaris/plugin"
)

type ConnectionClientManager struct {
Expand Down Expand Up @@ -99,6 +102,9 @@ func (c *StreamWatchContext) ClientID() string {

// ShouldNotify .
func (c *StreamWatchContext) ShouldNotify(event *model.SimpleConfigFileRelease) bool {
if event.ReleaseType == model.ReleaseTypeGray && !c.betaMatcher(c.ClientLabels(), event) {
return false
}
key := event.FileKey()
watchFile, ok := c.watchConfigFiles.Load(key)
if !ok {
Expand All @@ -108,6 +114,7 @@ func (c *StreamWatchContext) ShouldNotify(event *model.SimpleConfigFileRelease)
if !event.Valid {
return true
}
nacoslog.Info("should notify", zap.String("client", c.ClientID()), zap.String("save-md5", watchFile.GetMd5().GetValue()), zap.String("recv-md5", event.Md5))

Check failure on line 117 in apiserver/nacosserver/v2/config/watch.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.21.5)

line is 157 characters (lll)
isChange := watchFile.GetMd5().GetValue() != event.Md5
return isChange
}
Expand Down Expand Up @@ -142,6 +149,21 @@ func (c *StreamWatchContext) Reply(event *apiconfig.ConfigClientResponse) {
notifyRequest.Group = viewConfig.GetGroup().GetValue()
notifyRequest.DataId = viewConfig.GetFileName().GetValue()

success := false
startTime := commontime.CurrentMillisecond()
defer func() {
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
Action: nacosmodel.ActionGrpcPushConfigFile,
ClientIP: c.ClientID(),
Namespace: notifyRequest.Tenant,
Resource: metrics.ResourceOfConfigFile(notifyRequest.Group, notifyRequest.DataId),
Timestamp: startTime,
CostTime: commontime.CurrentMillisecond() - startTime,
Revision: viewConfig.GetMd5().GetValue(),
Success: success,
})
}()

remoteClient, ok := c.connMgr.GetClient(c.clientId)
if !ok {
nacoslog.Error("[NACOS-V2][Config][Push] send ConfigChangeNotifyRequest not found remoteClient",
Expand All @@ -164,4 +186,5 @@ func (c *StreamWatchContext) Reply(event *apiconfig.ConfigClientResponse) {
nacoslog.Error("[NACOS-V2][Config][Push] send ConfigChangeNotifyRequest fail",
zap.String("clientId", c.ClientID()), zap.Error(err))
}
success = true
}
2 changes: 1 addition & 1 deletion cache/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ type (
// RateLimitCache rateLimit的cache接口
RateLimitCache interface {
Cache
// GetRateLimit 根据serviceID进行迭代回调
// IteratorRateLimit 遍历所有的限流规则
IteratorRateLimit(rateLimitIterProc RateLimitIterProc)
// GetRateLimitRules 根据serviceID获取限流数据
GetRateLimitRules(serviceKey model.ServiceKey) ([]*model.RateLimit, string)
Expand Down
4 changes: 0 additions & 4 deletions cache/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,6 @@ func (fc *fileCache) setReleases(releases []*model.ConfigFileRelease) (map[strin
}

if item.Active {
configLog.Info("[Config][Release][Cache] notify config release change",
zap.String("namespace", item.Namespace), zap.String("group", item.Group), zap.String("release", item.Name),
zap.String("file", item.FileName), zap.Uint64("version", item.Version), zap.Bool("valid", item.Valid),
zap.String("type", string(item.ReleaseType)))
fc.sendEvent(item)
}
}
Expand Down
8 changes: 6 additions & 2 deletions common/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,12 @@ type ClientDiscoverMetric struct {
}

func (c ClientDiscoverMetric) String() string {
return fmt.Sprintf("%s|%s|%s|%s|%s|%s|%d|%+v", c.ClientIP, c.Action, c.Namespace, c.Resource,
c.Revision, time.Unix(c.Timestamp, 0).Format("2006-01-02 15:04:05"), c.CostTime, c.Success)
revision := c.Revision
if revision == "" {
revision = "-"
}
return fmt.Sprintf("%s|%s|%s|%s|%s|%s|%dms|%+v", c.ClientIP, c.Action, c.Namespace, c.Resource,
revision, time.Unix(c.Timestamp/1000, 0).Format(time.DateTime), c.CostTime, c.Success)
}

type ConfigMetricType string
Expand Down
2 changes: 1 addition & 1 deletion config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (s *Server) CreateConfigFile(ctx context.Context, req *apiconfig.ConfigFile
log.Error("[Config][File] create config file commit tx.", utils.RequestID(ctx), zap.Error(err))
return api.NewConfigResponse(commonstore.StoreCode2APICode(err))
}
s.RecordHistory(ctx, configFileRecordEntry(ctx, req, model.OCreate))
resp.ConfigFile = req
return resp
}
Expand Down Expand Up @@ -88,7 +89,6 @@ func (s *Server) handleCreateConfigFile(ctx context.Context, tx store.Tx,
utils.ZapFileName(req.GetName().GetValue()), zap.Error(err))
return api.NewConfigResponse(commonstore.StoreCode2APICode(err))
}
s.RecordHistory(ctx, configFileRecordEntry(ctx, req, model.OCreate))
return api.NewConfigResponse(apimodel.Code_ExecuteSuccess)
}

Expand Down
24 changes: 24 additions & 0 deletions config/config_file_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (s *Server) handlePublishConfigFile(ctx context.Context, tx store.Tx,
req.Name = utils.NewStringValue(fmt.Sprintf("%s-%d-%d", fileName, time.Now().Unix(), s.nextSequence()))
}

fileRelease.Name = req.GetName().GetValue()
fileRelease.Format = toPublishFile.Format
fileRelease.Metadata = toPublishFile.Metadata
fileRelease.Comment = req.GetComment().GetValue()
Expand Down Expand Up @@ -591,17 +592,25 @@ func (s *Server) CasUpsertAndReleaseConfigFile(ctx context.Context,
return api.NewConfigResponse(commonstore.StoreCode2APICode(err))
}

historyRecords := []func(){}

var upsertResp *apiconfig.ConfigResponse
if saveFile == nil {
if req.GetMd5().GetValue() != "" {
return api.NewConfigResponse(apimodel.Code_DataConflict)
}
upsertResp = s.handleCreateConfigFile(ctx, tx, upsertFileReq)
historyRecords = append(historyRecords, func() {
s.RecordHistory(ctx, configFileRecordEntry(ctx, upsertFileReq, model.OCreate))
})
} else {
if req.GetMd5().GetValue() != CalMd5(saveFile.Content) {
return api.NewConfigResponse(apimodel.Code_DataConflict)
}
upsertResp = s.handleUpdateConfigFile(ctx, tx, upsertFileReq)
historyRecords = append(historyRecords, func() {
s.RecordHistory(ctx, configFileRecordEntry(ctx, upsertFileReq, model.OUpdate))
})
}
if upsertResp.GetCode().GetValue() != uint32(apimodel.Code_ExecuteSuccess) {
return upsertResp
Expand All @@ -625,6 +634,9 @@ func (s *Server) CasUpsertAndReleaseConfigFile(ctx context.Context,
log.Error("[Config][File] upsert config file when commit tx.", utils.RequestID(ctx), zap.Error(err))
return api.NewConfigResponse(commonstore.StoreCode2APICode(err))
}
for i := range historyRecords {
historyRecords[i]()
}
s.recordReleaseHistory(ctx, data, utils.ReleaseTypeNormal, utils.ReleaseStatusSuccess, "")
return releaseResp
}
Expand Down Expand Up @@ -667,9 +679,18 @@ func (s *Server) UpsertAndReleaseConfigFile(ctx context.Context,
defer func() {
_ = tx.Rollback()
}()

historyRecords := []func(){}
upsertResp := s.handleCreateConfigFile(ctx, tx, upsertFileReq)
if upsertResp.GetCode().GetValue() == uint32(apimodel.Code_ExistedResource) {
upsertResp = s.handleUpdateConfigFile(ctx, tx, upsertFileReq)
historyRecords = append(historyRecords, func() {
s.RecordHistory(ctx, configFileRecordEntry(ctx, upsertFileReq, model.OUpdate))
})
} else {
historyRecords = append(historyRecords, func() {
s.RecordHistory(ctx, configFileRecordEntry(ctx, upsertFileReq, model.OCreate))
})
}
if upsertResp.GetCode().GetValue() != uint32(apimodel.Code_ExecuteSuccess) {
return upsertResp
Expand All @@ -693,6 +714,9 @@ func (s *Server) UpsertAndReleaseConfigFile(ctx context.Context,
log.Error("[Config][File] upsert config file when commit tx.", utils.RequestID(ctx), zap.Error(err))
return api.NewConfigResponse(commonstore.StoreCode2APICode(err))
}
for i := range historyRecords {
historyRecords[i]()
}
s.recordReleaseHistory(ctx, data, utils.ReleaseTypeNormal, utils.ReleaseStatusSuccess, "")
return releaseResp
}
Expand Down
12 changes: 6 additions & 6 deletions config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,18 @@ func Initialize(ctx context.Context, config Config, s store.Store, cacheMgr cach

func doInitialize(ctx context.Context, config Config, s store.Store, cacheMgr cachetypes.CacheManager,
namespaceOperator namespace.NamespaceOperateServer) (ConfigCenterServer, *Server, error) {
var proxySvr ConfigCenterServer
originSvr := &Server{}

if !config.Open {
originServer.initialized = true
originSvr.initialized = true
return nil, nil, nil
}

var proxySvr ConfigCenterServer
originSvr := &Server{}

if err := cacheMgr.OpenResourceCache(configCacheEntries...); err != nil {
return nil, nil, err
}
err := originServer.initialize(ctx, config, s, namespaceOperator, cacheMgr)
err := originSvr.initialize(ctx, config, s, namespaceOperator, cacheMgr)
if err != nil {
return nil, nil, err
}
Expand All @@ -127,7 +127,7 @@ func doInitialize(ctx context.Context, config Config, s store.Store, cacheMgr ca
return nil, nil, fmt.Errorf("name(%s) not exist in serverProxyFactories", order[i])
}

tmpSvr, err := factory(originServer, server)
tmpSvr, err := factory(originSvr, server)
if err != nil {
return nil, nil, err
}
Expand Down
1 change: 0 additions & 1 deletion config/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
)

func Test_Initialize(t *testing.T) {
t.SkipNow()
eventhub.InitEventHub()
ctrl := gomock.NewController(t)
mockStore := mockstore.NewMockStore(ctrl)
Expand Down
Loading

0 comments on commit 21a15e7

Please sign in to comment.