From a39107a3b88baccebe870ada3b799700c324dfab Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 22 Mar 2017 22:14:39 -0700 Subject: [PATCH] clientv3: use waitgroup to wait for substream goroutine teardown When a grpc watch stream is torn down, it will join on its logical substream goroutines by waiting for each to close a channel. This doesn't guarantee the substream is fully exited, though, but only about to exit and can be waiting to resume even after Watch.Close finishes. Instead, use a waitgroup.Done at the very end of the substream defer. Fixes #7573 --- clientv3/watch.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 489eba6a8e0..5e00e163fd6 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -132,6 +132,8 @@ type watchGrpcStream struct { errc chan error // closingc gets the watcherStream of closing watchers closingc chan *watcherStream + // wg is Done when all substream goroutines have exited + wg sync.WaitGroup // resumec closes to signal that all substreams should begin resuming resumec chan struct{} @@ -406,7 +408,7 @@ func (w *watchGrpcStream) run() { for range closing { w.closeSubstream(<-w.closingc) } - + w.wg.Wait() w.owner.closeStream(w) }() @@ -431,6 +433,7 @@ func (w *watchGrpcStream) run() { } ws.donec = make(chan struct{}) + w.wg.Add(1) go w.serveSubstream(ws, w.resumec) // queue up for watcher creation/resume @@ -576,6 +579,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{ if !resuming { w.closingc <- ws } + w.wg.Done() }() emptyWr := &WatchResponse{} @@ -674,6 +678,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { continue } ws.donec = make(chan struct{}) + w.wg.Add(1) go w.serveSubstream(ws, w.resumec) }