test: deflake TestDowngradeUpgradeClusterOf3 timeout
In the TestDowngradeUpgradeCluster case, the brand-new cluster uses the
simple-config-changer, which means that configuration entries have been
committed before leader election and will be applied once etcdserver
starts to receive apply requests. The simple-config-changer marks the
`confState` dirty, and the storage backend's precommit hook then
persists the updated `confState`.
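
That interaction can be pictured roughly as follows; this is a minimal
sketch with illustrative names, not etcd's actual backend or membership
API:

```go
package sketch

import "sync"

// backend stands in for the storage backend described above; metaBucket
// plays the role of the bucket that holds the confState record.
type backend struct {
	mu         sync.Mutex
	dirty      bool
	pending    []byte
	metaBucket map[string][]byte
}

// markConfStateDirty is what applying a config-change entry would do:
// remember the new membership state and flag it for persistence.
func (b *backend) markConfStateDirty(confState []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = confState
	b.dirty = true
}

// precommitHook runs before each backend commit and flushes the dirty
// confState into the meta bucket, mirroring the hook described above.
func (b *backend) precommitHook() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.dirty {
		b.metaBucket["confState"] = b.pending
		b.dirty = false
	}
}
```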

For the new cluster, the storage version is nil at the beginning. It
becomes v3.5 once the `confState` record has been committed, and it
becomes >v3.5 once the `storageVersion` record has been committed.
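
Put differently, the detected schema version depends on which meta
records exist. A hedged sketch of that rule (the real logic lives in
etcd's schema package; record names follow the description above):

```go
package sketch

import "fmt"

// detectSchemaVersion mirrors the rule above: a storageVersion record
// means a version newer than v3.5, a confState record alone means v3.5,
// and neither record is an error. Illustrative only.
func detectSchemaVersion(meta map[string][]byte) (string, error) {
	if v, ok := meta["storageVersion"]; ok {
		return string(v), nil
	}
	if _, ok := meta["confState"]; ok {
		return "3.5.0", nil
	}
	return "", fmt.Errorf("cannot detect storage schema version: missing confstate information")
}
```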

When the new cluster is ready, the leader sets the initial cluster
version to v3.6.x, which triggers `monitorStorageVersion` to update the
`storageVersion` to v3.6.x. If the `confState` record has been committed
before the cluster version update, we end up with a `storageVersion`
record.
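
The monitor is essentially a periodic retry loop. A minimal sketch,
assuming the 4-second interval mentioned later in this message and an
injected update callback; it is not etcd's actual monitorStorageVersion:

```go
package sketch

import (
	"context"
	"log"
	"time"
)

// monitorStorageVersion sketches the loop described above: every few
// seconds it tries to bring the storage version in line with the cluster
// version and simply retries when detection fails (for example because
// the confState record has not been committed yet).
func monitorStorageVersion(ctx context.Context, update func() error) {
	ticker := time.NewTicker(4 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := update(); err != nil {
				log.Printf("skipping storage version update: %v", err)
			}
		}
	}
}
```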

If the storage backend doesn't commit in time, `monitorStorageVersion`
won't update the version, failing with `cannot detect storage schema
version: missing confstate information`.

If we then file the downgrade request before the next round of
`monitorStorageVersion` (which runs every 4 seconds), the cluster
version will be v3.5.0, which is already equal to the result of
`UnsafeDetectSchemaVersion`, and we will never see `The server is ready
to downgrade` in the logs.

The issue is easy to reproduce if you use cpuset or taskset to restrict
the test to two CPUs.

So we should wait until the new cluster's storage is ready before
filing the downgrade request.
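
On the test side, that wait boils down to polling the member's /version
endpoint until the reported storage version matches the expected one,
which is what the reworked validateVersion below does via curl. A
condensed, standalone sketch of the same idea (the JSON field names
follow version.Versions; the helper itself is illustrative):

```go
package sketch

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// versions holds the /version fields the test cares about.
type versions struct {
	Server  string `json:"etcdserver"`
	Cluster string `json:"etcdcluster"`
	Storage string `json:"storage"`
}

// waitStorageVersion polls /version until the storage version matches
// want or the timeout expires, so a downgrade request is only filed once
// the storage schema has actually caught up.
func waitStorageVersion(clientURL, want string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := http.Get(clientURL + "/version")
		if err == nil {
			var v versions
			decodeErr := json.NewDecoder(resp.Body).Decode(&v)
			resp.Body.Close()
			if decodeErr == nil && v.Storage == want {
				return nil
			}
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("storage version did not reach %s within %s", want, timeout)
}
```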

Fixes: etcd-io#14540

Signed-off-by: Wei Fu <[email protected]>
fuweid committed Oct 30, 2022
1 parent 0a19ee7 commit d40c2a0
Showing 2 changed files with 85 additions and 32 deletions.
98 changes: 66 additions & 32 deletions tests/e2e/cluster_downgrade_test.go
@@ -16,7 +16,9 @@ package e2e

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

@@ -27,6 +29,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/testutils"
"go.uber.org/zap"
)

func TestDowngradeUpgradeClusterOf1(t *testing.T) {
@@ -45,15 +48,19 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {
}
currentVersion := semver.New(version.Version)
lastVersion := semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
currentVersionStr := fmt.Sprintf("%d.%d", currentVersion.Major, currentVersion.Minor)
lastVersionStr := fmt.Sprintf("%d.%d", lastVersion.Major, lastVersion.Minor)
currentVersionStr := fmt.Sprintf("%d.%d.%d", currentVersion.Major, currentVersion.Minor, currentVersion.Patch)
lastVersionStr := fmt.Sprintf("%d.%d.0", lastVersion.Major, lastVersion.Minor)

e2e.BeforeTest(t)

t.Logf("Create cluster with version %s", currentVersionStr)
epc := newCluster(t, currentEtcdBinary, clusterSize)
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: currentVersionStr,
Server: version.Version,
Storage: currentVersionStr,
})
}
t.Logf("Cluster created")

@@ -62,36 +69,50 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) {

t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
for i := 0; i < len(epc.Procs); i++ {
e2e.AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: lastVersionStr,
Server: version.Version,
Storage: lastVersionStr,
})
}
t.Log("Cluster is ready for downgrade")

t.Log("Cluster is ready for downgrade")
t.Logf("Starting downgrade process to %q", lastVersionStr)
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Downgrading member %d by running %s binary", i, lastReleaseBinary)
stopEtcd(t, epc.Procs[i])
startEtcd(t, epc.Procs[i], lastReleaseBinary)
}

t.Log("All members downgraded, validating downgrade")
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: lastVersionStr})
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: lastVersionStr,
Server: lastVersionStr,
// NOTE: The 3.5 release's /version handler doesn't return the storage field,
// so do not check the storage version here.
})
}
t.Log("Downgrade complete")

t.Log("Downgrade complete")
t.Logf("Starting upgrade process to %q", currentVersionStr)
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Upgrading member %d", i)
stopEtcd(t, epc.Procs[i])
startEtcd(t, epc.Procs[i], currentEtcdBinary)
if i+1 < len(epc.Procs) {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: lastVersionStr, Server: currentVersionStr})
}
// NOTE: The leader runs a monitor on the cluster version and will
// update it on its own. Don't check the intermediate version here,
// since that check could be flaky.
}

t.Log("All members upgraded, validating upgrade")
for i := 0; i < len(epc.Procs); i++ {
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{Cluster: currentVersionStr, Server: currentVersionStr})
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
Cluster: currentVersionStr,
Server: version.Version,
Storage: currentVersionStr,
})
}
t.Log("Upgrade complete")
}
@@ -140,30 +161,43 @@ func stopEtcd(t *testing.T, ep e2e.EtcdProcess) {
}

func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) {
// Two separate calls to expect as it doesn't support multiple matches on the same line
var err error
testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
var verifyFn = func(lines []string) error {
var data = strings.Join(lines, "\n")
var result = version.Versions{}

if err := json.Unmarshal([]byte(data), &result); err != nil {
return fmt.Errorf("failed to unmarshal %v into %v: %w", data, result, err)
}

if expect.Server != "" && expect.Server != result.Server {
return fmt.Errorf("expect etcdserver version %v, but got %v (%v)", expect.Server, result.Server, data)
}

if expect.Cluster != "" && expect.Cluster != result.Cluster {
return fmt.Errorf("expect etcdcluster version %v, but got %v (%v)", expect.Cluster, result.Cluster, data)
}

if expect.Storage != "" && expect.Storage != result.Storage {
return fmt.Errorf("expect storage version %v, but got %v (%v)", expect.Storage, result.Storage, data)
}
return nil
}

var args = e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
for {
if expect.Server != "" {
err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdserver":"`+expect.Server)
if err != nil {
time.Sleep(time.Second)
continue
}
}
if expect.Cluster != "" {
err = e2e.SpawnWithExpects(e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"}), nil, `"etcdcluster":"`+expect.Cluster)
if err != nil {
time.Sleep(time.Second)
continue
}
err := e2e.SpawnWithExpectResult(args, nil, verifyFn)
if err == nil {
break
}
break

cfg.Logger.Warn("failed to verify version",
zap.Error(err),
zap.String("commandline", strings.Join(args, "_")),
)
time.Sleep(time.Second)
}
})
if err != nil {
t.Fatal(err)
}
}

func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
19 changes: 19 additions & 0 deletions tests/framework/e2e/util.go
@@ -81,6 +81,25 @@ func SpawnWithExpectLines(ctx context.Context, args []string, envVars map[string
return lines, perr
}

// SpawnWithExpectResult is used to verify the output from short-running commands.
func SpawnWithExpectResult(args []string, envVars map[string]string, f func([]string) error) error {
proc, err := SpawnCmd(args, envVars)
if err != nil {
return err
}
defer proc.Stop()

perr := proc.Wait()
// make sure that all the output has been received
proc.Close()

if perr != nil {
return fmt.Errorf("unexpected error from command %v: %w", args, perr)
}

return f(proc.Lines())
}

func RandomLeaseID() int64 {
return rand.New(rand.NewSource(time.Now().UnixNano())).Int63()
}
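
For context, a hypothetical caller of the new SpawnWithExpectResult
helper, written inside the e2e package with the usual `fmt` and
`testing` imports (the test name, command, and check are illustrative;
validateVersion above follows the same pattern):

```go
// Run a short-lived command to completion and let the callback validate
// the full output; any error is surfaced to the test.
func verifyShortCommandOutput(t *testing.T) {
	err := SpawnWithExpectResult(
		[]string{"etcdctl", "version"}, // any short-running command works here
		nil,                            // no extra environment variables
		func(lines []string) error {
			if len(lines) == 0 {
				return fmt.Errorf("expected some output, got none")
			}
			return nil
		},
	)
	if err != nil {
		t.Fatal(err)
	}
}
```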