diff --git a/clientv3/integration/cluster_test.go b/clientv3/integration/cluster_test.go index 622087cbfe2..0dcfbde666a 100644 --- a/clientv3/integration/cluster_test.go +++ b/clientv3/integration/cluster_test.go @@ -16,7 +16,6 @@ package integration import ( "context" - "fmt" "reflect" "testing" @@ -180,27 +179,11 @@ func TestMemberAddForLearner(t *testing.T) { t.Errorf("Added a member with IsLearner = %v, got %v", isLearner, resp.Member.IsLearner) } - numOfLearners, err := getNumberOfLearners(clus) + learners, err := clus.GetLearnerMembers() if err != nil { - t.Fatalf("failed to get the number of learners in cluster: %v", err) + t.Fatalf("failed to get the learner members in cluster: %v", err) } - if numOfLearners != 1 { - t.Errorf("Added 1 learner node to cluster, got %d", numOfLearners) + if len(learners) != 1 { + t.Errorf("Added 1 learner node to cluster, got %d", len(learners)) } } - -// getNumberOfLearners return the number of learner nodes in cluster using MemberList API -func getNumberOfLearners(clus *integration.ClusterV3) (int, error) { - cli := clus.RandClient() - resp, err := cli.MemberList(context.Background()) - if err != nil { - return 0, fmt.Errorf("failed to list member %v", err) - } - numberOfLearners := 0 - for _, m := range resp.Members { - if m.IsLearner { - numberOfLearners++ - } - } - return numberOfLearners, nil -} diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 636bcd27a96..2ab76db89a6 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -971,3 +971,79 @@ func TestKVLargeRequests(t *testing.T) { clus.Terminate(t) } } + +// TestKVForLearner ensures learner member only accepts serializable read request. +func TestKVForLearner(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // we have to add and launch learner member after initial cluster was created, because + // bootstrapping a cluster with learner member is not supported. + clus.AddAndLaunchLearnerMember(t) + + learners, err := clus.GetLearnerMembers() + if err != nil { + t.Fatalf("failed to get the learner members in cluster: %v", err) + } + if len(learners) != 1 { + t.Fatalf("added 1 learner to cluster, got %d", len(learners)) + } + + if len(clus.Members) != 4 { + t.Fatalf("expecting 4 members in cluster after adding the learner member, got %d", len(clus.Members)) + } + // note: + // 1. clus.Members[3] is the newly added learner member, which was appended to clus.Members + // 2. we are using member's grpcAddr instead of clientURLs as the endpoint for clientv3.Config, + // because the implementation of integration test has diverged from embed/etcd.go. + learnerEp := clus.Members[3].GRPCAddr() + cfg := clientv3.Config{ + Endpoints: []string{learnerEp}, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + } + // this cli only has endpoint of the learner member + cli, err := clientv3.New(cfg) + if err != nil { + t.Fatalf("failed to create clientv3: %v", err) + } + defer cli.Close() + + tests := []struct { + op clientv3.Op + wErr bool + }{ + { + op: clientv3.OpGet("foo", clientv3.WithSerializable()), + wErr: false, + }, + { + op: clientv3.OpGet("foo"), + wErr: true, + }, + { + op: clientv3.OpPut("foo", "bar"), + wErr: true, + }, + { + op: clientv3.OpDelete("foo"), + wErr: true, + }, + { + op: clientv3.OpTxn([]clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision("foo"), "=", 0)}, nil, nil), + wErr: true, + }, + } + + for idx, test := range tests { + _, err := cli.Do(context.TODO(), test.op) + if err != nil && !test.wErr { + t.Errorf("%d: expect no error, got %v", idx, err) + } + if err == nil && test.wErr { + t.Errorf("%d: expect error, got nil", idx) + } + } +} diff --git a/integration/cluster.go b/integration/cluster.go index 5585250e0ae..8a8c40b1d95 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -559,6 +559,8 @@ type member struct { clientMaxCallSendMsgSize int clientMaxCallRecvMsgSize int useIP bool + + isLearner bool } func (m *member) GRPCAddr() string { return m.grpcAddr } @@ -1272,3 +1274,121 @@ type grpcAPI struct { // Election is the election API for the client's connection. Election epb.ElectionClient } + +// GetLearnerMembers returns the list of learner members in cluster using MemberList API. +func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) { + cli := c.Client(0) + resp, err := cli.MemberList(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to list member %v", err) + } + var learners []*pb.Member + for _, m := range resp.Members { + if m.IsLearner { + learners = append(learners, m) + } + } + return learners, nil +} + +// AddAndLaunchLearnerMember creates a leaner member, adds it to cluster +// via v3 MemberAdd API, and then launches the new member. +func (c *ClusterV3) AddAndLaunchLearnerMember(t testing.TB) { + m := c.mustNewMember(t) + m.isLearner = true + + scheme := schemeFromTLSInfo(c.cfg.PeerTLS) + peerURLs := []string{scheme + "://" + m.PeerListeners[0].Addr().String()} + + cli := c.Client(0) + _, err := cli.MemberAdd(context.Background(), peerURLs, m.isLearner) + if err != nil { + t.Fatalf("failed to add learner member %v", err) + } + + m.InitialPeerURLsMap = types.URLsMap{} + for _, mm := range c.Members { + m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs + } + m.InitialPeerURLsMap[m.Name] = m.PeerURLs + m.NewCluster = false + + if err := m.Launch(); err != nil { + t.Fatal(err) + } + + c.Members = append(c.Members, m) + + c.waitMembersMatch(t) +} + +// getMembers returns a list of members in cluster, in format of etcdserverpb.Member +func (c *ClusterV3) getMembers() []*pb.Member { + var mems []*pb.Member + for _, m := range c.Members { + mem := &pb.Member{ + Name: m.Name, + PeerURLs: m.PeerURLs.StringSlice(), + ClientURLs: m.ClientURLs.StringSlice(), + IsLearner: m.isLearner, + } + mems = append(mems, mem) + } + return mems +} + +// waitMembersMatch waits until v3rpc MemberList returns the 'same' members info as the +// local 'c.Members', which is the local recording of members in the testing cluster. With +// the exception that the local recording c.Members does not have info on Member.ID, which +// is generated when the member is been added to cluster. +// +// Note: +// A successful match means the Member.clientURLs are matched. This means member has already +// finished publishing its server attributes to cluster. Publishing attributes is a cluster-wide +// write request (in v2 server). Therefore, at this point, any raft log entries prior to this +// would have already been applied. +// +// If a new member was added to an existing cluster, at this point, it has finished publishing +// its own server attributes to the cluster. And therefore by the same argument, it has already +// applied the raft log entries (especially those of type raftpb.ConfChangeType). At this point, +// the new member has the correct view of the cluster configuration. +// +// Special note on learner member: +// Learner member is only added to a cluster via v3rpc MemberAdd API (as of v3.4). When starting +// the learner member, its initial view of the cluster created by peerURLs map does not have info +// on whether or not the new member itself is learner. But at this point, a successful match does +// indicate that the new learner member has applied the raftpb.ConfChangeAddLearnerNode entry +// which was used to add the learner itself to the cluster, and therefore it has the correct info +// on learner. +func (c *ClusterV3) waitMembersMatch(t testing.TB) { + wMembers := c.getMembers() + sort.Sort(SortableProtoMemberSliceByPeerURLs(wMembers)) + cli := c.Client(0) + for { + resp, err := cli.MemberList(context.Background()) + if err != nil { + t.Fatalf("failed to list member %v", err) + } + + if len(resp.Members) != len(wMembers) { + continue + } + sort.Sort(SortableProtoMemberSliceByPeerURLs(resp.Members)) + for _, m := range resp.Members { + m.ID = 0 + } + if reflect.DeepEqual(resp.Members, wMembers) { + return + } + + time.Sleep(tickDuration) + } +} + +type SortableProtoMemberSliceByPeerURLs []*pb.Member + +func (p SortableProtoMemberSliceByPeerURLs) Len() int { return len(p) } +func (p SortableProtoMemberSliceByPeerURLs) Less(i, j int) bool { + return p[i].PeerURLs[0] < p[j].PeerURLs[0] +} +func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }