Skip to content

Commit

Permalink
Merge pull request etcd-io#13 from jingyih/filter_rpc_request_to_learner
Browse files Browse the repository at this point in the history
integration: add TestKVForLearner
  • Loading branch information
jingyih authored Apr 4, 2019
2 parents 04d19b5 + 1b5d4a0 commit daccd92
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 21 deletions.
25 changes: 4 additions & 21 deletions clientv3/integration/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package integration

import (
"context"
"fmt"
"reflect"
"testing"

Expand Down Expand Up @@ -180,27 +179,11 @@ func TestMemberAddForLearner(t *testing.T) {
t.Errorf("Added a member with IsLearner = %v, got %v", isLearner, resp.Member.IsLearner)
}

numOfLearners, err := getNumberOfLearners(clus)
learners, err := clus.GetLearnerMembers()
if err != nil {
t.Fatalf("failed to get the number of learners in cluster: %v", err)
t.Fatalf("failed to get the learner members in cluster: %v", err)
}
if numOfLearners != 1 {
t.Errorf("Added 1 learner node to cluster, got %d", numOfLearners)
if len(learners) != 1 {
t.Errorf("Added 1 learner node to cluster, got %d", len(learners))
}
}

// getNumberOfLearners return the number of learner nodes in cluster using MemberList API
func getNumberOfLearners(clus *integration.ClusterV3) (int, error) {
cli := clus.RandClient()
resp, err := cli.MemberList(context.Background())
if err != nil {
return 0, fmt.Errorf("failed to list member %v", err)
}
numberOfLearners := 0
for _, m := range resp.Members {
if m.IsLearner {
numberOfLearners++
}
}
return numberOfLearners, nil
}
76 changes: 76 additions & 0 deletions clientv3/integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,3 +971,79 @@ func TestKVLargeRequests(t *testing.T) {
clus.Terminate(t)
}
}

// TestKVForLearner ensures learner member only accepts serializable read request.
func TestKVForLearner(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

// we have to add and launch learner member after initial cluster was created, because
// bootstrapping a cluster with learner member is not supported.
clus.AddAndLaunchLearnerMember(t)

learners, err := clus.GetLearnerMembers()
if err != nil {
t.Fatalf("failed to get the learner members in cluster: %v", err)
}
if len(learners) != 1 {
t.Fatalf("added 1 learner to cluster, got %d", len(learners))
}

if len(clus.Members) != 4 {
t.Fatalf("expecting 4 members in cluster after adding the learner member, got %d", len(clus.Members))
}
// note:
// 1. clus.Members[3] is the newly added learner member, which was appended to clus.Members
// 2. we are using member's grpcAddr instead of clientURLs as the endpoint for clientv3.Config,
// because the implementation of integration test has diverged from embed/etcd.go.
learnerEp := clus.Members[3].GRPCAddr()
cfg := clientv3.Config{
Endpoints: []string{learnerEp},
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
// this cli only has endpoint of the learner member
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatalf("failed to create clientv3: %v", err)
}
defer cli.Close()

tests := []struct {
op clientv3.Op
wErr bool
}{
{
op: clientv3.OpGet("foo", clientv3.WithSerializable()),
wErr: false,
},
{
op: clientv3.OpGet("foo"),
wErr: true,
},
{
op: clientv3.OpPut("foo", "bar"),
wErr: true,
},
{
op: clientv3.OpDelete("foo"),
wErr: true,
},
{
op: clientv3.OpTxn([]clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision("foo"), "=", 0)}, nil, nil),
wErr: true,
},
}

for idx, test := range tests {
_, err := cli.Do(context.TODO(), test.op)
if err != nil && !test.wErr {
t.Errorf("%d: expect no error, got %v", idx, err)
}
if err == nil && test.wErr {
t.Errorf("%d: expect error, got nil", idx)
}
}
}
120 changes: 120 additions & 0 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,8 @@ type member struct {
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
useIP bool

isLearner bool
}

func (m *member) GRPCAddr() string { return m.grpcAddr }
Expand Down Expand Up @@ -1272,3 +1274,121 @@ type grpcAPI struct {
// Election is the election API for the client's connection.
Election epb.ElectionClient
}

