Skip to content

Commit

Permalink
clientv3: non-recursive watch
Browse files Browse the repository at this point in the history
Signed-off-by: Ted Yu <[email protected]>
  • Loading branch information
tedyu committed May 20, 2020
1 parent 732df43 commit f976138
Showing 1 changed file with 45 additions and 37 deletions.
82 changes: 45 additions & 37 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,55 +311,63 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
ok := false
ctxKey := streamKeyFromCtx(ctx)

// find or allocate appropriate grpc watch stream
w.mu.Lock()
if w.streams == nil {
// closed
var closeCh chan WatchResponse
for {
// find or allocate appropriate grpc watch stream
w.mu.Lock()
if w.streams == nil {
// closed
w.mu.Unlock()
ch := make(chan WatchResponse)
close(ch)
return ch
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()
ch := make(chan WatchResponse)
close(ch)
return ch
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()

// couldn't create channel; return closed channel
closeCh := make(chan WatchResponse, 1)

// submit request
select {
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
// couldn't create channel; return closed channel
if closeCh == nil {
closeCh = make(chan WatchResponse, 1)
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
}

// receive channel
if ok {
// submit request
select {
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
ok = false
case <-donec:
ok = false
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
continue
}

// receive channel
if ok {
select {
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
continue
}
}
break
}

close(closeCh)
Expand Down

0 comments on commit f976138

Please sign in to comment.