diff --git a/clientv3/watch.go b/clientv3/watch.go index 1d652c81afae..a593bdafd6e7 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -412,9 +412,10 @@ func (w *watchGrpcStream) run() { }() // start a stream with the etcd grpc server - if wc, closeErr = w.newWatchClient(); closeErr != nil { + if wc, closeErr = w.openWatchClient(); closeErr != nil { return } + w.serveWatchClient(wc) cancelSet := make(map[int64]struct{}) @@ -484,9 +485,33 @@ func (w *watchGrpcStream) run() { closeErr = err return } - if wc, closeErr = w.newWatchClient(); closeErr != nil { + + clientOpen := make(chan struct{}) + go func() { + wc, closeErr = w.openWatchClient() + clientOpen <- struct{}{} + }() + + for { + select { + case <-clientOpen: + case ws := <-w.closingc: + w.closeSubstream(ws) + delete(closing, ws) + if len(w.substreams)+len(w.resuming) == 0 { + // no more watchers on this stream, shutdown + w.cancel() + return + } + continue + } + break + } + + if closeErr != nil { return } + w.serveWatchClient(wc) if ws := w.nextResume(); ws != nil { wc.Send(ws.initReq.toPB()) } @@ -543,21 +568,41 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { // serveWatchClient forwards messages from the grpc stream to run() func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) { - for { - resp, err := wc.Recv() - if err != nil { + if len(w.substreams)+len(w.resuming) > 0 { + close(w.resumec) + w.resumec = make(chan struct{}) + w.joinSubstreams() + for _, ws := range w.substreams { + ws.id = -1 + w.resuming = append(w.resuming, ws) + } + for _, ws := range w.resuming { + if ws == nil || ws.closing { + continue + } + ws.donec = make(chan struct{}) + go w.serveSubstream(ws, w.resumec) + } + } + w.substreams = make(map[int64]*watcherStream) + + go func() { + for { + resp, err := wc.Recv() + if err != nil { + select { + case w.errc <- err: + case <-w.donec: + } + return + } select { - case w.errc <- err: + case w.respc <- resp: case <-w.donec: + return } - return } - select { - case w.respc <- resp: - case <-w.donec: - return - } - } + }() } // serveSubstream forwards watch responses from run() to the subscriber @@ -629,35 +674,6 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{ // lazily send cancel message if events on missing id } -func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { - // connect to grpc stream - wc, err := w.openWatchClient() - if err != nil { - return nil, v3rpc.Error(err) - } - // mark all substreams as resuming - if len(w.substreams)+len(w.resuming) > 0 { - close(w.resumec) - w.resumec = make(chan struct{}) - w.joinSubstreams() - for _, ws := range w.substreams { - ws.id = -1 - w.resuming = append(w.resuming, ws) - } - for _, ws := range w.resuming { - if ws == nil || ws.closing { - continue - } - ws.donec = make(chan struct{}) - go w.serveSubstream(ws, w.resumec) - } - } - w.substreams = make(map[int64]*watcherStream) - // receive data from new grpc stream - go w.serveWatchClient(wc) - return wc, nil -} - // joinSubstream waits for all substream goroutines to complete func (w *watchGrpcStream) joinSubstreams() { for _, ws := range w.substreams {