release-21.1: roachtest/follower-reads: adopt multi-region abstractions, add GLOBAL tables #62965

Merged
235 changes: 182 additions & 53 deletions pkg/cmd/roachtest/follower_reads.go
@@ -14,52 +14,96 @@ import (
"bufio"
"context"
gosql "database/sql"
"fmt"
"math/rand"
"net/http"
"reflect"
"regexp"
"strconv"
"time"

"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func registerFollowerReads(r *testRegistry) {
register := func(survival survivalGoal, locality localitySetting) {
r.Add(testSpec{
Name: fmt.Sprintf("follower-reads/survival=%s/locality=%s", survival, locality),
Owner: OwnerKV,
Cluster: makeClusterSpec(
6, /* nodeCount */
cpu(2),
geo(),
// This zone option looks strange, but it makes more sense once you
// understand what the test is doing. The test creates a multi-region
// database with either ZONE or REGION survivability and with a PRIMARY
// REGION of us-east1. This means that for ZONE survivability, the test
// wants 3 nodes in us-east1, 1 in us-west1, and 1 in europe-west2. For
// REGION survivability, the test wants 2 nodes in us-east1, 2 (or 1) in
// us-west1, and 1 (or 2) in europe-west2.
zones("us-east1-b,us-east1-b,us-east1-b,us-west1-b,us-west1-b,europe-west2-b"),
),
Run: func(ctx context.Context, t *test, c *cluster) {
runFollowerReadsTest(ctx, t, c, survival, locality)
},
})
}
for _, survival := range []survivalGoal{zone, region} {
for _, locality := range []localitySetting{regional, global} {
register(survival, locality)
}
}
}

// The survival goal of a multi-region database: ZONE or REGION.
type survivalGoal string

// The locality setting of a multi-region table: REGIONAL or GLOBAL.
type localitySetting string

const (
zone survivalGoal = "zone"
region survivalGoal = "region"

regional localitySetting = "regional"
global localitySetting = "global"
)
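For reference, the nested loops above expand to four test variants. A standalone sketch (not part of the diff) that mirrors the registration's naming scheme and prints the resulting test names:

package main

import "fmt"

type survivalGoal string
type localitySetting string

const (
	zone     survivalGoal    = "zone"
	region   survivalGoal    = "region"
	regional localitySetting = "regional"
	global   localitySetting = "global"
)

func main() {
	// Same format string as the testSpec name in registerFollowerReads.
	for _, survival := range []survivalGoal{zone, region} {
		for _, locality := range []localitySetting{regional, global} {
			fmt.Printf("follower-reads/survival=%s/locality=%s\n", survival, locality)
		}
	}
}

This prints the four names from follower-reads/survival=zone/locality=regional through follower-reads/survival=region/locality=global, one per r.Add call made by the loop.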

// runFollowerReadsTest is a basic litmus test that follower reads work.
// The test does the following:
//
// * Creates a multi-region database and table.
// * Configures the database's survival goals.
// * Configures the table's locality setting.
// * Installs a number of rows into that table.
// * Queries the data initially with a recent timestamp, expecting an
//   error because the table does not exist in the past immediately following
//   creation.
// * If using a REGIONAL table, waits until the required duration has elapsed
//   such that the installed data can be read with a follower read issued using
//   `follower_read_timestamp()`.
// * Performs a few select queries against a single row on all of the nodes and
//   then observes the counter metric for store-level follower reads, ensuring
//   that they occurred on at least two of the nodes. If using a REGIONAL table,
//   these reads are stale through the use of `follower_read_timestamp()`.
// * Performs reads against the written data on all of the nodes at a steady
//   rate for 20 seconds, ensuring that the 90th-percentile SQL latencies during
//   that time are under 10ms, which implies that no WAN RPCs occurred.
//
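As a standalone illustration of the stale-read pattern the test relies on (a sketch, not part of this diff; the driver choice and connection string are assumptions, but follower_read_timestamp() is the same built-in used in the test below):

package main

import (
	gosql "database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumption: any PostgreSQL-wire driver works with CockroachDB
)

func main() {
	// Hypothetical connection string; point it at any node in the cluster.
	db, err := gosql.Open("postgres", "postgresql://root@localhost:26257/test?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// follower_read_timestamp() returns a timestamp far enough in the past
	// that any replica, including a follower, may serve the read.
	var v int64
	err = db.QueryRow(
		"SELECT v FROM test.test AS OF SYSTEM TIME follower_read_timestamp() WHERE k = $1", 1,
	).Scan(&v)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("value:", v)
}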
func runFollowerReadsTest(
ctx context.Context, t *test, c *cluster, survival survivalGoal, locality localitySetting,
) {
c.Put(ctx, cockroach, "./cockroach")
c.Wipe(ctx)
c.Start(ctx, t)

var conns []*gosql.DB
for i := 0; i < c.spec.NodeCount; i++ {
@@ -68,28 +112,86 @@ func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
}
db := conns[0]

if _, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = 'true'"); err != nil {
t.Fatalf("failed to enable follower reads: %v", err)
}
// Disable load based splitting and range merging because splits and merges
// interfere with follower reads. This test's workload regularly triggers load
// based splitting in the first phase creating small ranges which later
// in the test are merged. The merging tends to coincide with the final phase
// of the test which attempts to observe low latency reads leading to
// flakiness.
if _, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'"); err != nil {
t.Fatalf("failed to disable load based splitting: %v", err)
}
if _, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_merge.queue_enabled = 'false'"); err != nil {
t.Fatalf("failed to disable range merging: %v", err)
}
if r, err := db.ExecContext(ctx, "CREATE DATABASE test;"); err != nil {
t.Fatalf("failed to create database: %v %v", err, r)
}
if r, err := db.ExecContext(ctx, "CREATE TABLE test.test ( k INT8, v INT8, PRIMARY KEY (k) )"); err != nil {
t.Fatalf("failed to create table: %v %v", err, r)
_, err := db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'")
require.NoError(t, err)
_, err = db.ExecContext(ctx, "SET CLUSTER SETTING kv.range_merge.queue_enabled = 'false'")
require.NoError(t, err)

// Check the cluster regions.
if err := testutils.SucceedsSoonError(func() error {
rows, err := db.QueryContext(ctx, "SELECT region, zones[1] FROM [SHOW REGIONS FROM CLUSTER] ORDER BY 1")
require.NoError(t, err)
defer rows.Close()

matrix, err := sqlutils.RowsToStrMatrix(rows)
require.NoError(t, err)

expMatrix := [][]string{
{"europe-west2", "europe-west2-b"},
{"us-east1", "us-east1-b"},
{"us-west1", "us-west1-b"},
}
if !reflect.DeepEqual(matrix, expMatrix) {
return errors.Errorf("unexpected cluster regions: want %+v, got %+v", expMatrix, matrix)
}
return nil
}); err != nil {
t.Fatal(err)
}
waitForFullReplication(t, db)

// Create a multi-region database and table.
_, err = db.ExecContext(ctx, `CREATE DATABASE test`)
require.NoError(t, err)
_, err = db.ExecContext(ctx, `ALTER DATABASE test SET PRIMARY REGION "us-east1"`)
require.NoError(t, err)
_, err = db.ExecContext(ctx, `ALTER DATABASE test ADD REGION "us-west1"`)
require.NoError(t, err)
_, err = db.ExecContext(ctx, `ALTER DATABASE test ADD REGION "europe-west2"`)
require.NoError(t, err)
_, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER DATABASE test SURVIVE %s FAILURE`, survival))
require.NoError(t, err)
_, err = db.ExecContext(ctx, `CREATE TABLE test.test ( k INT8, v INT8, PRIMARY KEY (k) )`)
require.NoError(t, err)
_, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE test.test SET LOCALITY %s`, locality))
require.NoError(t, err)

// Wait until the table has completed up-replication.
t.l.Printf("waiting for up-replication...")
require.NoError(t, retry.ForDuration(5*time.Minute, func() error {
const q = `
SELECT
coalesce(array_length(voting_replicas, 1), 0),
coalesce(array_length(non_voting_replicas, 1), 0)
FROM
crdb_internal.ranges_no_leases
WHERE
table_name = 'test'`
var voters, nonVoters int
if err := db.QueryRowContext(ctx, q).Scan(&voters, &nonVoters); err != nil {
t.l.Printf("retrying: %v\n", err)
return err
}

var ok bool
if survival == zone {
// Expect 3 voting replicas and 2 non-voting replicas.
ok = voters == 3 && nonVoters == 2
} else {
// Expect 5 voting replicas and 0 non-voting replicas.
ok = voters == 5 && nonVoters == 0
}
if !ok {
return errors.Newf("rebalancing not complete")
}
return nil
}))
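The expected counts above restate the multi-region placement rules: with ZONE survivability the database keeps 3 voting replicas in the primary region plus a non-voting replica in each of the other two regions, while with REGION survivability it spreads 5 voting replicas across the regions with no non-voters. Restated as a standalone helper (a sketch; the helper name is illustrative, using the types from this file):

// expectedReplicas restates the up-replication expectations checked above.
func expectedReplicas(survival survivalGoal) (voters, nonVoters int) {
	if survival == zone {
		// 3 voters in the primary region, 1 non-voter in each other region.
		return 3, 2
	}
	// REGION survivability: 5 voters spread across regions, no non-voters.
	return 5, 0
}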

const rows = 100
const concurrency = 32
sem := make(chan struct{}, concurrency)
@@ -105,7 +207,7 @@ func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
}
}
// chooseKV picks a random key-value pair by exploiting the pseudo-random
// ordering of keys when traversing a map within a range statement.
chooseKV := func() (k int, v int64) {
for k, v = range data {
return k, v
@@ -115,8 +217,18 @@ func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
verifySelect := func(ctx context.Context, node, k int, expectedVal int64) func() error {
return func() error {
nodeDB := conns[node-1]
var aost string
if locality == regional {
// For REGIONAL tables, only stale reads can be served off
// followers.
aost = "AS OF SYSTEM TIME follower_read_timestamp()"
} else {
// For GLOBAL tables, we can perform consistent reads and expect
// them to be served off followers.
aost = ""
}
q := fmt.Sprintf("SELECT v FROM test.test %s WHERE k = $1", aost)
r := nodeDB.QueryRowContext(ctx, q, k)
var got int64
if err := r.Scan(&got); err != nil {
// Ignore errors due to cancellation.
@@ -153,33 +265,50 @@ func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
if err := g.Wait(); err != nil {
t.Fatalf("failed to insert data: %v", err)
}

if locality == regional {
// For REGIONAL tables, wait for follower_read_timestamp() historical
// reads to have data. For GLOBAL tables, this isn't needed, because
// we will be reading consistently (non-stale).
followerReadDuration, err := computeFollowerReadDuration(ctx, db)
if err != nil {
t.Fatalf("failed to compute follower read duration: %v", err)
}
select {
case <-time.After(followerReadDuration):
case <-ctx.Done():
t.Fatalf("context canceled: %v", ctx.Err())
}
}

// Read the follower read counts before issuing the follower reads to observe
// the delta and protect from follower reads which might have happened due to
// system queries.
followerReadsBefore, err := getFollowerReadCounts(ctx, c)
if err != nil {
t.Fatalf("failed to get follower read counts: %v", err)
}
// Perform reads on each node and ensure we get the expected value. Do so a
// few times on each follower to give caches time to warm up.
g, gCtx = errgroup.WithContext(ctx)
k, v := chooseKV()
for i := 1; i <= c.spec.NodeCount; i++ {
fn := verifySelect(gCtx, i, k, v)
g.Go(func() error {
for j := 0; j < 100; j++ {
if err := fn(); err != nil {
return err
}
}
return nil
})
}
if err := g.Wait(); err != nil {
t.Fatalf("error verifying node values: %v", err)
}
// Verify that the follower read count increments on at least two nodes -
// which we expect to be in the non-primary regions.
expNodesToSeeFollowerReads := 2
followerReadsAfter, err := getFollowerReadCounts(ctx, c)
if err != nil {
t.Fatalf("failed to get follower read counts: %v", err)
@@ -190,9 +319,9 @@ func runFollowerReadsTest(ctx context.Context, t *test, c *cluster) {
nodesWhichSawFollowerReads++
}
}
if nodesWhichSawFollowerReads < expNodesToSeeFollowerReads {
t.Fatalf("fewer than %v follower reads occurred: saw %v before and %v after",
expNodesToSeeFollowerReads, followerReadsBefore, followerReadsAfter)
}
// Run reads for 3m which, given the metrics window of 10s, should guarantee
// that the most recent SQL latency time series data should relate to at least