From 60ecfda0b1dadf2ff8b5ed90438c09814a9cd44e Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Thu, 28 Mar 2024 01:08:35 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E9=89=B4=E6=9D=83=E6=8E=A5=E5=8F=A3=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=20(#203)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/config/api.go | 4 +++ pkg/config/config_connector.go | 12 +++++++ pkg/config/serverconnector.go | 14 +++++++++ .../polaris/config_connector.go | 12 +++---- plugin/serverconnector/common/client.go | 10 ++++-- plugin/serverconnector/common/discover.go | 20 ++++++++++-- plugin/serverconnector/common/util.go | 31 +++++++------------ .../serverconnector/grpc/operation_async.go | 10 +++--- plugin/serverconnector/grpc/operation_sync.go | 8 ++--- 9 files changed, 83 insertions(+), 38 deletions(-) diff --git a/pkg/config/api.go b/pkg/config/api.go index f0de217c..aa88af3e 100644 --- a/pkg/config/api.go +++ b/pkg/config/api.go @@ -270,6 +270,10 @@ type ServerConnectorConfig interface { GetConnectionIdleTimeout() time.Duration // SetConnectionIdleTimeout 设置连接会被释放的空闲的时长 SetConnectionIdleTimeout(time.Duration) + // GetToken . + GetToken() string + // SetToken . + SetToken(string) } // LocalCacheConfig 本地缓存相关配置项. diff --git a/pkg/config/config_connector.go b/pkg/config/config_connector.go index e27d2076..8cb4461e 100644 --- a/pkg/config/config_connector.go +++ b/pkg/config/config_connector.go @@ -51,6 +51,8 @@ type ConfigConnectorConfigImpl struct { Plugin PluginConfigs `yaml:"plugin" json:"plugin"` + Token string `yaml:"token" json:"token"` + ConnectorType string `yaml:"connectorType" json:"connectorType"` } @@ -158,6 +160,16 @@ func (c *ConfigConnectorConfigImpl) SetConnectorType(connectorType string) { c.ConnectorType = connectorType } +// GetToken . +func (c *ConfigConnectorConfigImpl) GetToken() string { + return c.Token +} + +// SetToken . +func (c *ConfigConnectorConfigImpl) SetToken(token string) { + c.Token = token +} + // Verify 检验ConfigConnector配置. func (c *ConfigConnectorConfigImpl) Verify() error { if nil == c { diff --git a/pkg/config/serverconnector.go b/pkg/config/serverconnector.go index d06dc9a6..f51f5906 100644 --- a/pkg/config/serverconnector.go +++ b/pkg/config/serverconnector.go @@ -49,6 +49,8 @@ type ServerConnectorConfigImpl struct { ReconnectInterval *time.Duration `yaml:"reconnectInterval" json:"reconnectInterval"` Plugin PluginConfigs `yaml:"plugin" json:"plugin"` + + Token string `yaml:"token" json:"token"` } // GetAddresses global.serverConnector.addresses @@ -152,6 +154,18 @@ func (s *ServerConnectorConfigImpl) SetPluginConfig(pluginName string, value Bas return s.Plugin.SetPluginConfig(common.TypeServerConnector, pluginName, value) } +// GetProtocol global.serverConnector.protocol +// 与cl5 server对接的协议. +func (s *ServerConnectorConfigImpl) GetToken() string { + return s.Token +} + +// SetProtocol 设置与cl5 server对接的协议. +func (s *ServerConnectorConfigImpl) SetToken(t string) { + s.Token = t +} + + // Verify 检验ServerConnector配置. func (s *ServerConnectorConfigImpl) Verify() error { if nil == s { diff --git a/plugin/configconnector/polaris/config_connector.go b/plugin/configconnector/polaris/config_connector.go index 8f7272d0..479abc10 100644 --- a/plugin/configconnector/polaris/config_connector.go +++ b/plugin/configconnector/polaris/config_connector.go @@ -121,7 +121,7 @@ func (c *Connector) GetConfigFile(configFile *configconnector.ConfigFile) (*conf defer conn.Release(opKey) configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn)) reqID := connector.NextRegisterInstanceReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } @@ -153,7 +153,7 @@ func (c *Connector) WatchConfigFiles(configFileList []*configconnector.ConfigFil defer conn.Release(opKey) configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn)) reqID := connector.NextWatchConfigFilesReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } @@ -190,7 +190,7 @@ func (c *Connector) CreateConfigFile(configFile *configconnector.ConfigFile) (*c defer conn.Release(opKey) configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn)) reqID := connector.NextCreateConfigFileReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } @@ -222,7 +222,7 @@ func (c *Connector) UpdateConfigFile(configFile *configconnector.ConfigFile) (*c defer conn.Release(opKey) configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn)) reqID := connector.NextUpdateConfigFileReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } @@ -254,7 +254,7 @@ func (c *Connector) PublishConfigFile(configFile *configconnector.ConfigFile) (* defer conn.Release(opKey) configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn)) reqID := connector.NextPublishConfigFileReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } @@ -283,7 +283,7 @@ func (c *Connector) GetConfigGroup(req *configconnector.ConfigGroup) (*configcon } reqID := connector.NextPublishConfigFileReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } diff --git a/plugin/serverconnector/common/client.go b/plugin/serverconnector/common/client.go index 8111442f..c830361d 100644 --- a/plugin/serverconnector/common/client.go +++ b/plugin/serverconnector/common/client.go @@ -36,6 +36,12 @@ type DiscoverClient interface { CloseSend() error } +type DiscoverClientCreatorArgs struct { + ReqId string + AuthToken string + Connection *network.Connection + Timeout time.Duration +} + // DiscoverClientCreator 创建client的函数 -type DiscoverClientCreator func( - reqId string, connection *network.Connection, timeout time.Duration) (DiscoverClient, context.CancelFunc, error) +type DiscoverClientCreator func(args *DiscoverClientCreatorArgs) (DiscoverClient, context.CancelFunc, error) diff --git a/plugin/serverconnector/common/discover.go b/plugin/serverconnector/common/discover.go index e323b81b..1b42d7d8 100644 --- a/plugin/serverconnector/common/discover.go +++ b/plugin/serverconnector/common/discover.go @@ -44,6 +44,8 @@ import ( const ( // 需要发往服务端的请求跟踪标识 headerRequestID = "request-id" + // + headerAuthToken = "X-Polaris-Token" // 失败时的最大超时时间 maxConnTimeout = 100 * time.Millisecond // 任务重试间隔 @@ -68,6 +70,8 @@ type DiscoverConnector struct { ServiceConnector *plugin.PluginBase connectionIdleTimeout time.Duration messageTimeout time.Duration + // authToken + authToken string // 普通任务队列 taskChannel chan *clientTask // 高优先级重试任务队列,只会在系统服务未ready时候会往队列塞值 @@ -100,6 +104,7 @@ type clientTask struct { // Init 初始化插件 func (g *DiscoverConnector) Init(ctx *plugin.InitContext, createClient DiscoverClientCreator) { ctxConfig := ctx.Config + g.authToken = ctxConfig.GetGlobal().GetServerConnector().GetToken() g.RunContext = common.NewRunContext() g.scalableRand = rand.NewScalableRand() g.discoverKey.Namespace = ctxConfig.GetGlobal().GetSystem().GetDiscoverCluster().GetNamespace() @@ -587,8 +592,12 @@ func (g *DiscoverConnector) newStream(task *serviceUpdateTask) (streamingClient goto finally } streamingClient.reqID = NextDiscoverReqID() - streamingClient.discoverClient, streamingClient.cancel, err = g.createClient(streamingClient.reqID, - streamingClient.connection, 0) + streamingClient.discoverClient, streamingClient.cancel, err = g.createClient(&DiscoverClientCreatorArgs{ + ReqId: streamingClient.reqID, + Connection: streamingClient.connection, + Timeout: 0, + AuthToken: g.authToken, + }) if err != nil { log.GetNetworkLogger().Errorf("%s, newStream: fail to get streaming client from %s, reqID %s, err %v", g.ServiceConnector.GetSDKContextID(), streamingClient.connection, streamingClient.reqID, err) @@ -953,7 +962,12 @@ func (g *DiscoverConnector) syncUpdateTask(task *serviceUpdateTask) error { } defer connection.Release(OpKeyDiscover) reqID := NextDiscoverReqID() - discoverClient, cancel, err := g.createClient(reqID, connection, g.messageTimeout) + discoverClient, cancel, err := g.createClient(&DiscoverClientCreatorArgs{ + ReqId: reqID, + Connection: connection, + Timeout: g.messageTimeout, + AuthToken: g.authToken, + }) if cancel != nil { defer cancel() } diff --git a/plugin/serverconnector/common/util.go b/plugin/serverconnector/common/util.go index 63f526de..eb8b3d2d 100644 --- a/plugin/serverconnector/common/util.go +++ b/plugin/serverconnector/common/util.go @@ -174,8 +174,12 @@ func GetUpdateTaskRequestTime(updateTask *serviceUpdateTask) time.Duration { // return metadata.NewOutgoingContext(ctx, md) // } -// CreateHeaderContext 创建传输grpc头的valueContext -func CreateHeaderContext(timeout time.Duration, headers map[string]string) (context.Context, context.CancelFunc) { +func CreateHeadersContext(timeout time.Duration, options ...func(map[string]string)) (context.Context, context.CancelFunc) { + headers := map[string]string{} + for _, option := range options { + option(headers) + } + md := metadata.New(headers) var ctx context.Context var cancel context.CancelFunc @@ -188,25 +192,14 @@ func CreateHeaderContext(timeout time.Duration, headers map[string]string) (cont return metadata.NewOutgoingContext(ctx, md), cancel } -// CreateHeaderContextWithReqId 创建传输grpc头的valueContext -func CreateHeaderContextWithReqId(timeout time.Duration, reqID string) (context.Context, context.CancelFunc) { - md := metadata.New(map[string]string{headerRequestID: reqID}) - var ctx context.Context - var cancel context.CancelFunc - if timeout > 0 { - ctx, cancel = context.WithTimeout(context.Background(), timeout) - } else { - ctx = context.Background() - cancel = nil +func AppendAuthHeader(token string) func(map[string]string) { + return func(header map[string]string) { + header[headerAuthToken] = token } - return metadata.NewOutgoingContext(ctx, md), cancel } -func AppendHeaderWithReqId(header map[string]string, reqID string) map[string]string { - m := make(map[string]string, len(header)+1) - for k, v := range header { - m[k] = v +func AppendHeaderWithReqId(reqID string) func(map[string]string) { + return func(header map[string]string) { + header[headerRequestID] = reqID } - m[headerRequestID] = reqID - return m } diff --git a/plugin/serverconnector/grpc/operation_async.go b/plugin/serverconnector/grpc/operation_async.go index 7e76ab8c..b9127135 100644 --- a/plugin/serverconnector/grpc/operation_async.go +++ b/plugin/serverconnector/grpc/operation_async.go @@ -98,11 +98,13 @@ func (g *Connector) GetConnectionManager() network.ConnectionManager { } // 创建服务发现客户端 -func (g *Connector) createDiscoverClient(reqID string, - connection *network.Connection, timeout time.Duration) (connector.DiscoverClient, context.CancelFunc, error) { +func (g *Connector) createDiscoverClient(args *connector.DiscoverClientCreatorArgs) (connector.DiscoverClient, context.CancelFunc, error) { // 创建namingClient对象 - client := apiservice.NewPolarisGRPCClient(network.ToGRPCConn(connection.Conn)) - outgoingCtx, cancel := connector.CreateHeaderContextWithReqId(timeout, reqID) + client := apiservice.NewPolarisGRPCClient(network.ToGRPCConn(args.Connection.Conn)) + outgoingCtx, cancel := connector.CreateHeadersContext(args.Timeout, + connector.AppendAuthHeader(args.AuthToken), + connector.AppendHeaderWithReqId(args.ReqId)) + discoverClient, err := client.Discover(outgoingCtx) return discoverClient, cancel, err } diff --git a/plugin/serverconnector/grpc/operation_sync.go b/plugin/serverconnector/grpc/operation_sync.go index 4199ec5d..c14f1dbe 100644 --- a/plugin/serverconnector/grpc/operation_sync.go +++ b/plugin/serverconnector/grpc/operation_sync.go @@ -56,7 +56,7 @@ func (g *Connector) RegisterInstance(req *model.InstanceRegisterRequest, header var ( namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) reqID = connector.NextRegisterInstanceReqID() - ctx, cancel = connector.CreateHeaderContext(*req.Timeout, connector.AppendHeaderWithReqId(header, reqID)) + ctx, cancel = connector.CreateHeadersContext(*req.Timeout, connector.AppendHeaderWithReqId(reqID)) ) if cancel != nil { @@ -120,7 +120,7 @@ func (g *Connector) DeregisterInstance(req *model.InstanceDeRegisterRequest) err var ( namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) reqID = connector.NextDeRegisterInstanceReqID() - ctx, cancel = connector.CreateHeaderContextWithReqId(*req.Timeout, reqID) + ctx, cancel = connector.CreateHeadersContext(*req.Timeout, connector.AppendHeaderWithReqId(reqID)) ) if cancel != nil { defer cancel() @@ -181,7 +181,7 @@ func (g *Connector) Heartbeat(req *model.InstanceHeartbeatRequest) error { var ( namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) reqID = connector.NextHeartbeatReqID() - ctx, cancel = connector.CreateHeaderContextWithReqId(*req.Timeout, reqID) + ctx, cancel = connector.CreateHeadersContext(*req.Timeout, connector.AppendHeaderWithReqId(reqID)) ) if cancel != nil { defer cancel() @@ -269,7 +269,7 @@ func (g *Connector) ReportClient(req *model.ReportClientRequest) (*model.ReportC var ( namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) reqID = connector.NextReportClientReqID() - ctx, cancel = connector.CreateHeaderContextWithReqId(req.Timeout, reqID) + ctx, cancel = connector.CreateHeadersContext(req.Timeout, connector.AppendHeaderWithReqId(reqID)) ) if cancel != nil { defer cancel()