From 63f4c7b8aab0acfa9c27d63a82dea15c3ae9d2a3 Mon Sep 17 00:00:00 2001 From: "wang.dongyun" <675069317@qq.com> Date: Fri, 3 Feb 2023 19:01:33 +0800 Subject: [PATCH] =?UTF-8?q?[conf]=20=E9=85=8D=E7=BD=AE=E4=B8=AD=E5=BF=83?= =?UTF-8?q?=E5=85=B3=E4=BA=8E=E6=9C=AC=E5=9C=B0=E6=96=87=E4=BB=B6=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E9=97=AE=E9=A2=98=20(#565)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 远程nacos故障,降级本地 Co-authored-by: jeff.wang --- clients/config_client/config_client.go | 11 ++++++++++- clients/config_client/config_proxy.go | 15 ++++++++++++--- common/constant/client_config_options.go | 7 +++++++ common/constant/config.go | 1 + common/constant/const.go | 1 + common/nacos_server/nacos_server.go | 2 +- common/security/security_proxy.go | 13 +++++++++++-- 7 files changed, 43 insertions(+), 7 deletions(-) diff --git a/clients/config_client/config_client.go b/clients/config_client/config_client.go index 81267c9f..c514989b 100644 --- a/clients/config_client/config_client.go +++ b/clients/config_client/config_client.go @@ -212,6 +212,10 @@ func (client *ConfigClient) getConfigInner(param vo.ConfigParam) (content string clientConfig.TimeoutMs, false, client) if err != nil { logger.Infof("get config from server error:%+v ", err) + if clientConfig, err := client.GetClientConfig(); err == nil && clientConfig.DisableUseSnapShot { + logger.Errorf("get config from cache error:%+v ", err) + return "", errors.New("get config from remote nacos server fail, and is not allowed to read local file") + } content, err = cache.ReadConfigFromFile(cacheKey, client.configCacheDir) if err != nil { logger.Errorf("get config from cache error:%+v ", err) @@ -423,7 +427,12 @@ func (client *ConfigClient) executeConfigListen() { logger.Warnf("ConfigBatchListenRequest failure,err:%+v", err) continue } - if iResponse == nil && !iResponse.IsSuccess() { + if iResponse == nil { + logger.Warnf("ConfigBatchListenRequest failure, response is nil") + continue + } + if !iResponse.IsSuccess() { + logger.Warnf("ConfigBatchListenRequest failure, error code:%+v", iResponse.GetErrorCode()) continue } changeKeys := make(map[string]struct{}) diff --git a/clients/config_client/config_proxy.go b/clients/config_client/config_proxy.go index 7dbe5e54..8036c2bf 100644 --- a/clients/config_client/config_proxy.go +++ b/clients/config_client/config_proxy.go @@ -152,11 +152,20 @@ func (cp *ConfigProxy) queryConfig(dataId, group, tenant string, timeout uint64, return response, nil } +func appName(client *ConfigClient) string { + if clientConfig, err := client.GetClientConfig(); err == nil { + appName := clientConfig.AppName + return appName + } + return "unknown" +} + func (cp *ConfigProxy) createRpcClient(ctx context.Context, taskId string, client *ConfigClient) *rpc.RpcClient { labels := map[string]string{ - constant.LABEL_SOURCE: constant.LABEL_SOURCE_SDK, - constant.LABEL_MODULE: constant.LABEL_MODULE_CONFIG, - "taskId": taskId, + constant.LABEL_SOURCE: constant.LABEL_SOURCE_SDK, + constant.LABEL_MODULE: constant.LABEL_MODULE_CONFIG, + constant.APPNAME_HEADER: appName(client), + "taskId": taskId, } iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer) diff --git a/common/constant/client_config_options.go b/common/constant/client_config_options.go index 94089057..82b8b8d3 100644 --- a/common/constant/client_config_options.go +++ b/common/constant/client_config_options.go @@ -116,6 +116,13 @@ func WithCacheDir(cacheDir string) ClientOption { } } +// WithDisableUseSnapShot ... +func WithDisableUseSnapShot(disableUseSnapShot bool) ClientOption { + return func(config *ClientConfig) { + config.DisableUseSnapShot = disableUseSnapShot + } +} + // WithUpdateThreadNum ... func WithUpdateThreadNum(updateThreadNum int) ClientOption { return func(config *ClientConfig) { diff --git a/common/constant/config.go b/common/constant/config.go index 23768477..39460cee 100644 --- a/common/constant/config.go +++ b/common/constant/config.go @@ -39,6 +39,7 @@ type ClientConfig struct { SecretKey string // the SecretKey for kms OpenKMS bool // it's to open kms,default is false. https://help.aliyun.com/product/28933.html CacheDir string // the directory for persist nacos service info,default value is current path + DisableUseSnapShot bool // It's a switch, default is false, means that when get remote config fail, use local cache file instead UpdateThreadNum int // the number of goroutine for update nacos service info,default value is 20 NotLoadCacheAtStart bool // not to load persistent nacos service info in CacheDir at start time UpdateCacheWhenEmpty bool // update cache when get empty service instance from server diff --git a/common/constant/const.go b/common/constant/const.go index bd6bfa93..f5e92fcc 100644 --- a/common/constant/const.go +++ b/common/constant/const.go @@ -89,6 +89,7 @@ const ( DEFAULT_TIMEOUT_MILLS = 3000 ALL_SYNC_INTERNAL = 5 * time.Minute CLIENT_APPNAME_HEADER = "Client-AppName" + APPNAME_HEADER = "AppName" CLIENT_REQUEST_TS_HEADER = "Client-RequestTS" CLIENT_REQUEST_TOKEN_HEADER = "Client-RequestToken" EX_CONFIG_INFO = "exConfigInfo" diff --git a/common/nacos_server/nacos_server.go b/common/nacos_server/nacos_server.go index 1ef35288..6ea8393e 100644 --- a/common/nacos_server/nacos_server.go +++ b/common/nacos_server/nacos_server.go @@ -87,7 +87,7 @@ func NewNacosServer(ctx context.Context, serverList []constant.ServerConfig, cli _, err := securityLogin.Login() if err != nil { - return &ns, err + logger.Errorf("login in err:%v", err) } securityLogin.AutoRefresh(ctx) diff --git a/common/security/security_proxy.go b/common/security/security_proxy.go index dee22c39..39cb807a 100644 --- a/common/security/security_proxy.go +++ b/common/security/security_proxy.go @@ -75,7 +75,12 @@ func (ac *AuthClient) AutoRefresh(ctx context.Context) { } go func() { - timer := time.NewTimer(time.Second * time.Duration(ac.tokenTtl-ac.tokenRefreshWindow)) + var timer *time.Timer + if lastLoginSuccess := ac.lastRefreshTime > 0 && ac.tokenTtl > 0 && ac.tokenRefreshWindow > 0; lastLoginSuccess { + timer = time.NewTimer(time.Second * time.Duration(ac.tokenTtl-ac.tokenRefreshWindow)) + } else { + timer = time.NewTimer(time.Second * time.Duration(5)) + } defer timer.Stop() for { select { @@ -83,8 +88,11 @@ func (ac *AuthClient) AutoRefresh(ctx context.Context) { _, err := ac.Login() if err != nil { logger.Errorf("login has error %+v", err) + timer.Reset(time.Second * time.Duration(5)) + } else { + logger.Infof("login success, tokenTtl: %+v seconds, tokenRefreshWindow: %+v seconds", ac.tokenTtl, ac.tokenRefreshWindow) + timer.Reset(time.Second * time.Duration(ac.tokenTtl-ac.tokenRefreshWindow)) } - timer.Reset(time.Second * time.Duration(ac.tokenTtl-ac.tokenRefreshWindow)) case <-ctx.Done(): return } @@ -156,6 +164,7 @@ func (ac *AuthClient) login(server constant.ServerConfig) (bool, error) { if val, ok := result[constant.KEY_ACCESS_TOKEN]; ok { ac.accessToken.Store(val) + ac.lastRefreshTime = time.Now().Unix() ac.tokenTtl = int64(result[constant.KEY_TOKEN_TTL].(float64)) ac.tokenRefreshWindow = ac.tokenTtl / 10 }