Skip to content

Commit

Permalink
integration: test canceling watchers when disconnected
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Romano committed Sep 26, 2016
1 parent 7d9355f commit 67285bd
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,15 +770,22 @@ func TestWatchCancelOnServer(t *testing.T) {
// 4. watcher client finishes tearing down stream on "ctx"
// 5. w2 comes back canceled
func TestWatchOverlapContextCancel(t *testing.T) {
f := func(clus *integration.ClusterV3) {}
testWatchOverlapContextCancel(t, f)
}

func TestWatchOverlapDropConnContextCancel(t *testing.T) {
f := func(clus *integration.ClusterV3) {
clus.Members[0].DropConnections()
}
testWatchOverlapContextCancel(t, f)
}

func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

cli := clus.RandClient()
if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
t.Fatal(err)
}

// each unique context "%v" has a unique grpc stream
n := 100
ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
Expand All @@ -788,17 +795,28 @@ func TestWatchOverlapContextCancel(t *testing.T) {
// limits the maximum number of outstanding watchers per stream
ctxc[i] = make(chan struct{}, 2)
}

// issue concurrent watches on "abc" with cancel
cli := clus.RandClient()
if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
t.Fatal(err)
}
ch := make(chan struct{}, n)
// issue concurrent watches with cancel
for i := 0; i < n; i++ {
go func() {
defer func() { ch <- struct{}{} }()
idx := rand.Intn(len(ctxs))
ctx, cancel := context.WithCancel(ctxs[idx])
ctxc[idx] <- struct{}{}
ch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
if _, ok := <-ch; !ok {
t.Fatalf("unexpected closed channel")
wch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
f(clus)
select {
case _, ok := <-wch:
if !ok {
t.Fatalf("unexpected closed channel %p", wch)
}
case <-time.After(time.Second):
t.Fatalf("timed out waiting for watch on %p", wch)
}
// randomize how cancel overlaps with watch creation
if rand.Intn(2) == 0 {
Expand All @@ -814,7 +832,7 @@ func TestWatchOverlapContextCancel(t *testing.T) {
for i := 0; i < n; i++ {
select {
case <-ch:
case <-time.After(time.Second):
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for completed watch")
}
}
Expand Down

0 comments on commit 67285bd

Please sign in to comment.