Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

优化longPulling err时delay、防止日志瞬间写满 #92

Merged
merged 1 commit into from
Jul 31, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions clients/config_client/config_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ type ConfigClient struct {
configCacheDir string
}

const perTaskConfigSize = 3000
const (
perTaskConfigSize = 3000
executorErrDelay = 20 * time.Second
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

20秒太长停顿时间了,5秒合适点

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以的

)

var (
currentTaskCount int
Expand Down Expand Up @@ -293,25 +296,27 @@ func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) {
//Delay Scheduler
//initialDelay the time to delay first execution
//delay the delay between the termination of one execution and the commencement of the next
func delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func()) {
func delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func() (err error)) {
for {
if v, ok := schedulerMap.Get(taskId); ok {
if !v.(bool) {
return
}
}
<-t.C
execute()
t.Reset(delay)
d := delay
if err := execute(); err != nil {
d = executorErrDelay
}
t.Reset(d)
}
}

//Listen for the configuration executor
func listenConfigExecutor() func() {
return func() {
func listenConfigExecutor() func() (err error) {
return func() (err error) {
listenerSize := len(cacheMap.Keys())
taskCount := int(math.Ceil(float64(listenerSize) / float64(perTaskConfigSize)))

if taskCount > currentTaskCount {
for i := currentTaskCount; i < taskCount; i++ {
schedulerMap.Set(strconv.Itoa(i), true)
Expand All @@ -326,14 +331,16 @@ func listenConfigExecutor() func() {
}
currentTaskCount = taskCount
}
return
}
}

//Long polling listening configuration
func longPulling(taskId int) func() {
return func() {
func longPulling(taskId int) func() (err error) {
return func() (err error) {
var listeningConfigs string
var client *ConfigClient
var clientConfig constant.ClientConfig
initializationList := make([]cacheData, 0)
for _, key := range cacheMap.Keys() {
if value, ok := cacheMap.Get(key); ok {
Expand All @@ -354,7 +361,7 @@ func longPulling(taskId int) func() {
}
}
if len(listeningConfigs) > 0 {
clientConfig, err := client.GetClientConfig()
clientConfig, err = client.GetClientConfig()
if err != nil {
log.Printf("[checkConfigInfo.GetClientConfig] err: %+v", err)
return
Expand All @@ -364,7 +371,8 @@ func longPulling(taskId int) func() {
params[constant.KEY_LISTEN_CONFIGS] = listeningConfigs

var changed string
changedTmp, err := client.configProxy.ListenConfig(params, len(initializationList) > 0, clientConfig.AccessKey, clientConfig.SecretKey)
var changedTmp string
changedTmp, err = client.configProxy.ListenConfig(params, len(initializationList) > 0, clientConfig.AccessKey, clientConfig.SecretKey)
if err == nil {
changed = changedTmp
} else {
Expand All @@ -382,26 +390,26 @@ func longPulling(taskId int) func() {
log.Println("[client.ListenConfig] no change")
} else {
log.Print("[client.ListenConfig] config changed:" + changed)
client.callListener(changed, clientConfig.NamespaceId)
err = client.callListener(changed, clientConfig.NamespaceId)
}
}

return err
}

}

//Execute the Listener callback func()
func (client *ConfigClient) callListener(changed, tenant string) {
func (client *ConfigClient) callListener(changed, tenant string) (err error) {
changedConfigs := strings.Split(changed, "%01")
for _, config := range changedConfigs {
attrs := strings.Split(config, "%02")
if len(attrs) >= 2 {
if value, ok := cacheMap.Get(utils.GetConfigCacheKey(attrs[0], attrs[1], tenant)); ok {
cData := value.(cacheData)
if content, err := client.getConfigInner(vo.ConfigParam{
if content, err2 := client.getConfigInner(vo.ConfigParam{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么要使用err2这个变量?我觉得不需要引入这个变量

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client.getConfigInner如果出错(比如group或dataid错误),日志还是会大量报错(config_client.go 429行)

DataId: cData.dataId,
Group: cData.group,
}); err == nil {
}); err2 == nil {
cData.content = content
cData.md5 = util.Md5(content)
if cData.md5 != cData.cacheDataListener.lastMd5 {
Expand All @@ -410,11 +418,13 @@ func (client *ConfigClient) callListener(changed, tenant string) {
cacheMap.Set(utils.GetConfigCacheKey(cData.dataId, cData.group, tenant), cData)
}
} else {
err = err2
log.Printf("[client.getConfigInner] DataId:[%s] Group:[%s] Error:[%+v]", cData.dataId, cData.group, err)
}
}
}
}
return err
}

func (client *ConfigClient) buildBasePath(serverConfig constant.ServerConfig) (basePath string) {
Expand Down