Skip to content

Commit

Permalink
roachtest: remove assert dead nodes
Browse files Browse the repository at this point in the history
Previously, the post assertions would check for dead nodes by using roachprod
Monitor. It checks if the data directory of a cockroach node is non-trivial.
This is not a reliable way to exclude empty nodes, or to check for dead nodes.
It also complicates the Monitor functionality unnecessarily.

This change refactors the code to rather use the HTTP health checks to determine
if there are dead nodes. Since we now know which node is used for the workload,
this check only needs to determine if the cockroach nodes are all still live.

Informs: cockroachdb#118214
Epic: None
  • Loading branch information
herkolategan committed Dec 10, 2024
1 parent 5a8f724 commit 3565edd
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 70 deletions.
74 changes: 28 additions & 46 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,43 +1466,6 @@ func (c *clusterImpl) FetchVMSpecs(ctx context.Context, l *logger.Logger) error
})
}

// checkNoDeadNode returns an error if at least one of the nodes that have a populated
// data dir are found to be not running. It prints both to t.L() and the test
// output.
func (c *clusterImpl) assertNoDeadNode(ctx context.Context, t test.Test) error {
if c.spec.NodeCount == 0 {
// No nodes can happen during unit tests and implies nothing to do.
return nil
}

t.L().Printf("checking for dead nodes")
eventsCh, err := roachprod.Monitor(ctx, t.L(), c.name, install.MonitorOpts{OneShot: true, IgnoreEmptyNodes: true})

// An error here means there was a problem initialising a SyncedCluster.
if err != nil {
return err
}

deadProcesses := 0
for info := range eventsCh {
t.L().Printf("%s", info)

if _, isDeath := info.Event.(install.MonitorProcessDead); isDeath {
deadProcesses++
}
}

var plural string
if deadProcesses > 1 {
plural = "es"
}

if deadProcesses > 0 {
return errors.Newf("%d dead cockroach process%s detected", deadProcesses, plural)
}
return nil
}

func selectedNodesOrDefault(
opts []option.Option, defaultNodes option.NodeListOption,
) option.NodeListOption {
Expand Down Expand Up @@ -1536,20 +1499,39 @@ func newHealthStatusResult(node int, status int, body []byte, err error) *Health
}
}

// SystemNodes looks for the system start script on each node and returns the
// node numbers that have it.
func (c *clusterImpl) SystemNodes() (option.NodeListOption, error) {
scriptPath := install.StartScriptPath(install.SystemInterfaceName, 0)
results, err := c.RunWithDetails(context.Background(), nil, option.WithNodes(c.All()), "test -f "+scriptPath)
if err != nil {
return nil, err
}
var nodes option.NodeListOption
for _, res := range results {
if res.RemoteExitStatus == 0 {
nodes = append(nodes, int(res.Node))
}
}
return nodes, nil
}

