Skip to content

Commit

Permalink
Fix: issue apache#951 etcd exit panic
Browse files Browse the repository at this point in the history
  • Loading branch information
watermelo committed Jan 23, 2021
1 parent 9a666eb commit 3907104
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 20 deletions.
11 changes: 8 additions & 3 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package etcdv3

import (
"strings"
"sync"
)

import (
Expand Down Expand Up @@ -79,8 +80,9 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
}

type configurationListener struct {
registry *etcdV3Registry
events chan *config_center.ConfigChangeEvent
registry *etcdV3Registry
events chan *config_center.ConfigChangeEvent
closeOnce sync.Once
}

// NewConfigurationListener for listening the event of etcdv3.
Expand Down Expand Up @@ -119,6 +121,9 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}

// Close etcd registry center
// BugFix why no real close
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
l.closeOnce.Do(func() {
l.registry.WaitGroup().Done()
})
}
19 changes: 5 additions & 14 deletions registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type etcdV3Registry struct {
registry.BaseRegistry
cltLock sync.Mutex
client *etcdv3.Client
listenerLock sync.Mutex
listenerLock sync.RWMutex
listener *etcdv3.EventListener
dataListener *dataListener
configListener *configurationListener
Expand Down Expand Up @@ -150,27 +150,18 @@ func (r *etcdV3Registry) CreatePath(k string) error {

// DoSubscribe actually subscribe the provider URL
func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {

var (
configListener *configurationListener
)

r.listenerLock.Lock()
configListener = r.configListener
r.listenerLock.Unlock()
r.listenerLock.RLock()
configListener := r.configListener
r.listenerLock.RUnlock()
if r.listener == nil {
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("etcd client broken")
}

// new client & listener
listener := etcdv3.NewEventListener(r.client)

r.listenerLock.Lock()
r.listener = listener
r.listener = etcdv3.NewEventListener(r.client) // new client & listener
r.listenerLock.Unlock()
}

Expand Down
6 changes: 3 additions & 3 deletions remoting/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// nolint
type EventListener struct {
client *Client
keyMapLock sync.Mutex
keyMapLock sync.RWMutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
Expand Down Expand Up @@ -181,9 +181,9 @@ func timeSecondDuration(sec int) time.Duration {
// --------> listenServiceNodeEvent
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {

l.keyMapLock.Lock()
l.keyMapLock.RLock()
_, ok := l.keyMap[key]
l.keyMapLock.Unlock()
l.keyMapLock.RUnlock()
if ok {
logger.Warnf("etcdv3 key %s has already been listened.", key)
return
Expand Down

0 comments on commit 3907104

Please sign in to comment.