diff --git a/pkg/cmd/roachtest/tests/kv.go b/pkg/cmd/roachtest/tests/kv.go index 7533ad7481d8..b942fa5735eb 100644 --- a/pkg/cmd/roachtest/tests/kv.go +++ b/pkg/cmd/roachtest/tests/kv.go @@ -15,14 +15,13 @@ import ( gosql "database/sql" "fmt" "math/rand" - "net/http" "os" "strconv" "strings" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" @@ -30,8 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachprod/install" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" - "github.com/cockroachdb/cockroach/pkg/ts/tspb" - "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -465,6 +462,9 @@ func registerKVGracefulDraining(r registry.Registry) { startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, "--vmodule=store=2,store_rebalancer=2") c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(1, nodes)) + collector, cleanupFunc := setupStatCollector(ctx, t, c, c.Node(nodes+1), c.Range(1, nodes)) + defer cleanupFunc(ctx) + db := c.Conn(ctx, t.L(), 1) defer db.Close() @@ -511,37 +511,21 @@ func registerKVGracefulDraining(r registry.Registry) { t.WorkerStatus("waiting for perf to stabilize") // Before we start shutting down nodes, wait for the performance // of the workload to stabilize at the expected allowed level. - - adminURLs, err := c.ExternalAdminUIAddr(ctx, t.L(), c.Node(1)) - if err != nil { - return err - } - url := "http://" + adminURLs[0] + "/ts/query" - getQPSTimeSeries := func(start, end time.Time) ([]tspb.TimeSeriesDatapoint, error) { - request := tspb.TimeSeriesQueryRequest{ - StartNanos: start.UnixNano(), - EndNanos: end.UnixNano(), - // Check the performance in each timeseries sample interval. - SampleNanos: base.DefaultMetricsSampleInterval.Nanoseconds(), - Queries: []tspb.Query{ - { - Name: "cr.node.sql.query.count", - Downsampler: tspb.TimeSeriesQueryAggregator_AVG.Enum(), - SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(), - Derivative: tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE.Enum(), - }, - }, - } - var response tspb.TimeSeriesQueryResponse - if err := httputil.PostJSON(http.Client{}, url, &request, &response); err != nil { + getQPSTimeSeries := func(start, end time.Time) (clusterstats.StatSeries, error) { + taggedSeries, err := collector.CollectInterval(ctx, t.L(), + clusterstats.Interval{From: start, To: end}, "sum(rate(sql_query_count[30s]))") + if err != nil { return nil, err } - if len(response.Results[0].Datapoints) <= 1 { - return nil, errors.Newf("not enough datapoints in timeseries query response: %+v", response) + + // We know there should be exactly one series because of the `sum` + // aggregation. + if ts := clusterstats.GetOneSeries(taggedSeries); ts != nil { + return ts, nil } - return response.Results[0].Datapoints, nil - } + return nil, errors.Newf("Unexpected return value when querying stats %v", taggedSeries) + } waitBegin := timeutil.Now() // Nb: we could want to use testutil.SucceedSoonError() here, // however that has a hardcoded timeout of 45 seconds, and @@ -567,7 +551,7 @@ func registerKVGracefulDraining(r registry.Registry) { if qps := dp.Value; qps < expectedQPS { return errors.Newf( "QPS of %.2f at time %v is below minimum allowable QPS of %.2f; entire timeseries: %+v", - qps, timeutil.Unix(0, dp.TimestampNanos), expectedQPS, datapoints) + qps, timeutil.Unix(0, dp.Time), expectedQPS, datapoints) } // The desired performance has been reached by the @@ -643,7 +627,7 @@ func registerKVGracefulDraining(r registry.Registry) { if qps := dp.Value; qps < expectedQPS { t.Fatalf( "QPS of %.2f at time %v is below minimum allowable QPS of %.2f; entire timeseries: %+v", - qps, timeutil.Unix(0, dp.TimestampNanos), expectedQPS, datapoints) + qps, timeutil.Unix(0, dp.Time), expectedQPS, datapoints) } } t.Status("perf is OK!")