roachtest: improve the cluster-init roachtest #69940

Merged · 4 commits · Sep 13, 2021 · changes from all commits
301 changes: 152 additions & 149 deletions pkg/cmd/roachtest/tests/cluster_init.go
```go
import (
	// ... (unchanged imports elided in the diff view) ...
	"time"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/server"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/util/httputil"
	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

func runClusterInit(ctx context.Context, t test.Test, c cluster.Cluster) {
	c.Put(ctx, t.Cockroach(), "./cockroach")

	t.L().Printf("retrieving VM addresses")
	addrs, err := c.InternalAddr(ctx, c.All())
	if err != nil {
		t.Fatal(err)
	}

	// ... (lines unchanged by this PR elided from the diff view) ...

	// We start all nodes with the same join flags and then issue an "init"
	// command to one of the nodes. We do this twice, since roachtest has some
	// special casing for the first node in a cluster (the join flags of all
	// nodes default to just the first node) and we want to make sure that
	// we're not relying on it.
	for _, initNode := range []int{2, 1} {
		c.Wipe(ctx)
		t.L().Printf("starting test with init node %d", initNode)
		c.Start(ctx, option.StartArgs(
			// We don't want roachprod to auto-init this cluster.
			"--skip-init",
			// We need to point all nodes at all other nodes. By default
			// roachprod will point all nodes at the first node, but this
			// won't allow init'ing any but the first node - we require
			// that all nodes can discover the init'ed node (transitively)
			// via their join flags.
			"--args", "--join="+strings.Join(addrs, ","),
		))

		urlMap := make(map[int]string)
		adminUIAddrs, err := c.ExternalAdminUIAddr(ctx, c.All())
		if err != nil {
			t.Fatal(err)
		}
		for i, addr := range adminUIAddrs {
			urlMap[i+1] = `http://` + addr
		}

		t.L().Printf("waiting for the servers to bind their ports")
		if err := retry.ForDuration(10*time.Second, func() error {
			for i := 1; i <= c.Spec().NodeCount; i++ {
				resp, err := httputil.Get(ctx, urlMap[i]+"/health")
				if err != nil {
					return err
				}
				resp.Body.Close()
			}
			return nil
		}); err != nil {
			t.Fatal(err)
		}
		t.L().Printf("all nodes started, establishing SQL connections")

		var dbs []*gosql.DB
		for i := 1; i <= c.Spec().NodeCount; i++ {
			db := c.Conn(ctx, i)
			defer db.Close()
			dbs = append(dbs, db)
		}

		// Initially, we can connect to any node, but queries issued will hang.
		t.L().Printf("checking that the SQL conns are not failing immediately")
		errCh := make(chan error, len(dbs))
		for _, db := range dbs {
			db := db
			go func() {
				var val int
				errCh <- db.QueryRow("SELECT 1").Scan(&val)
			}()
		}

		// Give them time to get a "connection refused" or similar error if
		// the server isn't listening.
		time.Sleep(time.Second)
		select {
		case err := <-errCh:
			t.Fatalf("query finished prematurely with err %v", err)
		default:
		}

		// Check that the /health endpoint is functional even before cluster
		// init, whereas other debug endpoints return an appropriate error.
		httpTests := []struct {
			endpoint       string
			expectedStatus int
		}{
			{"/health", http.StatusOK},
			{"/health?ready=1", http.StatusServiceUnavailable},
			{"/_status/nodes", http.StatusNotFound},
		}
		for _, tc := range httpTests {
			for _, withCookie := range []bool{false, true} {
				t.L().Printf("checking for HTTP endpoint %q, using authentication = %v", tc.endpoint, withCookie)
				req, err := http.NewRequest("GET", urlMap[1]+tc.endpoint, nil /* body */)
				if err != nil {
					t.Fatalf("unexpected error while constructing request for %s: %s", tc.endpoint, err)
				}
				if withCookie {
					// Prevent regression of #25771 by also sending authenticated
					// requests, like would be sent if an admin UI were open against
					// this node while it booted.
					cookie, err := server.EncodeSessionCookie(&serverpb.SessionCookie{
						// The actual contents of the cookie don't matter; the
						// presence of a valid encoded cookie is enough to trigger
						// the authentication code paths.
					}, false /* forHTTPSOnly - cluster is insecure */)
					if err != nil {
						t.Fatal(err)
					}
					req.AddCookie(cookie)
				}
				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					t.Fatalf("unexpected error hitting %s endpoint: %v", tc.endpoint, err)
				}
				defer resp.Body.Close()
				if resp.StatusCode != tc.expectedStatus {
					bodyBytes, _ := ioutil.ReadAll(resp.Body)
					t.Fatalf("unexpected response code %d (expected %d) hitting %s endpoint: %v",
						resp.StatusCode, tc.expectedStatus, tc.endpoint, string(bodyBytes))
				}
			}
		}

		t.L().Printf("sending init command to node %d", initNode)
		c.Run(ctx, c.Node(initNode),
			fmt.Sprintf(`./cockroach init --insecure --port={pgport:%d}`, initNode))

		// This will only succeed if 3 nodes joined the cluster.
		WaitFor3XReplication(t, dbs[0])

		execCLI := func(runNode int, extraArgs ...string) (string, error) {
			args := []string{"./cockroach"}
			args = append(args, extraArgs...)
			args = append(args, "--insecure")
			args = append(args, fmt.Sprintf("--port={pgport:%d}", runNode))
			buf, err := c.RunWithBuffer(ctx, t.L(), c.Node(runNode), args...)
			t.L().Printf("%s\n", buf)
			return string(buf), err
		}

		{
			t.L().Printf("checking that double init fails")
			// Make sure that running init again returns the expected error
			// message and does not break the cluster. We have to use ExecCLI
			// rather than OneShot in order to actually get the output from
			// the command.
			if output, err := execCLI(initNode, "init"); err == nil {
				t.Fatalf("expected error running init command on initialized cluster\n%s", output)
			} else if !strings.Contains(output, "cluster has already been initialized") {
				t.Fatalf("unexpected output when running init command on initialized cluster: %v\n%s",
					err, output)
			}
		}

		// Once initialized, the queries we started earlier will finish.
		t.L().Printf("waiting for original SQL queries to complete now cluster is initialized")
		deadline := time.After(10 * time.Second)
		for i := 0; i < len(dbs); i++ {
			select {
			case err := <-errCh:
				if err != nil {
					t.Fatalf("querying node %d: %s", i, err)
				}
			case <-deadline:
				t.Fatalf("timed out waiting for query %d", i)
			}
		}

		t.L().Printf("testing new SQL queries")
		for i, db := range dbs {
			var val int
			if err := db.QueryRow("SELECT 1").Scan(&val); err != nil {
				t.Fatalf("querying node %d: %s", i, err)
			}
		}
		t.L().Printf("test complete")
	}
}
```
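For context on the HTTP table the test pins down: before `cockroach init`, a node is live but not yet ready, so `/health` returns 200 while `/health?ready=1` returns 503. Below is a minimal standalone sketch of a probe built on that contract; the `nodeState` helper, its address argument, and the `main` harness are illustrative assumptions, not part of this PR.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
)

// nodeState probes a node's admin HTTP endpoints. Per the test above, an
// uninitialized node reports /health = 200 (process is live) while
// /health?ready=1 = 503 (not ready to serve SQL).
func nodeState(ctx context.Context, baseURL string) (live, ready bool, err error) {
	status := func(path string) (int, error) {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+path, nil)
		if err != nil {
			return 0, err
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return 0, err
		}
		// A 503 is a valid response, not a transport error; only close the body.
		defer resp.Body.Close()
		return resp.StatusCode, nil
	}
	liveCode, err := status("/health")
	if err != nil {
		return false, false, err
	}
	readyCode, err := status("/health?ready=1")
	if err != nil {
		return false, false, err
	}
	return liveCode == http.StatusOK, readyCode == http.StatusOK, nil
}

func main() {
	// Assumed local admin UI address; before init this should print
	// live=true, ready=false.
	live, ready, err := nodeState(context.Background(), "http://localhost:8080")
	fmt.Printf("live=%v ready=%v err=%v\n", live, ready, err)
}
```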
22 changes: 16 additions & 6 deletions pkg/cmd/roachtest/tests/util.go
```go
func WaitFor3XReplication(t test.Test, db *gosql.DB) {
	t.L().Printf("waiting for up-replication...")
	tStart := timeutil.Now()
	var oldN int
	for {
		ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
		var n int
		if err := db.QueryRowContext(
			ctx,
			"SELECT count(1) FROM crdb_internal.ranges WHERE array_length(replicas, 1) < 3",
		).Scan(&n); err != nil {
			t.Fatal(err)
		}
		// Release the timeout context before any return path.
		cancel()
		if n == 0 {
			return
		}
		if timeutil.Since(tStart) > 30*time.Second || oldN != n {
			t.L().Printf("still waiting for full replication (%d ranges left)", n)
		}
		oldN = n
		time.Sleep(time.Second)
	}
}
```
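The up-replication check reduces to a single SQL query over `crdb_internal.ranges`. As a usage sketch, the same query can be run from any SQL client to gauge progress; the driver choice and connection string below are assumptions, not part of this PR.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver; CockroachDB speaks pgwire.
)

func main() {
	// Placeholder DSN for a local insecure node.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Count ranges that still have fewer than three replicas; zero means the
	// cluster is fully 3x-replicated.
	var n int
	if err := db.QueryRow(
		"SELECT count(1) FROM crdb_internal.ranges WHERE array_length(replicas, 1) < 3",
	).Scan(&n); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d ranges below 3x replication\n", n)
}
```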