From 0e61165babcd2b89d47aaeaba865fd27e5c1f9a8 Mon Sep 17 00:00:00 2001
From: Iwasaki Yudai
Date: Fri, 14 Oct 2016 16:30:29 +0900
Subject: [PATCH] clientv3: Allow watcher to close channels while waiting for
 new client

Once watchGrpcStream.run() gets a connection error, it does not do
anything until it gets a new watch client, because newWatchClient()
called in the main loop blocks. In this state, watch response channels
returned by the Watcher won't be closed even if the cancel function of
the context given to the Watcher has been called. The cancel request
will only be executed after a connection to the server has been
re-established.

This commit allows the watcher to run cancel tasks while waiting for a
new client so that users can cancel watch tasks at any time.
---
 clientv3/integration/watch_test.go | 26 ++++++++++++++++
 clientv3/watch.go                  | 49 +++++++++++++++++++++++-------
 2 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go
index 1b544ca41249..6ed0d6696cfc 100644
--- a/clientv3/integration/watch_test.go
+++ b/clientv3/integration/watch_test.go
@@ -926,3 +926,29 @@ func TestWatchStressResumeClose(t *testing.T) {
 	}
 	clus.TakeClient(0)
 }
+
+// TestWatchCancelWithNoConnection ensures that canceling a watcher closes
+// response channels even when it does not have any connections to endpoints.
+func TestWatchCancelWithNoConnection(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	cli := clus.Client(0)
+	ctx, cancel := context.WithCancel(context.Background())
+	wch := cli.Watch(ctx, "abc")
+	time.Sleep(time.Second * 3)
+	clus.Members[0].Terminate(t)
+	time.Sleep(time.Second * 3)
+	go cancel()
+	select {
+	case wr, ok := <-wch:
+		if ok {
+			t.Fatalf("expected closed watch after cancel(), got resp=%+v err=%v", wr, wr.Err())
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("timed out waiting for closed channel")
+	}
+	if err := cli.Close(); err != nil {
+		t.Fatal(err)
+	}
+	clus.TakeClient(0)
+}
diff --git a/clientv3/watch.go b/clientv3/watch.go
index 5281f8f5c205..eaeff463222f 100644
--- a/clientv3/watch.go
+++ b/clientv3/watch.go
@@ -67,6 +67,11 @@ type WatchResponse struct {
 	closeErr error
 }
 
+type openWatchClientResponse struct {
+	wc  pb.Watch_WatchClient
+	err error
+}
+
 // IsCreate returns true if the event tells that the key is newly created.
 func (e *Event) IsCreate() bool {
 	return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
@@ -414,9 +419,10 @@ func (w *watchGrpcStream) run() {
 	}()
 
 	// start a stream with the etcd grpc server
-	if wc, closeErr = w.newWatchClient(); closeErr != nil {
+	if wc, closeErr = w.openWatchClient(); closeErr != nil {
 		return
 	}
+	w.setupWatchClient(wc)
 
 	cancelSet := make(map[int64]struct{})
 
@@ -486,9 +492,35 @@ func (w *watchGrpcStream) run() {
 				closeErr = err
 				return
 			}
-			if wc, closeErr = w.newWatchClient(); closeErr != nil {
+
+			clientOpenC := make(chan openWatchClientResponse, 1)
+			go func() {
+				owc, err := w.openWatchClient()
+				clientOpenC <- openWatchClientResponse{wc: owc, err: err}
+			}()
+
+			for {
+				select {
+				case opc := <-clientOpenC:
+					wc = opc.wc
+					closeErr = opc.err
+				case ws := <-w.closingc:
+					w.closeSubstream(ws)
+					delete(closing, ws)
+					if len(w.substreams)+len(w.resuming) == 0 {
+						// no more watchers on this stream, shutdown
+						w.cancel()
+						return
+					}
+					continue
+				}
+				break
+			}
+
+			if closeErr != nil {
 				return
 			}
+			w.setupWatchClient(wc)
 			if ws := w.nextResume(); ws != nil {
 				wc.Send(ws.initReq.toPB())
 			}
@@ -631,12 +663,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
 	// lazily send cancel message if events on missing id
 }
 
-func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
-	// connect to grpc stream
-	wc, err := w.openWatchClient()
-	if err != nil {
-		return nil, v3rpc.Error(err)
-	}
+func (w *watchGrpcStream) setupWatchClient(wc pb.Watch_WatchClient) (pb.Watch_WatchClient, error) {
 	// mark all substreams as resuming
 	if len(w.substreams)+len(w.resuming) > 0 {
 		close(w.resumec)
@@ -673,7 +700,7 @@ func (w *watchGrpcStream) joinSubstreams() {
 }
 
 // openWatchClient retries opening a watchclient until retryConnection fails
-func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
+func (w *watchGrpcStream) openWatchClient() (wc pb.Watch_WatchClient, err error) {
 	for {
 		select {
 		case <-w.stopc:
@@ -683,14 +710,14 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
 			return nil, err
 		default:
 		}
-		if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
+		if wc, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); wc != nil && err == nil {
 			break
 		}
 		if isHaltErr(w.ctx, err) {
 			return nil, v3rpc.Error(err)
 		}
 	}
-	return ws, nil
+	return wc, nil
 }
 
 // toPB converts an internal watch request structure to its protobuf message
func (wr *watchRequest)
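
Note for reviewers (not part of the patch): the behavior change is visible from
a plain clientv3 program like the minimal sketch below. It assumes an etcd
member that may become unreachable at localhost:2379; the key "abc", the
timeouts, and the coreos/etcd import paths of that era are illustrative
assumptions, not taken from this patch. With this fix, the range loop exits
promptly after cancel() even while the client has no connection; without it,
the watch channel only closes once a new watch stream has been established.

package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

func main() {
	// Connect to a single endpoint; it may go away while we are watching.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithCancel(context.Background())
	wch := cli.Watch(ctx, "abc")

	// Cancel the watch after a while, regardless of connection state.
	go func() {
		time.Sleep(3 * time.Second)
		cancel()
	}()

	// The loop ends when the watch response channel is closed by cancel().
	for wr := range wch {
		fmt.Printf("events=%+v err=%v\n", wr.Events, wr.Err())
	}
	fmt.Println("watch channel closed")
}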