Skip to content

Commit

Permalink
clientv3: Allow watcher to close channels while waiting for new client
Browse files Browse the repository at this point in the history
Once watchGrpcStream.run() gets a connection error, it does not anything
until it gets a new watch client, because newWatchClient() called in the main
loop blocks.
In this state, watch response channels returned by the Watcher won't be
closed even if the cancel function of the context given to the Watcher
has been called.
The cancel request will be executed after a connection to the server has
been re-established.
This commit allow watcher to run cancel tasks while waiting for a new
client so that users can cancel watch tasks anytime.
  • Loading branch information
yudai committed Oct 14, 2016
1 parent d1660b5 commit 8f27c23
Showing 1 changed file with 58 additions and 42 deletions.
100 changes: 58 additions & 42 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,10 @@ func (w *watchGrpcStream) run() {
}()

// start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil {
if wc, closeErr = w.openWatchClient(); closeErr != nil {
return
}
w.serveWatchClient(wc)

cancelSet := make(map[int64]struct{})

Expand Down Expand Up @@ -484,9 +485,33 @@ func (w *watchGrpcStream) run() {
closeErr = err
return
}
if wc, closeErr = w.newWatchClient(); closeErr != nil {

clientOpen := make(chan struct{})
go func() {
wc, closeErr = w.openWatchClient()
clientOpen <- struct{}{}
}()

for {
select {
case <-clientOpen:
case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
if len(w.substreams)+len(w.resuming) == 0 {
// no more watchers on this stream, shutdown
w.cancel()
return
}
continue
}
break
}

if closeErr != nil {
return
}
w.serveWatchClient(wc)
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
Expand Down Expand Up @@ -543,21 +568,41 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {

// serveWatchClient forwards messages from the grpc stream to run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
for {
resp, err := wc.Recv()
if err != nil {
if len(w.substreams)+len(w.resuming) > 0 {
close(w.resumec)
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
w.resuming = append(w.resuming, ws)
}
for _, ws := range w.resuming {
if ws == nil || ws.closing {
continue
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
}
}
w.substreams = make(map[int64]*watcherStream)

go func() {
for {
resp, err := wc.Recv()
if err != nil {
select {
case w.errc <- err:
case <-w.donec:
}
return
}
select {
case w.errc <- err:
case w.respc <- resp:
case <-w.donec:
return
}
return
}
select {
case w.respc <- resp:
case <-w.donec:
return
}
}
}()
}

// serveSubstream forwards watch responses from run() to the subscriber
Expand Down Expand Up @@ -629,35 +674,6 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
// lazily send cancel message if events on missing id
}

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
// connect to grpc stream
wc, err := w.openWatchClient()
if err != nil {
return nil, v3rpc.Error(err)
}
// mark all substreams as resuming
if len(w.substreams)+len(w.resuming) > 0 {
close(w.resumec)
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
w.resuming = append(w.resuming, ws)
}
for _, ws := range w.resuming {
if ws == nil || ws.closing {
continue
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
}
}
w.substreams = make(map[int64]*watcherStream)
// receive data from new grpc stream
go w.serveWatchClient(wc)
return wc, nil
}

// joinSubstream waits for all substream goroutines to complete
func (w *watchGrpcStream) joinSubstreams() {
for _, ws := range w.substreams {
Expand Down

0 comments on commit 8f27c23

Please sign in to comment.