38792: roachtest: add kv test to assert against range lookup regressions r=jeffrey-xiao a=jeffrey-xiao

Adds two kv50 roachtests to detect regressions in the number of range lookups performed by the DistSender. The two tests run the following workloads:

1. Split workload: two workers splitting random ranges.
2. Relocate workload: four workers changing the replicas of a range to
   a random subset of nodes.

These workloads consistently invalidate range descriptors stored in the range descriptor cache.
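For reference, the statements the two worker types issue look roughly like this (lifted from the kv.go diff below; $1, $2, and $3 in the relocate statement are three randomly chosen, 1-based node IDs):

-- Split worker: split kv.kv at a random key; the 1s expiration makes each
-- split short-lived, so range boundaries churn continuously.
ALTER TABLE kv.kv
  SPLIT AT VALUES (CAST(floor(random() * 9223372036854775808) AS INT))
  WITH EXPIRATION '1s';

-- Relocate worker: move the range containing a random key onto three
-- randomly chosen nodes.
ALTER TABLE kv.kv
  EXPERIMENTAL_RELOCATE
  SELECT ARRAY[$1, $2, $3], CAST(floor(random() * 9223372036854775808) AS INT);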

Release note: None

Co-authored-by: Jeffrey Xiao <[email protected]>
craig[bot] and jeffrey-xiao committed Jul 16, 2019
2 parents 9c17c1b + 052e9e0 commit 862b79a
Showing 3 changed files with 224 additions and 26 deletions.
133 changes: 133 additions & 0 deletions pkg/cmd/roachtest/kv.go
@@ -12,13 +12,16 @@ package main

import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"net/http"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -496,3 +499,133 @@ func registerKVScalability(r *testRegistry) {
}
}
}

func registerKVRangeLookups(r *testRegistry) {
	type rangeLookupWorkloadType int
	const (
		splitWorkload rangeLookupWorkloadType = iota
		relocateWorkload
	)

	const (
		nodes = 8
		cpus  = 8
	)

	runRangeLookups := func(ctx context.Context, t *test, c *cluster, workers int, workloadType rangeLookupWorkloadType, maximumRangeLookupsPerSec float64) {
		nodes := c.spec.NodeCount - 1
		doneInit := make(chan struct{})
		doneWorkload := make(chan struct{})
		c.Put(ctx, cockroach, "./cockroach", c.Range(1, nodes))
		c.Put(ctx, workload, "./workload", c.Node(nodes+1))
		c.Start(ctx, t, c.Range(1, nodes))

		t.Status("running workload")

		conns := make([]*gosql.DB, nodes)
		for i := 0; i < nodes; i++ {
			conns[i] = c.Conn(ctx, i+1)
		}
		defer func() {
			for i := 0; i < nodes; i++ {
				conns[i].Close()
			}
		}()
		waitForFullReplication(t, conns[0])

		m := newMonitor(ctx, c, c.Range(1, nodes))
		m.Go(func(ctx context.Context) error {
			defer close(doneWorkload)
			cmd := fmt.Sprintf("./workload init kv {pgurl:1-%d}", nodes)
			c.Run(ctx, c.Node(nodes+1), cmd)
			close(doneInit)
			concurrency := ifLocal("", " --concurrency="+fmt.Sprint(nodes*64))
			splits := " --splits=1000"
			duration := " --duration=" + ifLocal("10s", "10m")
			readPercent := " --read-percent=50"
			// We run kv with --tolerate-errors, since the relocate workload is
			// expected to create `result is ambiguous (removing replica)` errors.
			cmd = fmt.Sprintf("./workload run kv --tolerate-errors"+
				concurrency+splits+duration+readPercent+
				" {pgurl:1-%d}", nodes)
			start := timeutil.Now()
			c.Run(ctx, c.Node(nodes+1), cmd)
			end := timeutil.Now()
			verifyLookupsPerSec(ctx, c, t, c.Node(1), start, end, maximumRangeLookupsPerSec)
			return nil
		})

		<-doneInit
		for i := 0; i < workers; i++ {
			m.Go(func(ctx context.Context) error {
				for {
					select {
					case <-ctx.Done():
						return ctx.Err()
					case <-doneWorkload:
						return nil
					default:
					}

					conn := conns[c.Range(1, nodes).randNode()[0]-1]
					switch workloadType {
					case splitWorkload:
						_, err := conn.ExecContext(ctx, `
							ALTER TABLE
								kv.kv
							SPLIT AT
								VALUES (CAST(floor(random() * 9223372036854775808) AS INT))
							WITH EXPIRATION '1s'
						`)
						if err != nil && !pgerror.IsSQLRetryableError(err) {
							return err
						}
					case relocateWorkload:
						newReplicas := rand.Perm(nodes)[:3]
						_, err := conn.ExecContext(ctx, `
							ALTER TABLE
								kv.kv
							EXPERIMENTAL_RELOCATE
								SELECT ARRAY[$1, $2, $3], CAST(floor(random() * 9223372036854775808) AS INT)
						`, newReplicas[0]+1, newReplicas[1]+1, newReplicas[2]+1)
						if err != nil && !pgerror.IsSQLRetryableError(err) && !isExpectedRelocateError(err) {
							return err
						}
					default:
						panic("unexpected")
					}
				}
			})
		}
		m.Wait()
	}
	for _, item := range []struct {
		workers                   int
		workloadType              rangeLookupWorkloadType
		maximumRangeLookupsPerSec float64
	}{
		{2, splitWorkload, 10.0},
		// Relocates are expected to fail periodically when relocating random
		// ranges, so use more workers.
		{4, relocateWorkload, 50.0},
	} {
		// For use in closure.
		item := item
		var workloadName string
		switch item.workloadType {
		case splitWorkload:
			workloadName = "split"
		case relocateWorkload:
			workloadName = "relocate"
		default:
			panic("unexpected")
		}
		r.Add(testSpec{
			Name:    fmt.Sprintf("kv50/rangelookups/%s/nodes=%d", workloadName, nodes),
			Cluster: makeClusterSpec(nodes+1, cpu(cpus)),
			Run: func(ctx context.Context, t *test, c *cluster) {
				runRangeLookups(ctx, t, c, item.workers, item.workloadType, item.maximumRangeLookupsPerSec)
			},
		})
	}
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/registry.go
@@ -46,6 +46,7 @@ func registerTests(r *testRegistry) {
	registerKVGracefulDraining(r)
	registerKVScalability(r)
	registerKVSplits(r)
	registerKVRangeLookups(r)
	registerLargeRange(r)
	registerLedger(r)
	registerNetwork(r)
116 changes: 90 additions & 26 deletions pkg/cmd/roachtest/ts_util.go
@@ -19,44 +19,81 @@ import (
	"github.com/cockroachdb/cockroach/pkg/util/httputil"
)

func verifyTxnPerSecond(
	ctx context.Context,
	c *cluster,
	t *test,
	adminNode nodeListOption,
	start, end time.Time,
	txnTarget, maxPercentTimeUnderTarget float64,
) {
	// Query needed information over the timespan of the query.
	adminURLs := c.ExternalAdminUIAddr(ctx, adminNode)
	url := "http://" + adminURLs[0] + "/ts/query"
// tsQueryType represents the type of the time series query to retrieve. In
// most cases, tests are verifying either the "total" or "rate" metrics, so
// this enum type simplifies the API of tspb.Query.
type tsQueryType int

const (
	// total indicates to query the total of the metric. Specifically,
	// downsampler will be average, aggregator will be sum, and derivative will
	// be none.
	total tsQueryType = iota
	// rate indicates to query the rate of change of the metric. Specifically,
	// downsampler will be average, aggregator will be sum, and derivative will
	// be non-negative derivative.
	rate
)

type tsQuery struct {
	name      string
	queryType tsQueryType
}

func getMetrics(
	t *test, adminURL string, start, end time.Time, tsQueries []tsQuery,
) tspb.TimeSeriesQueryResponse {
	url := "http://" + adminURL + "/ts/query"
	queries := make([]tspb.Query, len(tsQueries))
	for i := 0; i < len(tsQueries); i++ {
		switch tsQueries[i].queryType {
		case total:
			queries[i] = tspb.Query{
				Name:             tsQueries[i].name,
				Downsampler:      tspb.TimeSeriesQueryAggregator_AVG.Enum(),
				SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
			}
		case rate:
			queries[i] = tspb.Query{
				Name:             tsQueries[i].name,
				Downsampler:      tspb.TimeSeriesQueryAggregator_AVG.Enum(),
				SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
				Derivative:       tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE.Enum(),
			}
		default:
			panic("unexpected")
		}
	}
	request := tspb.TimeSeriesQueryRequest{
		StartNanos: start.UnixNano(),
		EndNanos:   end.UnixNano(),
		// Ask for one minute intervals. We can't just ask for the whole hour
		// because the time series query system does not support downsampling
		// offsets.
		SampleNanos: (1 * time.Minute).Nanoseconds(),
		Queries: []tspb.Query{
			{
				Name:             "cr.node.txn.commits",
				Downsampler:      tspb.TimeSeriesQueryAggregator_AVG.Enum(),
				SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
				Derivative:       tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE.Enum(),
			},
			// Query *without* the derivative applied so we can get a total count of
			// txns over the time period.
			{
				Name:             "cr.node.txn.commits",
				Downsampler:      tspb.TimeSeriesQueryAggregator_AVG.Enum(),
				SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
			},
		},
		Queries: queries,
	}
	var response tspb.TimeSeriesQueryResponse
	if err := httputil.PostJSON(http.Client{}, url, &request, &response); err != nil {
		t.Fatal(err)
	}
	return response
}

func verifyTxnPerSecond(
	ctx context.Context,
	c *cluster,
	t *test,
	adminNode nodeListOption,
	start, end time.Time,
	txnTarget, maxPercentTimeUnderTarget float64,
) {
	// Query needed information over the timespan of the query.
	adminURL := c.ExternalAdminUIAddr(ctx, adminNode)[0]
	response := getMetrics(t, adminURL, start, end, []tsQuery{
		{name: "cr.node.txn.commits", queryType: total},
		{name: "cr.node.txn.commits", queryType: rate},
	})

	// Drop the first two minutes of datapoints as a "ramp-up" period.
	perMinute := response.Results[0].Datapoints[2:]
@@ -89,3 +126,30 @@ func verifyTxnPerSecond(
		t.l.Printf("spent %f%% of time below target of %f txn/s", perc*100, txnTarget)
	}
}

func verifyLookupsPerSec(
	ctx context.Context,
	c *cluster,
	t *test,
	adminNode nodeListOption,
	start, end time.Time,
	rangeLookupsTarget float64,
) {
	// Query needed information over the timespan of the query.
	adminURL := c.ExternalAdminUIAddr(ctx, adminNode)[0]
	response := getMetrics(t, adminURL, start, end, []tsQuery{
		{name: "cr.node.distsender.rangelookups", queryType: rate},
	})

	// Drop the first two minutes of datapoints as a "ramp-up" period.
	perMinute := response.Results[0].Datapoints[2:]

	// Verify that each individual one-minute period was below the target.
	for _, dp := range perMinute {
		if dp.Value > rangeLookupsTarget {
			t.Fatalf("Found minute interval with %f lookup/sec above target of %f lookup/sec\n", dp.Value, rangeLookupsTarget)
		} else {
			t.l.Printf("Found minute interval with %f lookup/sec\n", dp.Value)
		}
	}
}
