autopilot: deflake tests (#14475)
Includes:

* Remove the leader upgrade raft version test, as older versions of raft are now
  incompatible with our autopilot library.

* Remove the attempt to assert initial non-voter status in the `PromoteNonVoter`
  test, as the promotion happens too quickly to reliably detect.

* Unskip some previously skipped tests that we should make stable.

* Remove the `consul/sdk` retry helper from these tests; it uses panic recovery
  in a clever-but-gross way to reduce LoC, but it seems to introduce some
  timing issues in the process (see the first sketch after this list).

* Add more test step logging and reduce logging noise from the scheduler
  goroutines to make it easier to debug failing tests.

* Be more consistent about using the `waitForStableLeadership` helper so that we
  can assert the cluster is fully stable, and not just that we've added peers
  (see the second sketch after this list).
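
For reference, the retry-helper replacement looks roughly like this. This is a
minimal sketch assembled from the call signatures visible in the diff below
(`retry.Run`/`r.Check` from `consul/sdk`, `testutil.WaitForResultUntil`, and
`must.NoError`); the exact checks vary per test.

    // Old pattern: the consul/sdk helper re-runs the closure until it
    // passes or times out. r.Check aborts a failed attempt by panicking
    // inside the *retry.R, and retry.Run recovers and retries.
    retry.Run(t, func(r *retry.R) {
        r.Check(wantPeers(s, 3))
    })

    // New pattern: poll an explicit (bool, error) predicate for up to
    // the given duration, then hand the final error to a plain
    // assertion, with no panic/recover involved.
    testutil.WaitForResultUntil(10*time.Second,
        func() (bool, error) {
            if err := wantPeers(s, 3); err != nil {
                return false, err
            }
            return true, nil
        },
        func(err error) { must.NoError(t, err) })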
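
And a sketch of the `waitForStableLeadership` pattern, assuming the helper
behaves as its call sites in the diff suggest (it blocks until the cluster
converges on a single stable leader and returns that server):

    servers := []*Server{s1, s2, s3}
    TestJoin(t, servers...)

    // Stronger than a bare peer-count check: waits until the cluster is
    // fully stable with one elected leader, not just until the peers
    // have been added.
    leader := waitForStableLeadership(t, servers)
    t.Logf("leader is %s", leader.config.NodeID)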
tgross authored Sep 7, 2022
1 parent 05908e3 commit 534869e
Showing 3 changed files with 115 additions and 365 deletions.
5 changes: 4 additions & 1 deletion nomad/autopilot.go
@@ -85,7 +85,10 @@ func (d *AutopilotDelegate) RemoveFailedServer(failedSrv *autopilot.Server) {
go func() {
err := d.server.RemoveFailedNode(failedSrv.Name)
if err != nil {
d.server.logger.Error("could not remove failed server", "server", string(failedSrv.ID))
d.server.logger.Error("could not remove failed server",
"server", string(failedSrv.ID),
"error", err,
)
}
}()
}
217 changes: 49 additions & 168 deletions nomad/autopilot_test.go
@@ -5,11 +5,10 @@ import (
"testing"
"time"

// TODO: replace this with our own helper
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/hashicorp/serf/serf"
"github.com/shoenig/test/must"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/testutil"
@@ -38,53 +37,13 @@ func wantPeers(s *Server, peers int) error {
return nil
}

// wantRaft determines if the servers have all of each other in their
// Raft configurations,
func wantRaft(servers []*Server) error {
// Make sure all the servers are represented in the Raft config,
// and that there are no extras.
verifyRaft := func(c raft.Configuration) error {
want := make(map[raft.ServerID]bool)
for _, s := range servers {
want[s.config.RaftConfig.LocalID] = true
}

found := make([]raft.ServerID, 0, len(c.Servers))
for _, s := range c.Servers {
found = append(found, s.ID)
if !want[s.ID] {
return fmt.Errorf("don't want %q", s.ID)
}
delete(want, s.ID)
}

if len(want) > 0 {
return fmt.Errorf("didn't find %v in %#+v", want, found)
}
return nil
}

for _, s := range servers {
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
if err := verifyRaft(future.Configuration()); err != nil {
return err
}
}
return nil
}

func TestAutopilot_CleanupDeadServer(t *testing.T) {
ci.Parallel(t)
t.Run("raft_v3", func(t *testing.T) { testCleanupDeadServer(t, 3) })
}

func testCleanupDeadServer(t *testing.T, raftVersion int) {
conf := func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(raftVersion)
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(3)
}

s1, cleanupS1 := TestServer(t, conf)
@@ -97,16 +56,11 @@ func testCleanupDeadServer(t *testing.T, raftVersion int) {
defer cleanupS3()

servers := []*Server{s1, s2, s3}

// Try to join
TestJoin(t, servers...)

for _, s := range servers {
testutil.WaitForLeader(t, s.RPC)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
t.Logf("waiting for initial stable cluster")
waitForStableLeadership(t, servers)

// Bring up a new server
s4, cleanupS4 := TestServer(t, conf)
defer cleanupS4()

@@ -115,12 +69,14 @@ func testCleanupDeadServer(t *testing.T, raftVersion int) {
for i, s := range servers {
if !s.IsLeader() {
killedIdx = i
t.Logf("killing a server (index %d)", killedIdx)
s.Shutdown()
break
}
}

retry.Run(t, func(r *retry.R) {
t.Logf("waiting for server loss to be detected")
testutil.WaitForResultUntil(10*time.Second, func() (bool, error) {
for i, s := range servers {
alive := 0
if i == killedIdx {
@@ -134,27 +90,26 @@ func testCleanupDeadServer(t *testing.T, raftVersion int) {
}

if alive != 2 {
r.Fatalf("expected 2 alive servers but found %v", alive)
return false, fmt.Errorf("expected 2 alive servers but found %v", alive)
}
}
})
return true, nil
}, func(err error) { must.NoError(t, err) })

// Join the new server
servers[killedIdx] = s4
t.Logf("adding server s4")
TestJoin(t, servers...)

t.Logf("waiting for dead server to be removed")
waitForStableLeadership(t, servers)

// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
}

func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
ci.Parallel(t)

conf := func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.BootstrapExpect = 5
}

@@ -174,37 +129,27 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
defer cleanupS5()

servers := []*Server{s1, s2, s3, s4, s5}

// Join the servers to s1, and wait until they are all promoted to
// voters.
TestJoin(t, servers...)
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 5))
}
})

// Kill a non-leader server
t.Logf("waiting for initial stable cluster")
waitForStableLeadership(t, servers)

t.Logf("killing a non-leader server")
if leader := waitForStableLeadership(t, servers); leader == s4 {
s1, s4 = s4, s1
}
s4.Shutdown()

// Should be removed from the peers automatically
t.Logf("waiting for dead peer to be removed")
servers = []*Server{s1, s2, s3, s5}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 4))
}
})
waitForStableLeadership(t, servers)
}

func TestAutopilot_RollingUpdate(t *testing.T) {
ci.Parallel(t)

conf := func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = 3
}
@@ -218,70 +163,38 @@ func TestAutopilot_RollingUpdate(t *testing.T) {
s3, cleanupS3 := TestServer(t, conf)
defer cleanupS3()

// Join the servers to s1, and wait until they are all promoted to
// voters.
servers := []*Server{s1, s2, s3}
TestJoin(t, s1, s2, s3)
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
})

t.Logf("waiting for initial stable cluster")
waitForStableLeadership(t, servers)

// Add one more server like we are doing a rolling update.
t.Logf("adding server s4")
s4, cleanupS4 := TestServer(t, conf)
defer cleanupS4()
TestJoin(t, s1, s4)

// Wait for s4 to stabilize and get promoted to a voter
t.Logf("waiting for s4 to stabilize and be promoted")
servers = append(servers, s4)
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 4))
}
})
waitForStableLeadership(t, servers)

// Now kill one of the "old" nodes like we are doing a rolling update.
t.Logf("shutting down server s3")
s3.Shutdown()

isVoter := func() bool {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
for _, s := range future.Configuration().Servers {
if string(s.ID) == string(s4.config.NodeID) {
return s.Suffrage == raft.Voter
}
}
t.Fatalf("didn't find s4")
return false
}

t.Logf("waiting for s4 to stabalize and be promoted")

// Wait for s4 to stabilize, get promoted to a voter, and for s3 to be
// removed.
// Wait for s3 to be removed and the cluster to stabilize.
t.Logf("waiting for cluster to stabilize")
servers = []*Server{s1, s2, s4}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
if !isVoter() {
r.Fatalf("should be a voter")
}
})
waitForStableLeadership(t, servers)
}

func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
t.Skip("TestAutopilot_CleanupDeadServer is very flaky, removing it for now")
ci.Parallel(t)

conf := func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.BootstrapExpect = 3
}
s1, cleanupS1 := TestServer(t, conf)
@@ -299,38 +212,27 @@ func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
defer cleanupS4()

servers := []*Server{s1, s2, s3}

// Join the servers to s1
TestJoin(t, s1, s2, s3)

t.Logf("waiting for initial stable cluster")
leader := waitForStableLeadership(t, servers)

// Add s4 to peers directly
t.Logf("adding server s4 to peers directly")
addr := fmt.Sprintf("127.0.0.1:%d", s4.config.RPCAddr.Port)
future := leader.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(addr), 0, 0)
if err := future.Error(); err != nil {
t.Fatal(err)
}

// Verify we have 4 peers
peers, err := s1.numPeers()
if err != nil {
t.Fatal(err)
}
if peers != 4 {
t.Fatalf("bad: %v", peers)
}

// Wait for s4 to be removed
for _, s := range []*Server{s1, s2, s3} {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
t.Logf("waiting for 4th server to be removed")
waitForStableLeadership(t, servers)
}

func TestAutopilot_PromoteNonVoter(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS1()
@@ -339,52 +241,31 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)

s2, cleanupS2 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.BootstrapExpect = 0
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS2()
TestJoin(t, s1, s2)

// Make sure we see it as a nonvoter initially. We wait until half
// the stabilization period has passed.
retry.Run(t, func(r *retry.R) {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
r.Fatal(err)
}

servers := future.Configuration().Servers
if len(servers) != 2 {
r.Fatalf("bad: %v", servers)
}
if servers[1].Suffrage != raft.Nonvoter {
r.Fatalf("bad: %v", servers)
}
health := s1.autopilot.GetServerHealth(raft.ServerID(servers[1].ID))
if health == nil {
r.Fatalf("nil health, %v", s1.GetClusterHealth())
}
if !health.Healthy {
r.Fatalf("bad: %v", health)
}
if time.Since(health.StableSince) < s1.config.AutopilotConfig.ServerStabilizationTime/2 {
r.Fatal("stable period not elapsed")
}
})

// Make sure it ends up as a voter.
retry.Run(t, func(r *retry.R) {
// Note: we can't reliably detect that the server is initially a non-voter,
// because it can transition too quickly for the test setup to detect,
// especially in low-resource environments like CI. We'll assume that
// happens correctly here and only test that it transitions to become a
// voter.
testutil.WaitForResultUntil(10*time.Second, func() (bool, error) {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
r.Fatal(err)
return false, err
}

servers := future.Configuration().Servers
if len(servers) != 2 {
r.Fatalf("bad: %v", servers)
return false, fmt.Errorf("expected 2 servers, got: %v", servers)
}
if servers[1].Suffrage != raft.Voter {
r.Fatalf("bad: %v", servers)
return false, fmt.Errorf("expected server to be voter: %v", servers)
}
})
return true, nil
}, func(err error) { must.NoError(t, err) })

}