// GetLearnerMembers returns the list of learner members in cluster using MemberList API.
func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) {
cli := c.Client(0)
resp, err := cli.MemberList(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to list member %v", err)
}
var learners []*pb.Member
for _, m := range resp.Members {
if m.IsLearner {
learners = append(learners, m)
}
}
return learners, nil
}

// AddAndLaunchLearnerMember creates a leaner member, adds it to cluster
// via v3 MemberAdd API, and then launches the new member.
func (c *ClusterV3) AddAndLaunchLearnerMember(t testing.TB) {
m := c.mustNewMember(t)
m.isLearner = true

scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
peerURLs := []string{scheme + "://" + m.PeerListeners[0].Addr().String()}

cli := c.Client(0)
_, err := cli.MemberAdd(context.Background(), peerURLs, m.isLearner)
if err != nil {
t.Fatalf("failed to add learner member %v", err)
}

m.InitialPeerURLsMap = types.URLsMap{}
for _, mm := range c.Members {
m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
}
m.InitialPeerURLsMap[m.Name] = m.PeerURLs
m.NewCluster = false

if err := m.Launch(); err != nil {
t.Fatal(err)
}

c.Members = append(c.Members, m)

c.waitMembersMatch(t)
}

// getMembers returns a list of members in cluster, in format of etcdserverpb.Member
func (c *ClusterV3) getMembers() []*pb.Member {
var mems []*pb.Member
for _, m := range c.Members {
mem := &pb.Member{
Name: m.Name,
PeerURLs: m.PeerURLs.StringSlice(),
ClientURLs: m.ClientURLs.StringSlice(),
IsLearner: m.isLearner,
}
mems = append(mems, mem)
}
return mems
}

// waitMembersMatch waits until v3rpc MemberList returns the 'same' members info as the
// local 'c.Members', which is the local recording of members in the testing cluster. With
// the exception that the local recording c.Members does not have info on Member.ID, which
// is generated when the member is been added to cluster.
//
// Note:
// A successful match means the Member.clientURLs are matched. This means member has already
// finished publishing its server attributes to cluster. Publishing attributes is a cluster-wide
// write request (in v2 server). Therefore, at this point, any raft log entries prior to this
// would have already been applied.
//
// If a new member was added to an existing cluster, at this point, it has finished publishing
// its own server attributes to the cluster. And therefore by the same argument, it has already
// applied the raft log entries (especially those of type raftpb.ConfChangeType). At this point,
// the new member has the correct view of the cluster configuration.
//
// Special note on learner member:
// Learner member is only added to a cluster via v3rpc MemberAdd API (as of v3.4). When starting
// the learner member, its initial view of the cluster created by peerURLs map does not have info
// on whether or not the new member itself is learner. But at this point, a successful match does
// indicate that the new learner member has applied the raftpb.ConfChangeAddLearnerNode entry
// which was used to add the learner itself to the cluster, and therefore it has the correct info
// on learner.
func (c *ClusterV3) waitMembersMatch(t testing.TB) {
wMembers := c.getMembers()
sort.Sort(SortableProtoMemberSliceByPeerURLs(wMembers))
cli := c.Client(0)
for {
resp, err := cli.MemberList(context.Background())
if err != nil {
t.Fatalf("failed to list member %v", err)
}

if len(resp.Members) != len(wMembers) {
continue
}
sort.Sort(SortableProtoMemberSliceByPeerURLs(resp.Members))
for _, m := range resp.Members {
m.ID = 0
}
if reflect.DeepEqual(resp.Members, wMembers) {
return
}

time.Sleep(tickDuration)
}
}

type SortableProtoMemberSliceByPeerURLs []*pb.Member

func (p SortableProtoMemberSliceByPeerURLs) Len() int { return len(p) }
func (p SortableProtoMemberSliceByPeerURLs) Less(i, j int) bool {
return p[i].PeerURLs[0] < p[j].PeerURLs[0]
}
func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

0 comments on commit daccd92

Please sign in to comment.