clientv3: simplify watch synchronization
The watch synchronization logic was more complicated than it needed to be.

Also fixes clobbering ids on resume and losing watcher channels when
watchers are disconnected and canceled.
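
The resume bug is the subtler of the two: re-registering watchers on a new gRPC stream hands back fresh watch ids, and writing each new id straight into the shared map can overwrite (clobber) an entry for a watcher that has not been resumed yet. The diff's resumeWatchers fixes this by staging resumed streams in a fresh map and swapping it in once every watcher is re-established. A minimal, self-contained sketch of that stage-and-swap pattern (not etcd code; stream, resumeAll, and newID are invented for illustration):

package main

import "fmt"

// stream is a stand-in for clientv3's watcherStream: an active watcher
// whose server-assigned id may change when it is resumed on a new stream.
type stream struct {
	id   int64
	name string
}

// resumeAll re-registers every stream and returns the rebuilt id table.
// New ids are staged in a fresh map and swapped in only at the end, so a
// new id that equals a not-yet-resumed stream's old id cannot clobber it.
func resumeAll(streams map[int64]*stream, newID func(*stream) int64) map[int64]*stream {
	staged := make(map[int64]*stream, len(streams))
	for _, s := range streams {
		staged[newID(s)] = s // never touch the live map mid-resume
	}
	for id, s := range staged {
		s.id = id
	}
	return staged
}

func main() {
	live := map[int64]*stream{
		1: {id: 1, name: "a"},
		2: {id: 2, name: "b"},
	}
	// Simulate a server that gives "a" the id currently held by "b".
	next := map[string]int64{"a": 2, "b": 7}
	live = resumeAll(live, func(s *stream) int64 { return next[s.name] })
	for id, s := range live {
		fmt.Printf("id=%d -> %s\n", id, s.name)
	}
}

Because the live table is never mutated mid-loop, a new id that collides with a not-yet-resumed watcher's old id cannot evict it.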
Anthony Romano committed Sep 26, 2016
1 parent 67285bd commit 40f82e0
Showing 1 changed file with 77 additions and 90 deletions.
167 changes: 77 additions & 90 deletions clientv3/watch.go
@@ -118,8 +118,8 @@ type watchGrpcStream struct {
 
 	// mu protects the streams map
 	mu sync.RWMutex
-	// streams holds all active watchers
-	streams map[int64]*watcherStream
+	// substreams holds all active watchers
+	substreams map[int64]*watcherStream
 
 	// reqc sends a watch request from Watch() to the main goroutine
 	reqc chan *watchRequest
@@ -198,12 +198,12 @@ func (vc *valCtx) Err() error { return nil }
 func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
 	ctx, cancel := context.WithCancel(&valCtx{inctx})
 	wgs := &watchGrpcStream{
-		owner:   w,
-		remote:  w.remote,
-		ctx:     ctx,
-		ctxKey:  fmt.Sprintf("%v", inctx),
-		cancel:  cancel,
-		streams: make(map[int64]*watcherStream),
+		owner:      w,
+		remote:     w.remote,
+		ctx:        ctx,
+		ctxKey:     fmt.Sprintf("%v", inctx),
+		cancel:     cancel,
+		substreams: make(map[int64]*watcherStream),
 
 		respc: make(chan *pb.WatchResponse),
 		reqc:  make(chan *watchRequest),
@@ -284,6 +284,9 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	if ok {
 		select {
 		case ret := <-retc:
+			if ret == nil {
+				return w.Watch(ctx, key, opts...)
+			}
 			return ret
 		case <-ctx.Done():
 		case <-donec:
@@ -314,12 +317,7 @@ func (w *watcher) Close() (err error) {
 }
 
 func (w *watchGrpcStream) Close() (err error) {
-	w.mu.Lock()
-	if w.stopc != nil {
-		close(w.stopc)
-		w.stopc = nil
-	}
-	w.mu.Unlock()
+	close(w.stopc)
 	<-w.donec
 	select {
 	case err = <-w.errc:
@@ -369,9 +367,7 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
 		ws.initReq.rev = resp.Header.Revision
 	}
 
-	w.mu.Lock()
-	w.streams[ws.id] = ws
-	w.mu.Unlock()
+	w.substreams[ws.id] = ws
 
 	// pass back the subscriber channel for the watcher
 	pendingReq.retc <- ret
@@ -380,45 +376,47 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
 	go w.serveStream(ws)
 }
 
-func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
-	w.mu.Lock()
+func (w *watchGrpcStream) closeStream(ws *watcherStream) {
 	// cancels request stream; subscriber receives nil channel
 	close(ws.initReq.retc)
 	// close subscriber's channel
 	close(ws.outc)
-	delete(w.streams, ws.id)
-	empty := len(w.streams) == 0
-	if empty && w.stopc != nil {
-		w.stopc = nil
-	}
-	w.mu.Unlock()
-	return empty
+	delete(w.substreams, ws.id)
 }
 
 // run is the root of the goroutines for managing a watcher client
 func (w *watchGrpcStream) run() {
 	var wc pb.Watch_WatchClient
 	var closeErr error
 
+	// substreams that have been marked to close but goroutine still running
+	closing := make(map[*watcherStream]struct{})
+
 	defer func() {
-		w.owner.mu.Lock()
+		w.mu.Lock()
 		w.closeErr = closeErr
+		w.mu.Unlock()
+
+		// shutdown substreams
+		for _, ws := range w.substreams {
+			if _, ok := closing[ws]; !ok {
+				close(ws.recvc)
+			}
+		}
+		for len(w.substreams) != 0 {
+			ws := <-w.closingc
+			w.closeStream(ws)
+		}
+
+		w.owner.mu.Lock()
+		close(w.donec)
+		w.cancel()
 		if w.owner.streams != nil {
 			delete(w.owner.streams, w.ctxKey)
 		}
-		close(w.donec)
 		w.owner.mu.Unlock()
-		w.cancel()
 	}()
 
-	// already stopped?
-	w.mu.RLock()
-	stopc := w.stopc
-	w.mu.RUnlock()
-	if stopc == nil {
-		return
-	}
-
 	// start a stream with the etcd grpc server
 	if wc, closeErr = w.newWatchClient(); closeErr != nil {
 		return
@@ -450,17 +448,10 @@ func (w *watchGrpcStream) run() {
 			w.dispatchEvent(pbresp)
 		case pbresp.Canceled:
 			delete(cancelSet, pbresp.WatchId)
-			// shutdown serveStream, if any
-			w.mu.Lock()
-			if ws, ok := w.streams[pbresp.WatchId]; ok {
+			if ws, ok := w.substreams[pbresp.WatchId]; ok {
+				// signal to stream goroutine to update closingc
 				close(ws.recvc)
-				delete(w.streams, ws.id)
-			}
-			numStreams := len(w.streams)
-			w.mu.Unlock()
-			if numStreams == 0 {
-				// don't leak watcher streams
-				return
+				closing[ws] = struct{}{}
 			}
 		default:
 			// dispatch to appropriate watch stream
@@ -495,10 +486,13 @@ func (w *watchGrpcStream) run() {
 				failedReq = pendingReq
 			}
 			cancelSet = make(map[int64]struct{})
-		case <-stopc:
+		case <-w.stopc:
 			return
 		case ws := <-w.closingc:
-			if w.closeStream(ws) {
+			w.closeStream(ws)
+			delete(closing, ws)
+			if len(w.substreams) == 0 && pendingReq == nil {
+				// ran out of substreams, shutdown
 				return
 			}
 		}
@@ -520,9 +514,7 @@
 
 // dispatchEvent sends a WatchResponse to the appropriate watcher stream
 func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
-	w.mu.RLock()
-	defer w.mu.RUnlock()
-	ws, ok := w.streams[pbresp.WatchId]
+	ws, ok := w.substreams[pbresp.WatchId]
 	if !ok {
 		return false
 	}
@@ -531,14 +523,13 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
 	for i, ev := range pbresp.Events {
 		events[i] = (*Event)(ev)
 	}
-	wr := &WatchResponse{
+	ws.recvc <- &WatchResponse{
 		Header:          *pbresp.Header,
 		Events:          events,
 		CompactRevision: pbresp.CompactRevision,
 		Created:         pbresp.Created,
 		Canceled:        pbresp.Canceled,
 	}
-	ws.recvc <- wr
 	return true
 }

@@ -564,20 +555,25 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
 // serveStream forwards watch responses from run() to the subscriber
 func (w *watchGrpcStream) serveStream(ws *watcherStream) {
 	defer func() {
-		// signal that this watcherStream is finished
-		select {
-		case w.closingc <- ws:
-		case <-w.donec:
-			w.closeStream(ws)
-		}
+		// try to send off close error
+		w.mu.RLock()
+		closeErr := w.closeErr
+		w.mu.RUnlock()
+		if closeErr != nil {
+			select {
+			case ws.outc <- WatchResponse{closeErr: w.closeErr}:
+			case <-w.donec:
+			case <-time.After(closeSendErrTimeout):
+			}
+		}
+		// signal that this watcherStream is finished
+		w.closingc <- ws
 	}()
 
-	var closeErr error
 	emptyWr := &WatchResponse{}
 	wrs := []*WatchResponse{}
 	resuming := false
-	closing := false
-	for !closing {
+	for {
 		curWr := emptyWr
 		outc := ws.outc
@@ -596,8 +592,7 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
 		select {
 		case outc <- *curWr:
 			if wrs[0].Err() != nil {
-				closing = true
-				break
+				return
 			}
 			var newRev int64
 			if len(wrs[0].Events) > 0 {
@@ -617,7 +612,6 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
 			}
 			// resume up to last seen event if disconnected
 			if resuming && wr.Err() == nil {
-				resuming = false
 				// trim events already seen
 				for i := 0; i < len(wr.Events); i++ {
 					if wr.Events[i].Kv.ModRevision > ws.lastRev {
@@ -643,20 +637,8 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
 			if resumeRev != ws.lastRev {
 				panic("unexpected resume revision")
 			}
 		case <-w.donec:
-			closing = true
-			closeErr = w.closeErr
+			return
 		case <-ws.initReq.ctx.Done():
-			closing = true
-		}
-	}
-
-	// try to send off close error
-	if closeErr != nil {
-		select {
-		case ws.outc <- WatchResponse{closeErr: w.closeErr}:
-		case <-w.donec:
-		case <-time.After(closeSendErrTimeout):
-		}
-	}
+			return
 		}
 	}

@@ -687,14 +669,13 @@ func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) {
 // openWatchClient retries opening a watchclient until retryConnection fails
 func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 	for {
-		w.mu.Lock()
-		stopc := w.stopc
-		w.mu.Unlock()
-		if stopc == nil {
+		select {
+		case <-w.stopc:
 			if err == nil {
-				err = context.Canceled
+				return nil, context.Canceled
 			}
 			return nil, err
-		}
+		default:
+		}
 		if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
 			break
 		}
@@ -709,12 +690,17 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
 // resumeWatchers rebuilds every registered watcher on a new client
 func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
 	w.mu.RLock()
-	streams := make([]*watcherStream, 0, len(w.streams))
-	for _, ws := range w.streams {
+	streams := make([]*watcherStream, 0, len(w.substreams))
+	for _, ws := range w.substreams {
 		streams = append(streams, ws)
 	}
 	w.mu.RUnlock()
 
+	// new id's may be different since new remote watcher; update streams
+	// map once all streams have been restablished to avoid new id's clobbering
+	// streams with old id's.
+	newStreams := make(map[int64]*watcherStream)
+
 	for _, ws := range streams {
 		// drain recvc so no old WatchResponses (e.g., Created messages)
 		// are processed while resuming
@@ -740,15 +726,16 @@ func (w *watchGrpcStream) resumeWatchers(wc pb.Watch_WatchClient) error {
 		}
 
 		// id may be different since new remote watcher; update map
-		w.mu.Lock()
-		delete(w.streams, ws.id)
-		ws.id = resp.WatchId
-		w.streams[ws.id] = ws
-		w.mu.Unlock()
+		newStreams[resp.WatchId] = ws
 
 		// unpause serveStream
		ws.resumec <- ws.lastRev
 	}
+
+	for id, ws := range newStreams {
+		ws.id = id
+	}
+	w.substreams = newStreams
 	return nil
 }

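The shutdown half of the fix is visible in run() and serveStream() above: run() marks a substream as closing and closes its recvc, the serveStream goroutine always announces itself on closingc before exiting, and only then does the manager close the subscriber-facing channels. Having a single owner for channel closes is what stops watcher channels from being lost when a watcher is disconnected and canceled at the same time. A minimal, self-contained sketch of that handoff (worker, serve, and this closingc are invented stand-ins, not etcd code):

package main

import "fmt"

// worker stands in for clientv3's watcherStream plus its serveStream
// goroutine: recvc is closed by the manager to request shutdown, and the
// goroutine always reports itself on closingc before exiting.
type worker struct {
	id    int
	recvc chan string // manager closes this to request shutdown
	outc  chan string // manager closes this, but only after the handoff
}

func (w *worker) serve(closingc chan<- *worker) {
	defer func() { closingc <- w }() // the single completion signal
	for range w.recvc {
		// normally: forward responses to outc; elided in this sketch
	}
}

func main() {
	closingc := make(chan *worker)
	workers := map[int]*worker{}
	for i := 0; i < 3; i++ {
		w := &worker{id: i, recvc: make(chan string), outc: make(chan string, 1)}
		workers[i] = w
		go w.serve(closingc)
	}

	// Shutdown mirrors run()'s defer: close every recvc, then drain closingc
	// until the table is empty. outc is closed only after its goroutine has
	// exited, so no channel is closed twice or silently dropped.
	for _, w := range workers {
		close(w.recvc)
	}
	for len(workers) != 0 {
		w := <-closingc
		close(w.outc) // safe: serve() has stopped sending
		delete(workers, w.id)
		fmt.Println("reclaimed worker", w.id)
	}
}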

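Likewise, Close() now closes stopc exactly once instead of nil-ing it out under a mutex, so openWatchClient can poll for shutdown with a non-blocking select rather than taking a lock, as the diff above shows. A small runnable sketch of that idiom (openWithStop, dial, and the sleep-based backoff are invented for illustration; etcd's retry logic differs):

package main

import (
	"errors"
	"fmt"
	"time"
)

// openWithStop mirrors openWatchClient's loop: a closed stop channel is
// observed with a non-blocking select, replacing the old mutex-guarded
// "stopc == nil" check.
func openWithStop(stopc <-chan struct{}, dial func() error) error {
	for {
		select {
		case <-stopc:
			return errors.New("stopped") // give up once Close() fires
		default:
			// not stopped; fall through and (re)try the connection
		}
		if err := dial(); err == nil {
			return nil
		}
		time.Sleep(10 * time.Millisecond) // hypothetical backoff
	}
}

func main() {
	stopc := make(chan struct{})
	attempts := 0
	err := openWithStop(stopc, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient")
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}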