server: Implement HA downgrade test
serathius committed Feb 14, 2022
1 parent 1b26356 commit c2b754c
Showing 2 changed files with 78 additions and 23 deletions.
77 changes: 54 additions & 23 deletions tests/e2e/cluster_downgrade_test.go
@@ -17,7 +17,6 @@ package e2e
 import (
 	"context"
 	"fmt"
-	"math/rand"
 	"strings"
 	"testing"
 	"time"
@@ -29,7 +28,15 @@ import (
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 )

-func TestDowngradeUpgrade(t *testing.T) {
+func TestDowngradeUpgradeClusterOf1(t *testing.T) {
+	testDowngradeUpgrade(t, 1)
+}
+
+func TestDowngradeUpgradeClusterOf3(t *testing.T) {
+	testDowngradeUpgrade(t, 3)
+}
+
+func testDowngradeUpgrade(t *testing.T, clusterSize int) {
 	currentEtcdBinary := e2e.BinDir + "/etcd"
 	lastReleaseBinary := e2e.BinDir + "/etcd-last-release"
 	if !fileutil.Exist(lastReleaseBinary) {
@@ -43,8 +50,10 @@ func TestDowngradeUpgrade(t *testing.T) {
 	e2e.BeforeTest(t)

 	t.Logf("Create cluster with version %s", currentVersionStr)
-	epc := newCluster(t, currentEtcdBinary)
-	validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
+	epc := newCluster(t, currentEtcdBinary, clusterSize)
+	for i := 0; i < len(epc.Procs); i++ {
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
+	}
 	t.Logf("Cluster created")

 	t.Logf("etcdctl downgrade enable %s", lastVersion)
@@ -53,30 +62,42 @@
 	triggerSnaphsot(t, epc)

 	t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
-	expectLog(t, epc, "The server is ready to downgrade")
-	validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
+	for i := 0; i < len(epc.Procs); i++ {
+		expectLog(t, epc.Procs[i], "The server is ready to downgrade")
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
+	}
 	t.Log("Cluster is ready for downgrade")

 	t.Log("Starting downgrade process")
-	stopEtcd(t, epc.Procs[0])
-	startEtcd(t, epc, lastReleaseBinary)
-	expectLog(t, epc, "the cluster has been downgraded")
+	for i := 0; i < len(epc.Procs); i++ {
+		t.Logf("Downgrading member %d", i)
+		stopEtcd(t, epc.Procs[i])
+		startEtcd(t, epc.Procs[i], lastReleaseBinary)
+	}
 	t.Log("All members downgraded, validating downgrade")
-	validateVersion(t, epc, version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
+	expectLog(t, leader(t, epc), "the cluster has been downgraded")
+	for i := 0; i < len(epc.Procs); i++ {
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
+	}
 	t.Log("Downgrade complete")

 	t.Log("Starting upgrade process")
-	stopEtcd(t, epc.Procs[0])
-	startEtcd(t, epc, currentEtcdBinary)
+	for i := 0; i < len(epc.Procs); i++ {
+		t.Logf("Upgrading member %d", i)
+		stopEtcd(t, epc.Procs[i])
+		startEtcd(t, epc.Procs[i], currentEtcdBinary)
+	}
 	t.Log("All members upgraded, validating upgrade")
-	validateVersion(t, epc, version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
+	for i := 0; i < len(epc.Procs); i++ {
+		validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
+	}
 	t.Log("Upgrade complete")
 }

-func newCluster(t *testing.T, execPath string) *e2e.EtcdProcessCluster {
+func newCluster(t *testing.T, execPath string, clusterSize int) *e2e.EtcdProcessCluster {
 	epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
 		ExecPath:     execPath,
-		ClusterSize:  1,
+		ClusterSize:  clusterSize,
 		InitialToken: "new",
 		KeepDataDir:  true,
 		// TODO: REMOVE snapshot override when snapshotting is automated after lowering storage version
@@ -109,9 +130,9 @@ func triggerSnaphsot(t *testing.T, epc *e2e.EtcdProcessCluster) {
 	}
 }

-func startEtcd(t *testing.T, epc *e2e.EtcdProcessCluster, execPath string) {
-	epc.Procs[0].Config().ExecPath = execPath
-	err := epc.Procs[0].Restart()
+func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) {
+	ep.Config().ExecPath = execPath
+	err := ep.Restart()
 	if err != nil {
 		t.Fatalf("could not start etcd process cluster (%v)", err)
 	}
@@ -140,20 +161,20 @@ func stopEtcd(t *testing.T, ep e2e.EtcdProcess) {
 	}
 }

-func validateVersion(t *testing.T, epc *e2e.EtcdProcessCluster, expect version.Versions) {
+func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) {
 	// Two separate calls to expect as it doesn't support multiple matches on the same line
 	var err error
 	e2e.ExecuteWithTimeout(t, 20*time.Second, func() {
 		for {
 			if expect.Server != "" {
-				err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc.Cfg, epc.Procs[rand.Intn(epc.Cfg.ClusterSize)], "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
+				err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
 				if err != nil {
 					time.Sleep(time.Second)
 					continue
 				}
 			}
 			if expect.Cluster != "" {
-				err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(epc.Cfg, epc.Procs[rand.Intn(epc.Cfg.ClusterSize)], "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
+				err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
 				if err != nil {
 					time.Sleep(time.Second)
 					continue
@@ -167,13 +188,23 @@ func validateVersion(t *testing.T, epc *e2e.EtcdProcessCluster, expect version.Versions) {
 	}
 }

-func expectLog(t *testing.T, epc *e2e.EtcdProcessCluster, expectLog string) {
+func expectLog(t *testing.T, ep e2e.EtcdProcess, expectLog string) {
 	t.Helper()
 	var err error
 	e2e.ExecuteWithTimeout(t, 30*time.Second, func() {
-		_, err = epc.Procs[0].Logs().Expect(expectLog)
+		_, err = ep.Logs().Expect(expectLog)
 	})
 	if err != nil {
 		t.Fatal(err)
 	}
 }
+
+func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	leader, err := epc.Leader(ctx)
+	cancel()
+	if err != nil {
+		t.Fatal(err)
+	}
+	return leader
+}
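
Note: validateVersion now checks one specific member rather than a random one, driving curl through e2e.SpawnWithExpects against that member's /version endpoint. For readers unfamiliar with that endpoint: etcd serves GET /version with a JSON body of the form {"etcdserver":"3.5.1","etcdcluster":"3.5.0"}. Below is a minimal standalone sketch of the same check in plain Go; it is not part of the commit, and the endpoint URL and helper names (versionInfo, fetchVersion) are illustrative assumptions.

// version_check.go — a sketch of the check validateVersion performs via curl.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// versionInfo mirrors the JSON body served by etcd's /version endpoint.
type versionInfo struct {
	Server  string `json:"etcdserver"`
	Cluster string `json:"etcdcluster"`
}

// fetchVersion issues the same GET /version request the test spawns curl for.
func fetchVersion(endpoint string) (versionInfo, error) {
	var v versionInfo
	resp, err := http.Get(endpoint + "/version")
	if err != nil {
		return v, err
	}
	defer resp.Body.Close()
	err = json.NewDecoder(resp.Body).Decode(&v)
	return v, err
}

func main() {
	// Assumes a member reachable over plain HTTP at the default client port.
	v, err := fetchVersion("http://localhost:2379")
	if err != nil {
		panic(err)
	}
	fmt.Printf("etcdserver=%s etcdcluster=%s\n", v.Server, v.Cluster)
}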
24 changes: 24 additions & 0 deletions tests/framework/e2e/cluster.go
@@ -15,6 +15,7 @@
 package e2e

 import (
+	"context"
 	"fmt"
 	"net/url"
 	"os"
@@ -23,6 +24,7 @@ import (
 	"testing"
 	"time"

+	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/server/v3/etcdserver"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
@@ -491,3 +493,25 @@ func (epc *EtcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
 	}
 	return ret
 }
+func (epc *EtcdProcessCluster) Leader(ctx context.Context) (EtcdProcess, error) {
+	for i := 0; i < len(epc.Procs); i++ {
+		endpoints := epc.Procs[i].EndpointsV3()
+		cli, err := clientv3.New(clientv3.Config{
+			Endpoints:   endpoints,
+			DialTimeout: 3 * time.Second,
+		})
+		if err != nil {
+			return nil, err
+		}
+		defer cli.Close()
+		resp, err := cli.Status(ctx, endpoints[0])
+		if err != nil {
+			return nil, err
+		}
+		if resp.Header.GetMemberId() == resp.Leader {
+			return epc.Procs[i], nil
+		}
+	}
+
+	return nil, fmt.Errorf("Leader not found")
+}
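
Note: the new Leader method relies on clientv3's Status response identifying both parties: resp.Header.MemberId is the ID of the member that answered (the call targets endpoints[0] of one specific process, so the answer is guaranteed to come from that member), while resp.Leader is the ID of the current cluster leader; the process whose own ID matches the leader ID is returned. A minimal standalone sketch of the same idea against an already-running cluster follows; it is not part of the commit, and the findLeader helper, endpoint list, and timeouts are illustrative assumptions.

// find_leader.go — a sketch of leader detection via the clientv3 Status API.
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// findLeader returns the client endpoint whose member currently leads the cluster.
func findLeader(ctx context.Context, endpoints []string) (string, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		return "", err
	}
	defer cli.Close()
	for _, ep := range endpoints {
		// Status is answered by the member addressed, so the response header
		// carries that member's ID while Leader carries the cluster leader's ID.
		resp, err := cli.Status(ctx, ep)
		if err != nil {
			return "", err
		}
		if resp.Header.MemberId == resp.Leader {
			return ep, nil
		}
	}
	return "", fmt.Errorf("leader not found")
}

func main() {
	// Assumed endpoints of a local 3-member cluster; adjust to your setup.
	endpoints := []string{"http://localhost:2379", "http://localhost:22379", "http://localhost:32379"}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	ep, err := findLeader(ctx, endpoints)
	if err != nil {
		panic(err)
	}
	fmt.Println("leader endpoint:", ep)
}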
