diff --git a/clientv3/integration/black_hole_test.go b/clientv3/integration/black_hole_test.go index 1960c909c88..9d50a5fce27 100644 --- a/clientv3/integration/black_hole_test.go +++ b/clientv3/integration/black_hole_test.go @@ -17,6 +17,7 @@ package integration import ( + "bytes" "context" "testing" "time" @@ -26,6 +27,96 @@ import ( "github.com/coreos/etcd/pkg/testutil" ) +// TestBalancerUnderBlackholeWatch ensures that watch client +// switch its endpoints when the member of the pinned endpoint is blackholed. +func TestBalancerUnderBlackholeWatch(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{ + Size: 2, + SkipCreatingClient: true, + }) + defer clus.Terminate(t) + + eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()} + + ccfg := clientv3.Config{ + Endpoints: []string{eps[0]}, + DialTimeout: 1 * time.Second, + DialKeepAliveTime: 1 * time.Second, + DialKeepAliveTimeout: 500 * time.Millisecond, + } + watchCli, err := clientv3.New(ccfg) + if err != nil { + t.Fatal(err) + } + defer watchCli.Close() + + // wait for eps[0] to be pinned + mustWaitPinReady(t, watchCli) + + // add all eps to list, so that when the original pined one fails + // the client can switch to other available eps + watchCli.SetEndpoints(eps...) + + key, val := "foo", "bar" + wch := watchCli.Watch(context.Background(), key, clientv3.WithCreatedNotify()) + select { + case <-wch: + case <-time.After(3 * time.Second): + t.Fatal("took too long to create watch") + } + + donec := make(chan struct{}) + go func() { + defer close(donec) + + // switch to others when eps[lead] is shut down + select { + case ev := <-wch: + if werr := ev.Err(); werr != nil { + t.Fatal(werr) + } + if len(ev.Events) != 1 { + t.Fatalf("expected one event, got %+v", ev) + } + if !bytes.Equal(ev.Events[0].Kv.Value, []byte(val)) { + t.Fatalf("expected %q, got %+v", val, ev.Events[0].Kv) + } + case <-time.After(7 * time.Second): + t.Fatal("took too long to receive events") + } + }() + + // blackhole eps[0] + clus.Members[0].Blackhole() + + // writes to eps[1] + putCli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}}) + if err != nil { + t.Fatal(err) + } + defer putCli.Close() + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + _, err = putCli.Put(ctx, key, val) + cancel() + if err == nil { + break + } + if err == context.DeadlineExceeded { + continue + } + t.Fatal(err) + } + + select { + case <-donec: + case <-time.After(5 * time.Second): // enough time for balancer switch + t.Fatal("took too long to receive events") + } +} + func TestBalancerUnderBlackholeNoKeepAlivePut(t *testing.T) { testBalancerUnderBlackholeNoKeepAliveMutable(t, func(cli *clientv3.Client, ctx context.Context) error { _, err := cli.Put(ctx, "foo", "bar")