diff --git a/pkg/cmd/roachtest/follower_reads.go b/pkg/cmd/roachtest/follower_reads.go
index e97033215c3b..8a737e6a0674 100644
--- a/pkg/cmd/roachtest/follower_reads.go
+++ b/pkg/cmd/roachtest/follower_reads.go
@@ -14,52 +14,96 @@ import (
 	"bufio"
 	"context"
 	gosql "database/sql"
+	"fmt"
 	"math/rand"
 	"net/http"
+	"reflect"
 	"regexp"
 	"strconv"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
 	"github.com/cockroachdb/cockroach/pkg/util/httputil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 )
 
 func registerFollowerReads(r *testRegistry) {
-	r.Add(testSpec{
-		Name:       "follower-reads/nodes=3",
-		Owner:      OwnerKV,
-		Cluster:    makeClusterSpec(3 /* nodeCount */, cpu(2), geo()),
-		MinVersion: "v19.1.0",
-		Run:        runFollowerReadsTest,
-	})
+	register := func(survival survivalGoal, locality localitySetting) {
+		r.Add(testSpec{
+			Name:  fmt.Sprintf("follower-reads/survival=%s/locality=%s", survival, locality),
+			Owner: OwnerKV,
+			Cluster: makeClusterSpec(
+				6, /* nodeCount */
+				cpu(2),
+				geo(),
+				// This zone option looks strange, but it makes more sense once you
+				// understand what the test is doing. The test creates a multi-region
+				// database with either ZONE or REGION survivability and with a PRIMARY
+				// REGION of us-east1. This means that for ZONE survivability, the test
+				// wants 3 nodes in us-east1, 1 in us-west1, and 1 in europe-west2. For
+				// REGION survivability, the test wants 2 nodes in us-east1, 2 (or 1) in
+				// us-west1, and 1 (or 2) in europe-west2.
+				zones("us-east1-b,us-east1-b,us-east1-b,us-west1-b,us-west1-b,europe-west2-b"),
+			),
+			Run: func(ctx context.Context, t *test, c *cluster) {
+				runFollowerReadsTest(ctx, t, c, survival, locality)
+			},
+		})
+	}
+	for _, survival := range []survivalGoal{zone, region} {
+		for _, locality := range []localitySetting{regional, global} {
+			register(survival, locality)
+		}
+	}
 }
 
+// The survival goal of a multi-region database: ZONE or REGION.
+type survivalGoal string
+
+// The locality setting of a multi-region table: REGIONAL or GLOBAL.
+type localitySetting string
+
+const (
+	zone   survivalGoal = "zone"
+	region survivalGoal = "region"
+
+	regional localitySetting = "regional"
+	global   localitySetting = "global"
+)
+
 // runFollowerReadsTest is a basic litmus test that follower reads work.
 // The test does the following:
 //
-//  * Creates a database and table.
+//  * Creates a multi-region database and table.
+//  * Configures the database's survival goals.
+//  * Configures the table's locality setting.
 //  * Installs a number of rows into that table.
 //  * Queries the data initially with a recent timestamp and expecting an
 //    error because the table does not exist in the past immediately following
 //    creation.
-//  * Waits until the required duration has elapsed such that the installed
-//    data can be read with a follower read issued using `follower_timestamp()`.
-//  * Performs a single `follower_timestamp()` select query against a single
-//    row on all of the nodes and then observes the counter metric for
-//    store-level follower reads ensuring that they occurred on at least
-//    two of the nodes.
+//  * If using a REGIONAL table, waits until the required duration has elapsed
+//    such that the installed data can be read with a follower read issued using
+//    `follower_read_timestamp()`.
+//  * Performs a few select queries against a single row on all of the nodes and
+//    then observes the counter metric for store-level follower reads ensuring
+//    that they occurred on at least two of the nodes. If using a REGIONAL table,
+//    these reads are stale through the use of `follower_read_timestamp()`.
 //  * Performs reads against the written data on all of the nodes at a steady
 //    rate for 20 seconds, ensure that the 90-%ile SQL latencies during that
 //    time are under 10ms which implies that no WAN RPCs occurred.
 //
-func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
-	crdbNodes := c.Range(1, c.spec.NodeCount)
-	c.Put(ctx, cockroach, "./cockroach", crdbNodes)
-	c.Wipe(ctx, crdbNodes)
-	c.Start(ctx, t, crdbNodes)
+func runFollowerReadsTest(
+	ctx context.Context, t *test, c *cluster, survival survivalGoal, locality localitySetting,
+) {
+	c.Put(ctx, cockroach, "./cockroach")
+	c.Wipe(ctx)
+	c.Start(ctx, t)
 
 	var conns []*gosql.DB
 	for i := 0; i < c.spec.NodeCount; i++ {
@@ -68,28 +112,86 @@ func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
 	}
 	db := conns[0]
 
-	if _, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = 'true'"); err != nil {
-		t.Fatalf("failed to enable follower reads: %v", err)
-	}
 	// Disable load based splitting and range merging because splits and merges
-	// interfere with follower reads. Rhis test's workload regularly triggers load
+	// interfere with follower reads. This test's workload regularly triggers load
 	// based splitting in the first phase creating small ranges which later
 	// in the test are merged. The merging tends to coincide with the final phase
 	// of the test which attempts to observe low latency reads leading to
 	// flakiness.
-	if _, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'"); err != nil {
-		t.Fatalf("failed to disable load based splitting: %v", err)
-	}
-	if _, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_merge.queue_enabled = 'false'"); err != nil {
-		t.Fatalf("failed to disable range merging: %v", err)
-	}
-	if r, err := db.ExecContext(ctx, "CREATE DATABASE test;"); err != nil {
-		t.Fatalf("failed to create database: %v %v", err, r)
-	}
-	if r, err := db.ExecContext(ctx, "CREATE TABLE test.test ( k INT8, v INT8, PRIMARY KEY (k) )"); err != nil {
-		t.Fatalf("failed to create table: %v %v", err, r)
+	_, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'")
+	require.NoError(t, err)
+	_, err = db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_merge.queue_enabled = 'false'")
+	require.NoError(t, err)
+
+	// Check the cluster regions.
+	if err := testutils.SucceedsSoonError(func() error {
+		rows, err := db.QueryContext(ctx, "SELECT region, zones[1] FROM [SHOW REGIONS FROM CLUSTER] ORDER BY 1")
+		require.NoError(t, err)
+		defer rows.Close()
+
+		matrix, err := sqlutils.RowsToStrMatrix(rows)
+		require.NoError(t, err)
+
+		expMatrix := [][]string{
+			{"europe-west2", "europe-west2-b"},
+			{"us-east1", "us-east1-b"},
+			{"us-west1", "us-west1-b"},
+		}
+		if !reflect.DeepEqual(matrix, expMatrix) {
+			return errors.Errorf("unexpected cluster regions: want %+v, got %+v", expMatrix, matrix)
+		}
+		return nil
+	}); err != nil {
+		t.Fatal(err)
 	}
-	waitForFullReplication(t, db)
+
+	// Create a multi-region database and table.
+	_, err = db.ExecContext(ctx, `CREATE DATABASE test`)
+	require.NoError(t, err)
+	_, err = db.ExecContext(ctx, `ALTER DATABASE test SET PRIMARY REGION "us-east1"`)
+	require.NoError(t, err)
+	_, err = db.ExecContext(ctx, `ALTER DATABASE test ADD REGION "us-west1"`)
+	require.NoError(t, err)
+	_, err = db.ExecContext(ctx, `ALTER DATABASE test ADD REGION "europe-west2"`)
+	require.NoError(t, err)
+	_, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER DATABASE test SURVIVE %s FAILURE`, survival))
+	require.NoError(t, err)
+	_, err = db.ExecContext(ctx, `CREATE TABLE test.test ( k INT8, v INT8, PRIMARY KEY (k) )`)
+	require.NoError(t, err)
+	_, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE test.test SET LOCALITY %s`, locality))
+	require.NoError(t, err)
+
+	// Wait until the table has completed up-replication.
+	t.l.Printf("waiting for up-replication...")
+	require.NoError(t, retry.ForDuration(5*time.Minute, func() error {
+		const q = `
+		SELECT
+			coalesce(array_length(voting_replicas, 1), 0),
+			coalesce(array_length(non_voting_replicas, 1), 0)
+		FROM
+			crdb_internal.ranges_no_leases
+		WHERE
+			table_name = 'test'`
+		var voters, nonVoters int
+		if err := db.QueryRowContext(ctx, q).Scan(&voters, &nonVoters); err != nil {
+			t.l.Printf("retrying: %v\n", err)
+			return err
+		}
+
+		var ok bool
+		if survival == zone {
+			// Expect 3 voting replicas and 2 non-voting replicas.
+			ok = voters == 3 && nonVoters == 2
+		} else {
+			// Expect 5 voting replicas and 0 non-voting replicas.
+			ok = voters == 5 && nonVoters == 0
+		}
+		if !ok {
+			return errors.Newf("rebalancing not complete")
+		}
+		return nil
+	}))
+
 	const rows = 100
 	const concurrency = 32
 	sem := make(chan struct{}, concurrency)
@@ -105,7 +207,7 @@ func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
 		}
 	}
 	// chooseKV picks a random key-value pair by exploiting the pseudo-random
-	// ordering of keys when traversing a map with in a range statement.
+	// ordering of keys when traversing a map within a range statement.
 	chooseKV := func() (k int, v int64) {
 		for k, v = range data {
 			return k, v
@@ -115,8 +217,18 @@ func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
 	verifySelect := func(ctx context.Context, node, k int, expectedVal int64) func() error {
 		return func() error {
 			nodeDB := conns[node-1]
-			r := nodeDB.QueryRowContext(ctx, "SELECT v FROM test.test AS OF SYSTEM "+
-				"TIME follower_read_timestamp() WHERE k = $1", k)
+			var aost string
+			if locality == regional {
+				// For REGIONAL tables, only stale reads can be served off
+				// followers.
+				aost = "AS OF SYSTEM TIME follower_read_timestamp()"
+			} else {
+				// For GLOBAL tables, we can perform consistent reads and expect
+				// them to be served off followers.
+				aost = ""
+			}
+			q := fmt.Sprintf("SELECT v FROM test.test %s WHERE k = $1", aost)
+			r := nodeDB.QueryRowContext(ctx, q, k)
 			var got int64
 			if err := r.Scan(&got); err != nil {
 				// Ignore errors due to cancellation.
@@ -153,16 +265,22 @@ func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
 	if err := g.Wait(); err != nil {
 		t.Fatalf("failed to insert data: %v", err)
 	}
-	// Wait for follower_timestamp() historical reads to have data.
-	followerReadDuration, err := computeFollowerReadDuration(ctx, db)
-	if err != nil {
-		t.Fatalf("failed to compute follower read duration: %v", err)
-	}
-	select {
-	case <-time.After(followerReadDuration):
-	case <-ctx.Done():
-		t.Fatalf("context canceled: %v", ctx.Err())
+
+	if locality == regional {
+		// For REGIONAL tables, wait for follower_read_timestamp() historical
+		// reads to have data. For GLOBAL tables, this isn't needed, because
+		// we will be reading consistently (non-stale).
+		followerReadDuration, err := computeFollowerReadDuration(ctx, db)
+		if err != nil {
+			t.Fatalf("failed to compute follower read duration: %v", err)
+		}
+		select {
+		case <-time.After(followerReadDuration):
+		case <-ctx.Done():
+			t.Fatalf("context canceled: %v", ctx.Err())
+		}
 	}
+
 	// Read the follower read counts before issuing the follower reads to observe
 	// the delta and protect from follower reads which might have happened due to
 	// system queries.
@@ -170,16 +288,27 @@
 	if err != nil {
 		t.Fatalf("failed to get follower read counts: %v", err)
 	}
-	// Perform reads at follower_timestamp() and ensure we get the expected value.
+	// Perform reads on each node and ensure we get the expected value. Do so a
+	// few times on each follower to give caches time to warm up.
 	g, gCtx = errgroup.WithContext(ctx)
 	k, v := chooseKV()
 	for i := 1; i <= c.spec.NodeCount; i++ {
-		g.Go(verifySelect(gCtx, i, k, v))
+		fn := verifySelect(gCtx, i, k, v)
+		g.Go(func() error {
+			for j := 0; j < 100; j++ {
+				if err := fn(); err != nil {
+					return err
+				}
+			}
+			return nil
+		})
 	}
 	if err := g.Wait(); err != nil {
 		t.Fatalf("error verifying node values: %v", err)
 	}
-	// Verify that the follower read count increments on at least two nodes.
+	// Verify that the follower read count increments on at least two nodes,
+	// which we expect to be in the non-primary regions.
+	expNodesToSeeFollowerReads := 2
 	followerReadsAfter, err := getFollowerReadCounts(ctx, c)
 	if err != nil {
 		t.Fatalf("failed to get follower read counts: %v", err)
@@ -190,9 +319,9 @@
 			nodesWhichSawFollowerReads++
 		}
 	}
-	if nodesWhichSawFollowerReads < 2 {
-		t.Fatalf("fewer than 2 follower reads occurred: saw %v before and %v after",
-			followerReadsBefore, followerReadsAfter)
+	if nodesWhichSawFollowerReads < expNodesToSeeFollowerReads {
+		t.Fatalf("fewer than %v follower reads occurred: saw %v before and %v after",
+			expNodesToSeeFollowerReads, followerReadsBefore, followerReadsAfter)
 	}
 	// Run reads for 3m which given the metrics window of 10s should guarantee
 	// that the most recent SQL latency time series data should relate to at least
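For readers skimming the patch, here is a minimal, self-contained Go sketch (not part of the change) of the two pieces of logic the new test hinges on: building the read query from the table's locality, and counting how many nodes saw their follower-read counter move between the before/after samples. The helper names buildSelectQuery and countNodesWithFollowerReads, and the sample counter values, are hypothetical; in the test itself the query is issued through the per-node *gosql.DB connections and the counters come from getFollowerReadCounts.

package main

import "fmt"

// localitySetting mirrors the type added in the patch.
type localitySetting string

const (
	regional localitySetting = "regional"
	global   localitySetting = "global"
)

// buildSelectQuery (hypothetical helper) reproduces the query construction in
// verifySelect: REGIONAL tables need a stale follower_read_timestamp() read to
// be served by followers, while GLOBAL tables can be read consistently.
func buildSelectQuery(locality localitySetting) string {
	aost := ""
	if locality == regional {
		aost = "AS OF SYSTEM TIME follower_read_timestamp()"
	}
	return fmt.Sprintf("SELECT v FROM test.test %s WHERE k = $1", aost)
}

// countNodesWithFollowerReads (hypothetical helper) mirrors the final check:
// compare per-node follower-read counters sampled before and after the reads
// and count how many nodes saw their counter increase.
func countNodesWithFollowerReads(before, after map[int]int64) int {
	n := 0
	for node, afterCount := range after {
		if afterCount > before[node] {
			n++
		}
	}
	return n
}

func main() {
	fmt.Println(buildSelectQuery(regional))
	fmt.Println(buildSelectQuery(global))

	// Sample counter values for a 6-node cluster (illustrative only).
	before := map[int]int64{1: 0, 2: 0, 3: 0, 4: 10, 5: 12, 6: 7}
	after := map[int]int64{1: 0, 2: 0, 3: 0, 4: 110, 5: 112, 6: 7}
	const expNodesToSeeFollowerReads = 2
	if got := countNodesWithFollowerReads(before, after); got < expNodesToSeeFollowerReads {
		fmt.Printf("fewer than %d nodes saw follower reads: %d\n", expNodesToSeeFollowerReads, got)
	} else {
		fmt.Printf("%d nodes saw follower reads\n", got)
	}
}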