Skip to content

Commit

Permalink
clientv3: Allow watcher to close channels while waiting for new client
Browse files Browse the repository at this point in the history
Once watchGrpcStream.run() gets a connection error, it does not do anything
until it gets a new watch client, because the newWatchClient() call in the main
loop blocks.
In this state, watch response channels returned by the Watcher won't be
closed even if the cancel function of the context given to the Watcher
has been called.
The cancel request will be executed after a connection to the server has
been re-established.
This commit allows the watcher to run cancel tasks while waiting for a new
client so that users can cancel watch tasks at any time.
  • Loading branch information
yudai authored and Iwasaki Yudai committed Oct 27, 2016
1 parent 8825392 commit 0e61165
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 11 deletions.
26 changes: 26 additions & 0 deletions clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,3 +926,29 @@ func TestWatchStressResumeClose(t *testing.T) {
}
clus.TakeClient(0)
}

// TestWatchCancelWithNoConnection verifies that cancelling the context passed
// to Watch closes the returned response channel, even after the only cluster
// member has been terminated and the client has no endpoint to talk to.
func TestWatchCancelWithNoConnection(t *testing.T) {
	defer testutil.AfterTest(t)

	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	client := cluster.Client(0)

	watchCtx, cancelWatch := context.WithCancel(context.Background())
	respC := client.Watch(watchCtx, "abc")

	// Pause before and after terminating the single member; presumably this
	// lets the watch get established first and then lets the client observe
	// the lost connection before the cancel is issued.
	const settle = time.Second * 3
	time.Sleep(settle)
	cluster.Members[0].Terminate(t)
	time.Sleep(settle)

	go cancelWatch()

	// The channel must be closed (receive reports !open) within the deadline;
	// any delivered response or a timeout fails the test.
	deadline := time.After(5 * time.Second)
	select {
	case resp, open := <-respC:
		if open {
			t.Fatalf("expected closed watch after cancel(), got resp=%+v err=%v", resp, resp.Err())
		}
	case <-deadline:
		t.Fatal("timed out waiting for closed channel")
	}

	if closeErr := client.Close(); closeErr != nil {
		t.Fatal(closeErr)
	}
	// The client was closed above, so take it out of the cluster's hands.
	cluster.TakeClient(0)
}
49 changes: 38 additions & 11 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ type WatchResponse struct {
closeErr error
}

// openWatchClientResponse bundles the two results of openWatchClient so they
// can be delivered together over a channel from the goroutine that opens the
// watch client.
type openWatchClientResponse struct {
	// wc is the watch client returned by openWatchClient.
	wc pb.Watch_WatchClient
	// err is the error returned by openWatchClient.
	err error
}

// IsCreate returns true if the event tells that the key is newly created.
func (e *Event) IsCreate() bool {
return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
Expand Down Expand Up @@ -414,9 +419,10 @@ func (w *watchGrpcStream) run() {
}()

// start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil {
if wc, closeErr = w.openWatchClient(); closeErr != nil {
return
}
w.setupWatchClient(wc)

cancelSet := make(map[int64]struct{})

Expand Down Expand Up @@ -486,9 +492,35 @@ func (w *watchGrpcStream) run() {
closeErr = err
return
}
if wc, closeErr = w.newWatchClient(); closeErr != nil {

clientOpenC := make(chan openWatchClientResponse, 1)
go func() {
owc, err := w.openWatchClient()
clientOpenC <- openWatchClientResponse{wc: owc, err: err}
}()

for {
select {
case opc := <-clientOpenC:
wc = opc.wc
closeErr = opc.err
case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
if len(w.substreams)+len(w.resuming) == 0 {
// no more watchers on this stream, shutdown
w.cancel()
return
}
continue
}
break
}

if closeErr != nil {
return
}
w.setupWatchClient(wc)
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
Expand Down Expand Up @@ -631,12 +663,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
// lazily send cancel message if events on missing id
}

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
// connect to grpc stream
wc, err := w.openWatchClient()
if err != nil {
return nil, v3rpc.Error(err)
}
func (w *watchGrpcStream) setupWatchClient(wc pb.Watch_WatchClient) (pb.Watch_WatchClient, error) {
// mark all substreams as resuming
if len(w.substreams)+len(w.resuming) > 0 {
close(w.resumec)
Expand Down Expand Up @@ -673,7 +700,7 @@ func (w *watchGrpcStream) joinSubstreams() {
}

// openWatchClient retries opening a watchclient until retryConnection fails
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
func (w *watchGrpcStream) openWatchClient() (wc pb.Watch_WatchClient, err error) {
for {
select {
case <-w.stopc:
Expand All @@ -683,14 +710,14 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
return nil, err
default:
}
if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
if wc, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); wc != nil && err == nil {
break
}
if isHaltErr(w.ctx, err) {
return nil, v3rpc.Error(err)
}
}
return ws, nil
return wc, nil
}

// toPB converts an internal watch request structure to its protobuf messagefunc (wr *watchRequest)
Expand Down

0 comments on commit 0e61165

Please sign in to comment.