Skip to content

Commit

Permalink
Merge pull request #1589 from Mulavar/3.0
Browse files Browse the repository at this point in the history
refactor listenDirEvent
  • Loading branch information
justxuewei authored Nov 19, 2021
2 parents 2ae0b3e + 4d780a9 commit a812919
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 43 deletions.
12 changes: 9 additions & 3 deletions config_center/zookeeper/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

import (
"github.com/dubbogo/go-zookeeper/zk"
gxset "github.com/dubbogo/gost/container/set"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"

Expand Down Expand Up @@ -80,15 +81,20 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
logger.Errorf("zookeeper client start error ,error message is %v", err)
return nil, err
}
err = c.client.Create(c.rootPath)
if err != nil && err != zk.ErrNodeExists {
return nil, err
}

// Before handle client restart, we need to ensure that the zk dynamic configuration successfully start and create the configuration directory
c.wg.Add(1)
go zookeeper.HandleClientRestart(c)

// Start listener
c.listener = zookeeper.NewZkEventListener(c.client)
c.cacheListener = NewCacheListener(c.rootPath)

err = c.client.Create(c.rootPath)
c.listener.ListenServiceEvent(url, c.rootPath, c.cacheListener)
return c, err
return c, nil
}

func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/apache/dubbo-go-hessian2 v1.9.5
github.com/creasty/defaults v1.5.2
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.19
github.com/dubbogo/gost v1.11.20-0.20211116110728-26777ca61b4a
github.com/dubbogo/grpc-go v1.42.5-triple
github.com/dubbogo/triple v1.1.3
github.com/emicklei/go-restful/v3 v3.7.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4D
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.18/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.19 h1:R1rZ3TNJKV9W5XHLMv+GDO2Wy6UDnwGQtVWbsWYvo0A=
github.com/dubbogo/gost v1.11.19/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.20-0.20211116110728-26777ca61b4a h1:RLUCy1Rftro4EmUmqWQCdofwgo9mzPbrZ6d6xWgZNwo=
github.com/dubbogo/gost v1.11.20-0.20211116110728-26777ca61b4a/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/grpc-go v1.42.5-triple h1:Ed5z/ikkpdZHBMA4mTEthQFTQeKlHtkdAsQrZjTbFk8=
github.com/dubbogo/grpc-go v1.42.5-triple/go.mod h1:F1T9hnUvYGW4JLK1QNriavpOkhusU677ovPzLkk6zHM=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
Expand Down
11 changes: 8 additions & 3 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func (r *zkRegistry) InitListeners() {

// CreatePath creates the path in the registry center of zookeeper
func (r *zkRegistry) CreatePath(path string) error {
return r.ZkClient().Create(path)
err := r.ZkClient().Create(path)
if err != nil && err != zk.ErrNodeExists {
return err
}
return nil
}

// DoRegister actually do the register job in the registry center of zookeeper
Expand Down Expand Up @@ -218,17 +222,18 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
}
logger.Infof("[Zookeeper Registry] Registry instance with root = %s, node = %s", root, node)
err = r.client.Create(root)
if err != nil {
if err != nil && err != zk.ErrNodeExists {
logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err))
return perrors.WithStack(err)
}

// try to register the node
// Try to register the node
zkPath, err = r.client.RegisterTemp(root, node)
if err == nil {
return nil
}

// Maybe the node did exist, then we need to delete it first and recreate it
if perrors.Cause(err) == zk.ErrNodeExists {
if err = r.client.Delete(zkPath); err == nil {
_, err = r.client.RegisterTemp(root, node)
Expand Down
48 changes: 14 additions & 34 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,52 +228,32 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
if err == nil {
ttl = timeout
} else {
logger.Warnf("wrong configuration for registry ttl, error:=%+v, using default value %v instead", err, defaultTTL)
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
}
}
defer close(event)
for {
// get current children for a zkPath
// Get current children with watcher for the zkRootPath
children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Debugf("listenDirEvent(path{%s}) = error{%v}", zkRootPath, err)
// clear the event channel
CLEAR:
for {
select {
case <-event:
default:
break CLEAR
}
}
l.client.RegisterEvent(zkRootPath, &event)
if err == errNilNode {
logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkRootPath)
l.client.UnregisterEvent(zkRootPath, &event)
return
}
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", zkRootPath, err)

// May be the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait
after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
case <-after:
l.client.UnregisterEvent(zkRootPath, &event)
continue
case <-l.exit:
l.client.UnregisterEvent(zkRootPath, &event)
logger.Debugf("listen(path{%s}) goroutine exit now...", zkRootPath)
return
case <-event:
logger.Debugf("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkRootPath, &event)
l.handleZkNodeEvent(zkRootPath, nil, listener)
continue
}
}
failTimes = 0
if len(children) == 0 {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.")
continue
}
for _, c := range children {
// Only need to compare Path when subscribing to provider
if strings.LastIndex(zkRootPath, constant.ProviderCategory) != -1 {
Expand All @@ -283,7 +263,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
}

// listen l service node
// Build the children path
zkNodePath := path.Join(zkRootPath, c)

// Save the path to avoid listen repeatedly
Expand All @@ -294,7 +274,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkNodePath)
logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkNodePath)
continue
}

Expand All @@ -308,13 +288,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li

l.client.RUnlock()
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkNodePath, perrors.WithStack(err))
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get content of the child node {%v} failed, the error is %+v", zkNodePath, perrors.WithStack(err))
}
logger.Debugf("Get children!{%s}", zkNodePath)
logger.Debugf("[Zookeeper EventListener][listenDirEvent] Get children!{%s}", zkNodePath)
if !listener.DataChange(remoting.Event{Path: zkNodePath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("[Zookeeper Listener] listen dubbo service key{%s}", zkNodePath)
logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo service key{%s}", zkNodePath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
// invoker l.wg.Done() in l.listenServiceNodeEvent
Expand Down Expand Up @@ -366,7 +346,7 @@ func (l *ZkEventListener) startScheduleWatchTask(
ticker = time.NewTicker(tickerTTL)
}
case zkEvent := <-childEventCh:
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
logger.Debugf("Get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type == zk.EventNodeChildrenChanged {
Expand Down

0 comments on commit a812919

Please sign in to comment.