// HealthStatus returns the result of the /health?ready=1 endpoint for each node.
func (c *clusterImpl) HealthStatus(
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
) ([]*HealthStatusResult, error) {
if len(nodes) < 1 {
nodeCount := len(nodes)
if nodeCount < 1 {
return nil, nil // unit tests
}

adminAddrs, err := c.ExternalAdminUIAddr(ctx, l, nodes)
if err != nil {
return nil, errors.WithDetail(err, "Unable to get admin UI address(es)")
}
client := roachtestutil.DefaultHTTPClient(c, l)
getStatus := func(ctx context.Context, node int) *HealthStatusResult {
url := fmt.Sprintf(`https://%s/health?ready=1`, adminAddrs[node-1])
getStatus := func(ctx context.Context, nodeIndex, node int) *HealthStatusResult {
url := fmt.Sprintf(`https://%s/health?ready=1`, adminAddrs[nodeIndex])
resp, err := client.Get(ctx, url)
if err != nil {
return newHealthStatusResult(node, 0, nil, err)
Expand All @@ -1561,16 +1543,16 @@ func (c *clusterImpl) HealthStatus(
return newHealthStatusResult(node, resp.StatusCode, body, err)
}

results := make([]*HealthStatusResult, c.spec.NodeCount)
results := make([]*HealthStatusResult, nodeCount)

_ = timeutil.RunWithTimeout(ctx, "health status", 15*time.Second, func(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(c.spec.NodeCount)
for i := 1; i <= c.spec.NodeCount; i++ {
go func(node int) {
wg.Add(nodeCount)
for i := 0; i < nodeCount; i++ {
go func() {
defer wg.Done()
results[node-1] = getStatus(ctx, node)
}(i)
results[i] = getStatus(ctx, i, nodes[i])
}()
}
wg.Wait()
return nil
Expand Down
38 changes: 26 additions & 12 deletions pkg/cmd/roachtest/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,27 +1487,23 @@ func (r *testRunner) postTestAssertions(
postAssertCh := make(chan struct{})
_ = r.stopper.RunAsyncTask(ctx, "test-post-assertions", func(ctx context.Context) {
defer close(postAssertCh)
// When a dead node is detected, the subsequent post validation queries are likely
// to hang (reason unclear), and eventually timeout according to the statement_timeout.
// If this occurs frequently enough, we can look at skipping post validations on a node
// failure (or even on any test failure).
if err := c.assertNoDeadNode(ctx, t); err != nil {
// Some tests expect dead nodes, so they may opt out of this check.
if t.spec.SkipPostValidations&registry.PostValidationNoDeadNodes == 0 {
postAssertionErr(err)
} else {
t.L().Printf("dead node(s) detected but expected")
}

// Determine on which nodes the system tenant is running.
systemNodes, err := c.SystemNodes()
_, _ = fmt.Fprintf(os.Stderr, "SYSTEM NODES: %v\n", systemNodes)
if err != nil {
postAssertionErr(errors.WithDetail(err, "Unable to determine system nodes"))
}

// We collect all the admin health endpoints in parallel,
// and select the first one that succeeds to run the validation queries
statuses, err := c.HealthStatus(ctx, t.L(), c.All())
statuses, err := c.HealthStatus(ctx, t.L(), systemNodes)
if err != nil {
postAssertionErr(errors.WithDetail(err, "Unable to check health status"))
}

validationNode := 0
liveNodes := 0
for _, s := range statuses {
if s.Err != nil {
t.L().Printf("n%d:/health?ready=1 error=%s", s.Node, s.Err)
Expand All @@ -1523,6 +1519,24 @@ func (r *testRunner) postTestAssertions(
validationNode = s.Node // NB: s.Node is never zero
}
t.L().Printf("n%d:/health?ready=1 status=200 ok", s.Node)
liveNodes++
}

// When a dead node is detected, the subsequent post validation queries are likely
// to hang (reason unclear), and eventually timeout according to the statement_timeout.
// If this occurs frequently enough, we can look at skipping post validations on a node
// failure (or even on any test failure).
if liveNodes != len(systemNodes) {
t.L().Printf("validation: %d out of %d node(s) are alive", liveNodes, len(systemNodes))
// Some tests expect dead nodes, so they may opt out of this check.
if t.spec.SkipPostValidations&registry.PostValidationNoDeadNodes == 0 {
postAssertionErr(
errors.Newf("unexpected dead node(s) detected, only %d out of %d node(s) are alive",
liveNodes, len(systemNodes)),
)
} else {
t.L().Printf("dead node(s) detected but expected")
}
}

// We avoid trying to do this when t.Failed() (and in particular when there
Expand Down
26 changes: 14 additions & 12 deletions pkg/cmd/roachtest/tests/acceptance.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func registerAcceptance(r registry.Registry) {
randomized bool
workloadNode bool
incompatibleClouds registry.CloudSet
skipPostValidation registry.PostValidation
}{
// NOTE: acceptance tests are lightweight tests that run as part
// of CI. As such, they must:
Expand All @@ -44,7 +45,7 @@ func registerAcceptance(r registry.Registry) {
// properties, please register it separately (not as an acceptance
// test).
registry.OwnerKV: {
{name: "decommission-self", fn: runDecommissionSelf},
{name: "decommission-self", fn: runDecommissionSelf, skipPostValidation: registry.PostValidationNoDeadNodes},
{name: "event-log", fn: runEventLog},
{name: "gossip/peerings", fn: runGossipPeerings},
{name: "gossip/restart", fn: runGossipRestart},
Expand All @@ -60,7 +61,7 @@ func registerAcceptance(r registry.Registry) {
},
{name: "cli/node-status", fn: runCLINodeStatus},
{name: "cluster-init", fn: runClusterInit},
{name: "rapid-restart", fn: runRapidRestart},
{name: "rapid-restart", fn: runRapidRestart, skipPostValidation: registry.PostValidationNoDeadNodes},
},
registry.OwnerObservability: {
{name: "status-server", fn: runStatusServer},
Expand Down Expand Up @@ -139,16 +140,17 @@ func registerAcceptance(r registry.Registry) {
}

testSpec := registry.TestSpec{
Name: "acceptance/" + tc.name,
Owner: owner,
Cluster: r.MakeClusterSpec(numNodes, extraOptions...),
Skip: tc.skip,
EncryptionSupport: tc.encryptionSupport,
Timeout: 10 * time.Minute,
CompatibleClouds: registry.AllClouds.Remove(tc.incompatibleClouds),
Suites: registry.Suites(registry.Nightly, registry.Quick, registry.Acceptance),
Randomized: tc.randomized,
RequiresLicense: tc.requiresLicense,
Name: "acceptance/" + tc.name,
Owner: owner,
Cluster: r.MakeClusterSpec(numNodes, extraOptions...),
Skip: tc.skip,
EncryptionSupport: tc.encryptionSupport,
Timeout: 10 * time.Minute,
CompatibleClouds: registry.AllClouds.Remove(tc.incompatibleClouds),
Suites: registry.Suites(registry.Nightly, registry.Quick, registry.Acceptance),
Randomized: tc.randomized,
RequiresLicense: tc.requiresLicense,
SkipPostValidations: tc.skipPostValidation,
}

if tc.timeout != 0 {
Expand Down

0 comments on commit 3565edd

Please sign in to comment.