Merge #69940
69940: roachtest: improve the `cluster-init` roachtest r=knz a=knz

Fixes #67105
Fixes #66516
Fixes #69949
Fixes #70003

Release justification: non-production code changes

Release note: None

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
3 people committed Sep 13, 2021
2 parents c2d5e42 + f1578fa commit d69481b
Showing 2 changed files with 168 additions and 155 deletions.
301 changes: 152 additions & 149 deletions pkg/cmd/roachtest/tests/cluster_init.go
Both hunks of this file are shown below as they read after the change. The import block drops golang.org/x/sync/errgroup (nodes are no longer started by hand inside an errgroup) and adds the roachtest option package; runClusterInit now starts the cluster through c.Start with --skip-init and logs its progress as it goes.

@@ -20,17 +20,18 @@ import (

	"time"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/server"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/util/httputil"
	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

func runClusterInit(ctx context.Context, t test.Test, c cluster.Cluster) {
	c.Put(ctx, t.Cockroach(), "./cockroach")

	t.L().Printf("retrieving VM addresses")
	addrs, err := c.InternalAddr(ctx, c.All())
	if err != nil {
		t.Fatal(err)

@@ -44,170 +45,172 @@ func runClusterInit(ctx context.Context, t test.Test, c cluster.Cluster) {

	}

	// We start all nodes with the same join flags and then issue an "init"
	// command to one of the nodes. We do this twice, since roachtest has some
	// special casing for the first node in a cluster (the join flags of all nodes
	// default to just the first node) and we want to make sure that we're not
	// relying on it.
	for _, initNode := range []int{2, 1} {
		c.Wipe(ctx)

		t.L().Printf("starting test with init node %d", initNode)
		c.Start(ctx, option.StartArgs(
			// We don't want roachprod to auto-init this cluster.
			"--skip-init",
			// We need to point all nodes at all other nodes. By default
			// roachprod will point all nodes at the first node, but this
			// won't allow init'ing any but the first node - we require
			// that all nodes can discover the init'ed node (transitively)
			// via their join flags.
			"--args", "--join="+strings.Join(addrs, ","),
		))

		urlMap := make(map[int]string)
		adminUIAddrs, err := c.ExternalAdminUIAddr(ctx, c.All())
		if err != nil {
			t.Fatal(err)
		}
		for i, addr := range adminUIAddrs {
			urlMap[i+1] = `http://` + addr
		}

		t.L().Printf("waiting for the servers to bind their ports")
		if err := retry.ForDuration(10*time.Second, func() error {
			for i := 1; i <= c.Spec().NodeCount; i++ {
				resp, err := httputil.Get(ctx, urlMap[i]+"/health")
				if err != nil {
					return err
				}
				resp.Body.Close()
			}
			return nil
		}); err != nil {
			t.Fatal(err)
		}
		t.L().Printf("all nodes started, establishing SQL connections")

		var dbs []*gosql.DB
		for i := 1; i <= c.Spec().NodeCount; i++ {
			db := c.Conn(ctx, i)
			defer db.Close()
			dbs = append(dbs, db)
		}

		// Initially, we can connect to any node, but queries issued will hang.
		t.L().Printf("checking that the SQL conns are not failing immediately")
		errCh := make(chan error, len(dbs))
		for _, db := range dbs {
			db := db
			go func() {
				var val int
				errCh <- db.QueryRow("SELECT 1").Scan(&val)
			}()
		}

		// Give them time to get a "connection refused" or similar error if
		// the server isn't listening.
		time.Sleep(time.Second)
		select {
		case err := <-errCh:
			t.Fatalf("query finished prematurely with err %v", err)
		default:
		}

		// Check that the /health endpoint is functional even before cluster init,
		// whereas other debug endpoints return an appropriate error.
		httpTests := []struct {
			endpoint       string
			expectedStatus int
		}{
			{"/health", http.StatusOK},
			{"/health?ready=1", http.StatusServiceUnavailable},
			{"/_status/nodes", http.StatusNotFound},
		}
		for _, tc := range httpTests {
			for _, withCookie := range []bool{false, true} {
				t.L().Printf("checking for HTTP endpoint %q, using authentication = %v", tc.endpoint, withCookie)
				req, err := http.NewRequest("GET", urlMap[1]+tc.endpoint, nil /* body */)
				if err != nil {
					t.Fatalf("unexpected error while constructing request for %s: %s", tc.endpoint, err)
				}
				if withCookie {
					// Prevent regression of #25771 by also sending authenticated
					// requests, like would be sent if an admin UI were open against
					// this node while it booted.
					cookie, err := server.EncodeSessionCookie(&serverpb.SessionCookie{
						// The actual contents of the cookie don't matter; the presence of
						// a valid encoded cookie is enough to trigger the authentication
						// code paths.
					}, false /* forHTTPSOnly - cluster is insecure */)
					if err != nil {
						t.Fatal(err)
					}
					req.AddCookie(cookie)
				}
				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					t.Fatalf("unexpected error hitting %s endpoint: %v", tc.endpoint, err)
				}
				defer resp.Body.Close()
				if resp.StatusCode != tc.expectedStatus {
					bodyBytes, _ := ioutil.ReadAll(resp.Body)
					t.Fatalf("unexpected response code %d (expected %d) hitting %s endpoint: %v",
						resp.StatusCode, tc.expectedStatus, tc.endpoint, string(bodyBytes))
				}
			}
		}

		t.L().Printf("sending init command to node %d", initNode)
		c.Run(ctx, c.Node(initNode),
			fmt.Sprintf(`./cockroach init --insecure --port={pgport:%d}`, initNode))

		// This will only succeed if 3 nodes joined the cluster.
		WaitFor3XReplication(t, dbs[0])

		execCLI := func(runNode int, extraArgs ...string) (string, error) {
			args := []string{"./cockroach"}
			args = append(args, extraArgs...)
			args = append(args, "--insecure")
			args = append(args, fmt.Sprintf("--port={pgport:%d}", runNode))
			buf, err := c.RunWithBuffer(ctx, t.L(), c.Node(runNode), args...)
			t.L().Printf("%s\n", buf)
			return string(buf), err
		}

		{
			t.L().Printf("checking that double init fails")
			// Make sure that running init again returns the expected error message and
			// does not break the cluster. We have to use ExecCLI rather than OneShot in
			// order to actually get the output from the command.
			if output, err := execCLI(initNode, "init"); err == nil {
				t.Fatalf("expected error running init command on initialized cluster\n%s", output)
			} else if !strings.Contains(output, "cluster has already been initialized") {
				t.Fatalf("unexpected output when running init command on initialized cluster: %v\n%s",
					err, output)
			}
		}

		// Once initialized, the queries we started earlier will finish.
		t.L().Printf("waiting for original SQL queries to complete now cluster is initialized")
		deadline := time.After(10 * time.Second)
		for i := 0; i < len(dbs); i++ {
			select {
			case err := <-errCh:
				if err != nil {
					t.Fatalf("querying node %d: %s", i, err)
				}
			case <-deadline:
				t.Fatalf("timed out waiting for query %d", i)
			}
		}

		t.L().Printf("testing new SQL queries")
		for i, db := range dbs {
			var val int
			if err := db.QueryRow("SELECT 1").Scan(&val); err != nil {
				t.Fatalf("querying node %d: %s", i, err)
			}
		}

		t.L().Printf("test complete")
	}
}
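For orientation, runClusterInit does not run on its own: roachtests are hooked into the suite through the roachtest registry. The sketch below is illustrative only and is not part of this diff; the registration function name, test name, owner and cluster size are assumptions based on how tests in pkg/cmd/roachtest/tests are normally registered.

// Illustrative sketch, not part of this commit: wiring runClusterInit into the
// roachtest registry. Assumes the registry, test and cluster packages from
// pkg/cmd/roachtest are imported; the field values are guesses, not facts
// about this PR.
func registerClusterInit(r registry.Registry) {
	r.Add(registry.TestSpec{
		Name:    "cluster-init",         // assumed test name
		Owner:   registry.OwnerKV,       // assumed owning team
		Cluster: r.MakeClusterSpec(3),   // the test inits from more than one node
		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
			runClusterInit(ctx, t, c)
		},
	})
}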
22 changes: 16 additions & 6 deletions pkg/cmd/roachtest/tests/util.go
Shown as it reads after the change: WaitFor3XReplication now counts the ranges that are still under-replicated, so it can log progress, instead of scanning a single boolean, and it bounds each query with a 15-second timeout.

@@ -30,15 +30,25 @@ import (

func WaitFor3XReplication(t test.Test, db *gosql.DB) {
	t.L().Printf("waiting for up-replication...")
	tStart := timeutil.Now()
	var oldN int
	for {
		ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
		var n int
		if err := db.QueryRowContext(
			ctx,
			"SELECT count(1) FROM crdb_internal.ranges WHERE array_length(replicas, 1) < 3",
		).Scan(&n); err != nil {
			t.Fatal(err)
		}
		cancel()
		if n == 0 {
			return
		}
		if timeutil.Since(tStart) > 30*time.Second || oldN != n {
			t.L().Printf("still waiting for full replication (%d ranges left)", n)
		}
		oldN = n
		time.Sleep(time.Second)
	}
}
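The same wait-until-zero polling pattern can be useful outside roachtest. The sketch below is illustrative only (the package name, helper name and query argument are assumptions, not part of this commit) and uses nothing beyond database/sql and the standard library.

// Illustrative sketch, not from this commit: the polling pattern in isolation.
package waitexample

import (
	"context"
	"database/sql"
	"log"
	"time"
)

// waitForZero re-runs countQuery (which must return a single integer) until it
// reports 0, applying a per-attempt timeout and logging whenever the count
// changes or the wait has gone on for a while.
func waitForZero(ctx context.Context, db *sql.DB, countQuery string) error {
	start := time.Now()
	last := -1
	for {
		attemptCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
		var n int
		err := db.QueryRowContext(attemptCtx, countQuery).Scan(&n)
		cancel()
		if err != nil {
			return err
		}
		if n == 0 {
			return nil
		}
		if n != last || time.Since(start) > 30*time.Second {
			log.Printf("still waiting (%d left)", n)
			last = n
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
		}
	}
}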

