From 052e9e0c9c074f859a2c625affc15fa6d57599e3 Mon Sep 17 00:00:00 2001
From: Jeffrey Xiao
Date: Mon, 8 Jul 2019 18:16:57 -0400
Subject: [PATCH] roachtest: add kv tests to detect range lookup regressions

Adds two kv50 roachtests to detect regressions in the number of range
lookups performed by the DistSender. The two tests run the following
workloads:

1. Split workload: two workers repeatedly split randomly chosen ranges.
2. Relocate workload: four workers repeatedly relocate the replicas of a
   random range to a random subset of three nodes.

Both workloads consistently invalidate range descriptors stored in the
range descriptor cache: splits and relocations change range boundaries
and replica placement, so cached descriptors go stale and the DistSender
must perform fresh range lookups.

Release note: None
---
 pkg/cmd/roachtest/kv.go       | 133 ++++++++++++++++++++++++++++++++++
 pkg/cmd/roachtest/registry.go |   1 +
 pkg/cmd/roachtest/ts_util.go  | 116 ++++++++++++++++++++++-------
 3 files changed, 224 insertions(+), 26 deletions(-)

diff --git a/pkg/cmd/roachtest/kv.go b/pkg/cmd/roachtest/kv.go
index 7cd32b7544f8..9f8055c126f8 100644
--- a/pkg/cmd/roachtest/kv.go
+++ b/pkg/cmd/roachtest/kv.go
@@ -12,13 +12,16 @@ package main
 
 import (
 	"context"
+	gosql "database/sql"
 	"fmt"
+	"math/rand"
 	"net/http"
 	"strconv"
 	"strings"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
 	"github.com/cockroachdb/cockroach/pkg/util/httputil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -498,3 +501,133 @@ func registerKVScalability(r *testRegistry) {
 		}
 	}
 }
+
+func registerKVRangeLookups(r *testRegistry) {
+	type rangeLookupWorkloadType int
+	const (
+		splitWorkload rangeLookupWorkloadType = iota
+		relocateWorkload
+	)
+
+	const (
+		nodes = 8
+		cpus  = 8
+	)
+
+	runRangeLookups := func(ctx context.Context, t *test, c *cluster, workers int, workloadType rangeLookupWorkloadType, maximumRangeLookupsPerSec float64) {
+		nodes := c.spec.NodeCount - 1
+		doneInit := make(chan struct{})
+		doneWorkload := make(chan struct{})
+		c.Put(ctx, cockroach, "./cockroach", c.Range(1, nodes))
+		c.Put(ctx, workload, "./workload", c.Node(nodes+1))
+		c.Start(ctx, t, c.Range(1, nodes))
+
+		t.Status("running workload")
+
+		conns := make([]*gosql.DB, nodes)
+		for i := 0; i < nodes; i++ {
+			conns[i] = c.Conn(ctx, i+1)
+		}
+		defer func() {
+			for i := 0; i < nodes; i++ {
+				conns[i].Close()
+			}
+		}()
+		waitForFullReplication(t, conns[0])
+
+		m := newMonitor(ctx, c, c.Range(1, nodes))
+		m.Go(func(ctx context.Context) error {
+			defer close(doneWorkload)
+			cmd := fmt.Sprintf("./workload init kv {pgurl:1-%d}", nodes)
+			c.Run(ctx, c.Node(nodes+1), cmd)
+			close(doneInit)
+			concurrency := ifLocal("", " --concurrency="+fmt.Sprint(nodes*64))
+			splits := " --splits=1000"
+			duration := " --duration=" + ifLocal("10s", "10m")
+			readPercent := " --read-percent=50"
+			// We run kv with --tolerate-errors, since the relocate workload is
+			// expected to create `result is ambiguous (removing replica)` errors.
+			cmd = fmt.Sprintf("./workload run kv --tolerate-errors"+
+				concurrency+splits+duration+readPercent+
+				" {pgurl:1-%d}", nodes)
+			start := timeutil.Now()
+			c.Run(ctx, c.Node(nodes+1), cmd)
+			end := timeutil.Now()
+			verifyLookupsPerSec(ctx, c, t, c.Node(1), start, end, maximumRangeLookupsPerSec)
+			return nil
+		})
+
+		<-doneInit
+		for i := 0; i < workers; i++ {
+			m.Go(func(ctx context.Context) error {
+				for {
+					select {
+					case <-ctx.Done():
+						return ctx.Err()
+					case <-doneWorkload:
+						return nil
+					default:
+					}
+
+					conn := conns[c.Range(1, nodes).randNode()[0]-1]
+					switch workloadType {
+					case splitWorkload:
+						_, err := conn.ExecContext(ctx, `
+							ALTER TABLE
+								kv.kv
+							SPLIT AT
+								VALUES (CAST(floor(random() * 9223372036854775808) AS INT))
+							WITH EXPIRATION '1s'
+						`)
+						if err != nil && !pgerror.IsSQLRetryableError(err) {
+							return err
+						}
+					case relocateWorkload:
+						newReplicas := rand.Perm(nodes)[:3]
+						_, err := conn.ExecContext(ctx, `
+							ALTER TABLE
+								kv.kv
+							EXPERIMENTAL_RELOCATE
+								SELECT ARRAY[$1, $2, $3], CAST(floor(random() * 9223372036854775808) AS INT)
+						`, newReplicas[0]+1, newReplicas[1]+1, newReplicas[2]+1)
+						if err != nil && !pgerror.IsSQLRetryableError(err) && !isExpectedRelocateError(err) {
+							return err
+						}
+					default:
+						panic("unexpected")
+					}
+				}
+			})
+		}
+		m.Wait()
+	}
+	for _, item := range []struct {
+		workers                   int
+		workloadType              rangeLookupWorkloadType
+		maximumRangeLookupsPerSec float64
+	}{
+		{2, splitWorkload, 10.0},
+		// Relocates are expected to fail periodically when relocating random
+		// ranges, so use more workers.
+		{4, relocateWorkload, 50.0},
+	} {
+		// For use in closure.
+		item := item
+		var workloadName string
+		switch item.workloadType {
+		case splitWorkload:
+			workloadName = "split"
+		case relocateWorkload:
+			workloadName = "relocate"
+		default:
+			panic("unexpected")
+		}
+		r.Add(testSpec{
+			Name:    fmt.Sprintf("kv50/rangelookups/%s/nodes=%d", workloadName, nodes),
+			Cluster: makeClusterSpec(nodes+1, cpu(cpus)),
+			Run: func(ctx context.Context, t *test, c *cluster) {
+				runRangeLookups(ctx, t, c, item.workers, item.workloadType, item.maximumRangeLookupsPerSec)
+			},
+		})
+	}
+}
diff --git a/pkg/cmd/roachtest/registry.go b/pkg/cmd/roachtest/registry.go
index fb4936916a28..2033327875d0 100644
--- a/pkg/cmd/roachtest/registry.go
+++ b/pkg/cmd/roachtest/registry.go
@@ -46,6 +46,7 @@ func registerTests(r *testRegistry) {
 	registerKVGracefulDraining(r)
 	registerKVScalability(r)
 	registerKVSplits(r)
+	registerKVRangeLookups(r)
 	registerLargeRange(r)
 	registerLedger(r)
 	registerNetwork(r)
diff --git a/pkg/cmd/roachtest/ts_util.go b/pkg/cmd/roachtest/ts_util.go
index 84d06bf65d5f..b954410d0bc5 100644
--- a/pkg/cmd/roachtest/ts_util.go
+++ b/pkg/cmd/roachtest/ts_util.go
@@ -19,17 +19,51 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/httputil"
 )
 
-func verifyTxnPerSecond(
-	ctx context.Context,
-	c *cluster,
-	t *test,
-	adminNode nodeListOption,
-	start, end time.Time,
-	txnTarget, maxPercentTimeUnderTarget float64,
-) {
-	// Query needed information over the timespan of the query.
-	adminURLs := c.ExternalAdminUIAddr(ctx, adminNode)
-	url := "http://" + adminURLs[0] + "/ts/query"
+// tsQueryType represents the type of time series query to retrieve. In
+// most cases, tests verify either the "total" or the "rate" of a metric, so
+// this enum type simplifies the API of tspb.Query.
+type tsQueryType int
+
+const (
+	// total indicates querying the total value of the metric. Specifically,
+	// the downsampler will be average, the aggregator will be sum, and no
+	// derivative will be applied.
+	total tsQueryType = iota
+	// rate indicates querying the rate of change of the metric. Specifically,
+	// the downsampler will be average, the aggregator will be sum, and the
+	// derivative will be non-negative derivative.
+	rate
+)
+
+type tsQuery struct {
+	name      string
+	queryType tsQueryType
+}
+
+func getMetrics(
+	t *test, adminURL string, start, end time.Time, tsQueries []tsQuery,
+) tspb.TimeSeriesQueryResponse {
+	url := "http://" + adminURL + "/ts/query"
+	queries := make([]tspb.Query, len(tsQueries))
+	for i := 0; i < len(tsQueries); i++ {
+		switch tsQueries[i].queryType {
+		case total:
+			queries[i] = tspb.Query{
+				Name:             tsQueries[i].name,
+				Downsampler:      tspb.TimeSeriesQueryAggregator_AVG.Enum(),
+				SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
+			}
+		case rate:
+			queries[i] = tspb.Query{
+				Name:             tsQueries[i].name,
+				Downsampler:      tspb.TimeSeriesQueryAggregator_AVG.Enum(),
+				SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
+				Derivative:       tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE.Enum(),
+			}
+		default:
+			panic("unexpected")
+		}
+	}
 	request := tspb.TimeSeriesQueryRequest{
 		StartNanos: start.UnixNano(),
 		EndNanos:   end.UnixNano(),
@@ -37,26 +71,29 @@
 		// because the time series query system does not support downsampling
 		// offsets.
 		SampleNanos: (1 * time.Minute).Nanoseconds(),
-		Queries: []tspb.Query{
-			{
-				Name:             "cr.node.txn.commits",
-				Downsampler:      tspb.TimeSeriesQueryAggregator_AVG.Enum(),
-				SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
-				Derivative:       tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE.Enum(),
-			},
-			// Query *without* the derivative applied so we can get a total count of
-			// txns over the time period.
-			{
-				Name:             "cr.node.txn.commits",
-				Downsampler:      tspb.TimeSeriesQueryAggregator_AVG.Enum(),
-				SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
-			},
-		},
+		Queries: queries,
 	}
 	var response tspb.TimeSeriesQueryResponse
 	if err := httputil.PostJSON(http.Client{}, url, &request, &response); err != nil {
 		t.Fatal(err)
 	}
+	return response
+}
+
+func verifyTxnPerSecond(
+	ctx context.Context,
+	c *cluster,
+	t *test,
+	adminNode nodeListOption,
+	start, end time.Time,
+	txnTarget, maxPercentTimeUnderTarget float64,
+) {
+	// Query needed information over the timespan of the test. The rate
+	// query must stay first: the code below indexes into Results by position.
+	adminURL := c.ExternalAdminUIAddr(ctx, adminNode)[0]
+	response := getMetrics(t, adminURL, start, end, []tsQuery{
+		{name: "cr.node.txn.commits", queryType: rate},
+		{name: "cr.node.txn.commits", queryType: total},
+	})
 
 	// Drop the first two minutes of datapoints as a "ramp-up" period.
 	perMinute := response.Results[0].Datapoints[2:]
@@ -89,3 +126,30 @@ func verifyTxnPerSecond(
 		t.l.Printf("spent %f%% of time below target of %f txn/s", perc*100, txnTarget)
 	}
 }
+
+func verifyLookupsPerSec(
+	ctx context.Context,
+	c *cluster,
+	t *test,
+	adminNode nodeListOption,
+	start, end time.Time,
+	rangeLookupsTarget float64,
+) {
+	// Query needed information over the timespan of the test.
+	adminURL := c.ExternalAdminUIAddr(ctx, adminNode)[0]
+	response := getMetrics(t, adminURL, start, end, []tsQuery{
+		{name: "cr.node.distsender.rangelookups", queryType: rate},
+	})
+
+	// Drop the first two minutes of datapoints as a "ramp-up" period.
+	perMinute := response.Results[0].Datapoints[2:]
+
+	// Verify that each individual one-minute period was below the target.
+	for _, dp := range perMinute {
+		if dp.Value > rangeLookupsTarget {
+			t.Fatalf("found minute interval with %f lookups/sec above target of %f lookups/sec", dp.Value, rangeLookupsTarget)
+		} else {
+			t.l.Printf("found minute interval with %f lookups/sec", dp.Value)
+		}
+	}
+}
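
A note on reuse (outside the patch proper): getMetrics is factored out so
that other roachtests can assert on arbitrary time series metrics without
building a tspb.TimeSeriesQueryRequest by hand. The sketch below shows how
a hypothetical follow-up check could consume it; it assumes it lives in the
same roachtest package as the helpers above, and the function name, the
metric "cr.node.sql.query.count", and the threshold parameter are
illustrative, not part of this change.

	// verifyQueriesPerSec is a hypothetical sketch of reusing getMetrics for
	// a different rate metric. It mirrors the shape of verifyLookupsPerSec.
	func verifyQueriesPerSec(
		ctx context.Context,
		c *cluster,
		t *test,
		adminNode nodeListOption,
		start, end time.Time,
		maxQueriesPerSec float64,
	) {
		adminURL := c.ExternalAdminUIAddr(ctx, adminNode)[0]
		response := getMetrics(t, adminURL, start, end, []tsQuery{
			// Illustrative metric name; any per-node rate metric would do.
			{name: "cr.node.sql.query.count", queryType: rate},
		})
		// Drop the first two minutes of datapoints as a "ramp-up" period,
		// mirroring verifyLookupsPerSec above.
		for _, dp := range response.Results[0].Datapoints[2:] {
			if dp.Value > maxQueriesPerSec {
				t.Fatalf("found minute interval with %f queries/sec above target of %f queries/sec",
					dp.Value, maxQueriesPerSec)
			}
		}
	}

The two new tests register as kv50/rangelookups/split/nodes=8 and
kv50/rangelookups/relocate/nodes=8, so they can be selected with the usual
roachtest run name filter.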