diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 395452c369a0..02cc5d1e494e 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -770,15 +770,22 @@ func TestWatchCancelOnServer(t *testing.T) { // 4. watcher client finishes tearing down stream on "ctx" // 5. w2 comes back canceled func TestWatchOverlapContextCancel(t *testing.T) { + f := func(clus *integration.ClusterV3) {} + testWatchOverlapContextCancel(t, f) +} + +func TestWatchOverlapDropConnContextCancel(t *testing.T) { + f := func(clus *integration.ClusterV3) { + clus.Members[0].DropConnections() + } + testWatchOverlapContextCancel(t, f) +} + +func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) { defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - cli := clus.RandClient() - if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil { - t.Fatal(err) - } - // each unique context "%v" has a unique grpc stream n := 100 ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5) @@ -788,17 +795,28 @@ func TestWatchOverlapContextCancel(t *testing.T) { // limits the maximum number of outstanding watchers per stream ctxc[i] = make(chan struct{}, 2) } + + // issue concurrent watches on "abc" with cancel + cli := clus.RandClient() + if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil { + t.Fatal(err) + } ch := make(chan struct{}, n) - // issue concurrent watches with cancel for i := 0; i < n; i++ { go func() { defer func() { ch <- struct{}{} }() idx := rand.Intn(len(ctxs)) ctx, cancel := context.WithCancel(ctxs[idx]) ctxc[idx] <- struct{}{} - ch := cli.Watch(ctx, "abc", clientv3.WithRev(1)) - if _, ok := <-ch; !ok { - t.Fatalf("unexpected closed channel") + wch := cli.Watch(ctx, "abc", clientv3.WithRev(1)) + f(clus) + select { + case _, ok := <-wch: + if !ok { + t.Fatalf("unexpected closed channel %p", wch) + } + case <-time.After(time.Second): + t.Fatalf("timed out waiting for watch on %p", wch) } // randomize how cancel overlaps with watch creation if rand.Intn(2) == 0 { @@ -814,7 +832,7 @@ func TestWatchOverlapContextCancel(t *testing.T) { for i := 0; i < n; i++ { select { case <-ch: - case <-time.After(time.Second): + case <-time.After(5 * time.Second): t.Fatalf("timed out waiting for completed watch") } }