diff --git a/tests/e2e/cluster_downgrade_test.go b/tests/e2e/cluster_downgrade_test.go index 3234d21df3d0..0ae4e2afad2e 100644 --- a/tests/e2e/cluster_downgrade_test.go +++ b/tests/e2e/cluster_downgrade_test.go @@ -17,7 +17,6 @@ package e2e import ( "context" "fmt" - "math/rand" "strings" "testing" "time" @@ -29,7 +28,15 @@ import ( "go.etcd.io/etcd/tests/v3/framework/e2e" ) -func TestDowngradeUpgrade(t *testing.T) { +func TestDowngradeUpgradeClusterOf1(t *testing.T) { + testDowngradeUpgrade(t, 1) +} + +func TestDowngradeUpgradeClusterOf3(t *testing.T) { + testDowngradeUpgrade(t, 3) +} + +func testDowngradeUpgrade(t *testing.T, clusterSize int) { currentEtcdBinary := e2e.BinDir + "/etcd" lastReleaseBinary := e2e.BinDir + "/etcd-last-release" if !fileutil.Exist(lastReleaseBinary) { @@ -43,8 +50,10 @@ func TestDowngradeUpgrade(t *testing.T) { e2e.BeforeTest(t) t.Logf("Create cluster with version %s", currentVersionStr) - epc := newCluster(t, currentEtcdBinary) - validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr}) + epc := newCluster(t, currentEtcdBinary, clusterSize) + for i := 0; i < len(epc.Procs); i++ { + validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr}) + } t.Logf("Cluster created") t.Logf("etcdctl downgrade enable %s", lastVersion) @@ -53,30 +62,42 @@ func TestDowngradeUpgrade(t *testing.T) { triggerSnaphsot(t, epc) t.Log("Downgrade enabled, validating if cluster is ready for downgrade") - expectLog(t, epc, "The server is ready to downgrade") - validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: currentVersionStr}) + for i := 0; i < len(epc.Procs); i++ { + expectLog(t, epc.Procs[i], "The server is ready to downgrade") + validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr}) + } t.Log("Cluster is ready for downgrade") t.Log("Starting downgrade process") - stopEtcd(t, epc.Procs[0]) - startEtcd(t, epc, lastReleaseBinary) - expectLog(t, epc, "the cluster has been downgraded") + for i := 0; i < len(epc.Procs); i++ { + t.Logf("Downgrading member %d", i) + stopEtcd(t, epc.Procs[i]) + startEtcd(t, epc.Procs[i], lastReleaseBinary) + } t.Log("All members downgraded, validating downgrade") - validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: lastVersionStr}) + expectLog(t, leader(t, epc), "the cluster has been downgraded") + for i := 0; i < len(epc.Procs); i++ { + validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr}) + } t.Log("Downgrade complete") t.Log("Starting upgrade process") - stopEtcd(t, epc.Procs[0]) - startEtcd(t, epc, currentEtcdBinary) + for i := 0; i < len(epc.Procs); i++ { + t.Logf("Upgrading member %d", i) + stopEtcd(t, epc.Procs[i]) + startEtcd(t, epc.Procs[i], currentEtcdBinary) + } t.Log("All members upgraded, validating upgrade") - validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr}) + for i := 0; i < len(epc.Procs); i++ { + validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr}) + } t.Log("Upgrade complete") } -func newCluster(t *testing.T, execPath string) *e2e.EtcdProcessCluster { +func newCluster(t *testing.T, execPath string, clusterSize int) *e2e.EtcdProcessCluster { epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ ExecPath: execPath, - ClusterSize: 1, + ClusterSize: clusterSize, InitialToken: "new", KeepDataDir: true, // TODO: REMOVE snapshot override when snapshotting is automated after lowering storage versiont l @@ -109,9 +130,9 @@ func triggerSnaphsot(t *testing.T, epc *e2e.EtcdProcessCluster) { } } -func startEtcd(t *testing.T, epc *e2e.EtcdProcessCluster, execPath string) { - epc.Procs[0].Config().ExecPath = execPath - err := epc.Procs[0].Restart() +func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) { + ep.Config().ExecPath = execPath + err := ep.Restart() if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) } @@ -140,20 +161,20 @@ func stopEtcd(t *testing.T, ep e2e.EtcdProcess) { } } -func validateVersion(t *testing.T, epc *e2e.EtcdProcessCluster, expect version.Versions) { +func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) { // Two separate calls to expect as it doesn't support multiple matches on the same line var err error e2e.ExecuteWithTimeout(t, 20*time.Second, func() { for { if expect.Server != "" { - err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc.Cfg, epc.Procs[rand.Intn(epc.Cfg.ClusterSize)], "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server) + err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server) if err != nil { time.Sleep(time.Second) continue } } if expect.Cluster != "" { - err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc.Cfg, epc.Procs[rand.Intn(epc.Cfg.ClusterSize)], "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster) + err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster) if err != nil { time.Sleep(time.Second) continue @@ -167,13 +188,23 @@ func validateVersion(t *testing.T, epc *e2e.EtcdProcessCluster, expect version.V } } -func expectLog(t *testing.T, epc *e2e.EtcdProcessCluster, expectLog string) { +func expectLog(t *testing.T, ep e2e.EtcdProcess, expectLog string) { t.Helper() var err error e2e.ExecuteWithTimeout(t, 30*time.Second, func() { - _, err = epc.Procs[0].Logs().Expect(expectLog) + _, err = ep.Logs().Expect(expectLog) }) if err != nil { t.Fatal(err) } } + +func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + leader, err := epc.Leader(ctx) + cancel() + if err != nil { + t.Fatal(err) + } + return leader +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 242ebe0d6a7d..cdb7b4adb378 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "fmt" "net/url" "os" @@ -23,6 +24,7 @@ import ( "testing" "time" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver" "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -491,3 +493,25 @@ func (epc *EtcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) { } return ret } +func (epc *EtcdProcessCluster) Leader(ctx context.Context) (EtcdProcess, error) { + for i := 0; i < len(epc.Procs); i++ { + endpoints := epc.Procs[i].EndpointsV3() + cli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 3 * time.Second, + }) + if err != nil { + return nil, err + } + defer cli.Close() + resp, err := cli.Status(ctx, endpoints[0]) + if err != nil { + return nil, err + } + if resp.Header.GetMemberId() == resp.Leader { + return epc.Procs[i], nil + } + } + + return nil, fmt.Errorf("Leader not found") +}