From 9d142accabbdae1008a160c5d4571d03da4a9f72 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 13 Mar 2023 09:32:15 -0400 Subject: [PATCH] server: extend span statistics endpoint Extends: #96223 This PR extends the implementation of our SpanStats RPC endpoint to fetch stats for multiple spans at once. By extending the endpoint, we amortize the cost of the RPC's node fanout across all requested spans, whereas previously, we were issuing a fanout per span requested. Additionally, this change batches KV layer requests for ranges fully contained by the span, instead of issuing a request per fully contained range. Release note (sql change): span statistics are unavailable on mixed-version clusters --- .../serverccl/statusccl/tenant_status_test.go | 120 +++++++-- pkg/cmd/roachtest/registry/owners.go | 1 + pkg/cmd/roachtest/tests/BUILD.bazel | 1 + .../tests/mixed_version_tenant_span_stats.go | 242 ++++++++++++++++++ pkg/cmd/roachtest/tests/registry.go | 1 + pkg/roachpb/BUILD.bazel | 2 + pkg/roachpb/span_stats.go | 39 +++ pkg/roachpb/span_stats.proto | 22 +- pkg/rpc/auth_tenant.go | 11 +- pkg/server/admin.go | 11 +- pkg/server/span_stats_server.go | 206 ++++++++++----- pkg/server/span_stats_test.go | 190 ++++++++++++-- pkg/server/status.go | 13 + pkg/server/status_test.go | 24 +- pkg/sql/faketreeeval/evalctx.go | 2 +- pkg/sql/planner.go | 7 +- pkg/sql/sem/builtins/generator_builtins.go | 134 +++++++--- pkg/sql/sem/eval/deps.go | 2 +- 18 files changed, 871 insertions(+), 157 deletions(-) create mode 100644 pkg/cmd/roachtest/tests/mixed_version_tenant_span_stats.go create mode 100644 pkg/roachpb/span_stats.go diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go index 1492cb73ec58..95f59a2e998e 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -149,9 +149,8 @@ func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.Ten t.Run("test tenant permissioning", func(t *testing.T) { req := roachpb.SpanStatsRequest{ - NodeID: "0", - StartKey: roachpb.RKey(aSpan.Key), - EndKey: roachpb.RKey(aSpan.EndKey), + NodeID: "0", + Spans: []roachpb.Span{aSpan}, } resp := roachpb.SpanStatsResponse{} @@ -162,19 +161,44 @@ func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.Ten adminClient := helper.TestCluster().TenantHTTPClient(t, 1, true) adminClient.PostJSON("/_status/span", &req, &resp) - require.Greaterf(t, resp.RangeCount, int32(0), "postive range count") + require.Greaterf(t, resp.SpanToStats[aSpan.String()].RangeCount, int32(0), "positive range count") }) t.Run("test tenant isolation", func(t *testing.T) { _, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, &roachpb.SpanStatsRequest{ - NodeID: "0", // 0 indicates we want stats from all nodes. - StartKey: roachpb.RKey(bSpan.Key), - EndKey: roachpb.RKey(bSpan.EndKey), + NodeID: "0", // 0 indicates we want stats from all nodes. + Spans: []roachpb.Span{bSpan}, }) require.Error(t, err) }) + t.Run("test invalid request payload", func(t *testing.T) { + _, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. 
+ StartKey: roachpb.RKey(aSpan.Key), + EndKey: roachpb.RKey(aSpan.EndKey), + }) + require.ErrorContains(t, err, `span stats request - unexpected populated legacy fields (StartKey, EndKey)`) + }) + + t.Run("test exceed span request limit", func(t *testing.T) { + // Set the span batch limit to 1. + _, err := helper.HostCluster().ServerConn(0).Exec(`SET CLUSTER SETTING server.span_stats.span_batch_limit = 1`) + require.NoError(t, err) + _, err = tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. + Spans: []roachpb.Span{aSpan, aSpan}, + }) + require.ErrorContains(t, err, `error getting span statistics - number of spans in request payload (2) exceeds`+ + ` 'server.span_stats.span_batch_limit' cluster setting limit (1)`) + // Reset the span batch limit to default. + _, err = helper.HostCluster().ServerConn(0).Exec(`SET CLUSTER SETTING server.span_stats.span_batch_limit = $1`, roachpb.DefaultSpanStatsSpanLimit) + require.NoError(t, err) + }) + t.Run("test KV node fan-out", func(t *testing.T) { _, tID, err := keys.DecodeTenantPrefix(aSpan.Key) require.NoError(t, err) @@ -186,9 +210,8 @@ func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.Ten controlStats, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, &roachpb.SpanStatsRequest{ - NodeID: "0", // 0 indicates we want stats from all nodes. - StartKey: roachpb.RKey(aSpan.Key), - EndKey: roachpb.RKey(aSpan.EndKey), + NodeID: "0", // 0 indicates we want stats from all nodes. + Spans: []roachpb.Span{aSpan}, }) require.NoError(t, err) @@ -212,14 +235,81 @@ func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.Ten stats, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, &roachpb.SpanStatsRequest{ - NodeID: "0", // 0 indicates we want stats from all nodes. - StartKey: roachpb.RKey(aSpan.Key), - EndKey: roachpb.RKey(aSpan.EndKey), + NodeID: "0", // 0 indicates we want stats from all nodes. 
+ Spans: []roachpb.Span{aSpan}, }) require.NoError(t, err) - require.Equal(t, controlStats.RangeCount+1, stats.RangeCount) - require.Equal(t, controlStats.TotalStats.LiveCount+int64(len(incKeys)), stats.TotalStats.LiveCount) + + controlSpanStats := controlStats.SpanToStats[aSpan.String()] + testSpanStats := stats.SpanToStats[aSpan.String()] + require.Equal(t, controlSpanStats.RangeCount+1, testSpanStats.RangeCount) + require.Equal(t, controlSpanStats.TotalStats.LiveCount+int64(len(incKeys)), testSpanStats.TotalStats.LiveCount) + + // Make a multi-span call + type spanCase struct { + span roachpb.Span + expectedRangeCount int32 + expectedLiveCount int64 + } + spanCases := []spanCase{ + { + // "a", "b" - single range, single key + span: roachpb.Span{ + Key: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[0])), + EndKey: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[1])), + }, + expectedRangeCount: 1, + expectedLiveCount: 1, + }, + { + // "d", "f" - single range, multiple keys + span: roachpb.Span{ + Key: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[3])), + EndKey: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[5])), + }, + expectedRangeCount: 1, + expectedLiveCount: 2, + }, + { + // "bb", "e" - multiple ranges, multiple keys + span: roachpb.Span{ + Key: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[2])), + EndKey: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[4])), + }, + expectedRangeCount: 2, + expectedLiveCount: 2, + }, + + { + // "a", "d" - multiple ranges, multiple keys + span: roachpb.Span{ + Key: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[0])), + EndKey: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[3])), + }, + expectedRangeCount: 2, + expectedLiveCount: 3, + }, + } + + var spans []roachpb.Span + for _, sc := range spanCases { + spans = append(spans, sc.span) + } + + stats, err = tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. + Spans: spans, + }) + + require.NoError(t, err) + // Check each span has their expected values. 
+ for _, sc := range spanCases { + spanStats := stats.SpanToStats[sc.span.String()] + require.Equal(t, spanStats.RangeCount, sc.expectedRangeCount, fmt.Sprintf("mismatch on expected range count for span case with span %v", sc.span.String())) + require.Equal(t, spanStats.TotalStats.LiveCount, sc.expectedLiveCount, fmt.Sprintf("mismatch on expected live count for span case with span %v", sc.span.String())) + } }) } diff --git a/pkg/cmd/roachtest/registry/owners.go b/pkg/cmd/roachtest/registry/owners.go index 62db1eac17ce..5fdda3dd2a6c 100644 --- a/pkg/cmd/roachtest/registry/owners.go +++ b/pkg/cmd/roachtest/registry/owners.go @@ -30,4 +30,5 @@ const ( OwnerTestEng Owner = `test-eng` OwnerDevInf Owner = `dev-inf` OwnerMultiTenant Owner = `multi-tenant` + OwnerClusterObs Owner = `cluster-observability` ) diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 74329cd5a953..7c3bb2eb7058 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -96,6 +96,7 @@ go_library( "mixed_version_job_compatibility_in_declarative_schema_changer.go", "mixed_version_jobs.go", "mixed_version_schemachange.go", + "mixed_version_tenant_span_stats.go", "multitenant.go", "multitenant_distsql.go", "multitenant_shared_process.go", diff --git a/pkg/cmd/roachtest/tests/mixed_version_tenant_span_stats.go b/pkg/cmd/roachtest/tests/mixed_version_tenant_span_stats.go new file mode 100644 index 000000000000..7a115a37c222 --- /dev/null +++ b/pkg/cmd/roachtest/tests/mixed_version_tenant_span_stats.go @@ -0,0 +1,242 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package tests + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/util/version" + "github.com/cockroachdb/errors" +) + +func registerTenantSpanStatsMixedVersion(r registry.Registry) { + testSpec := registry.TestSpec{ + Name: "tenant-span-stats/mixed-version", + Owner: registry.OwnerClusterObs, + Cluster: r.MakeClusterSpec(4), + EncryptionSupport: registry.EncryptionMetamorphic, + Timeout: 30 * time.Minute, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + v, err := version.Parse("v23.2.0") + if err != nil { + t.Fatalf("unable to parse version correctly %v", err) + } + if t.BuildVersion().AtLeast(v) { + t.Fatal("predecessor binary already includes RPC protobuf changes, this test is no longer needed -- remove me") + } + + mvt := mixedversion.NewTest(ctx, t, t.L(), c, c.All()) + + tableName := "test" + var startKey string + var endKey string + var res *install.RunResultDetails + var errOutput errorOutput + + mvt.OnStartup("fetch span stats - start", func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + // Create a test table. + err = h.Exec(rng, fmt.Sprintf(`CREATE TABLE %s (num int)`, tableName)) + if err != nil { + return err + } + // Get the table's span keys. + row := h.QueryRow(rng, + fmt.Sprintf(`SELECT crdb_internal.pretty_key(crdb_internal.table_span('%s'::regclass::oid::int)[1], 0)`, tableName)) + err = row.Scan(&startKey) + if err != nil { + return err + } + row = h.QueryRow(rng, + fmt.Sprintf(`SELECT crdb_internal.pretty_key(crdb_internal.table_span('%s'::regclass::oid::int)[2], 0)`, tableName)) + err = row.Scan(&endKey) + if err != nil { + return err + } + // Dial a node for span stats on startup. All nodes will be on the previous version, this is a sanity check. + l.Printf("Fetch span stats from node (start).") + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(1), oldReqBody(2, startKey, endKey)) + if err != nil { + return err + } + // Ensure the result can be marshalled into a valid span stats response. + var spanStats roachpb.SpanStatsResponse + return json.Unmarshal([]byte(res.Stdout), &spanStats) + }) + + mvt.InMixedVersion("fetch span stats - mixed", func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + prevToCurrentError := "unable to service a mixed version request" + currentToPrevError := "An internal server error has occurred" + unknownFieldError := "unknown field" + + // If we have nodes in mixed versions. + if len(h.Context().FromVersionNodes) > 0 && len(h.Context().ToVersionNodes) > 0 { + prevVersNodeID := h.Context().FromVersionNodes[0] + currVersNodeID := h.Context().ToVersionNodes[0] + + // Fetch span stats from previous version node, dialing to a current version node. 
+ l.Printf("Fetch span stats from previous version node (%v), dialing to a current version node (%v).", prevVersNodeID, currVersNodeID) + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(prevVersNodeID), oldReqBody(currVersNodeID, startKey, endKey)) + if err != nil { + return err + } + // Expect an error in the stdout - mixed version error. + // Ensure the result can be marshalled into a valid error response. + err = json.Unmarshal([]byte(res.Stdout), &errOutput) + if err != nil { + return err + } + // Ensure we get the expected error. + expected := assertExpectedError(errOutput.Error, prevToCurrentError) + if !expected { + return errors.Newf("expected '%s' in error message, got: '%v'", prevToCurrentError, errOutput.Error) + } + + // Fetch span stats from current version node, dialing to a previous version node. + l.Printf("Fetch span stats from current version node (%v), dialing to a previous version node (%v).", currVersNodeID, prevVersNodeID) + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(currVersNodeID), newReqBody(prevVersNodeID, startKey, endKey)) + if err != nil { + return err + } + // Expect an error in the stdout - mixed version error. + // Ensure the result can be marshalled into a valid error response. + err = json.Unmarshal([]byte(res.Stdout), &errOutput) + if err != nil { + return err + } + // Ensure we get the expected error. + expectedCurrToPrev := assertExpectedError(errOutput.Message, currentToPrevError) + expectedUnknown := assertExpectedError(errOutput.Message, unknownFieldError) + if !expectedCurrToPrev && !expectedUnknown { + return errors.Newf("expected '%s' or '%s' in error message, got: '%v'", currentToPrevError, unknownFieldError, errOutput.Error) + } + + // Fanout from current version node. + l.Printf("Fanout from current version node (mixed).") + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(currVersNodeID), newReqBody(0, startKey, endKey)) + if err != nil { + return err + } + // Expect an error in the stdout - mixed version error. + // Ensure the result can be marshalled into a valid error response. + err = json.Unmarshal([]byte(res.Stdout), &errOutput) + if err != nil { + return err + } + // Ensure we get the expected error. + expectedCurrToPrev = assertExpectedError(errOutput.Message, currentToPrevError) + expectedUnknown = assertExpectedError(errOutput.Message, unknownFieldError) + if !expectedCurrToPrev && !expectedUnknown { + return errors.Newf("expected '%s' or '%s' in error message, got: '%v'", currentToPrevError, unknownFieldError, errOutput.Error) + } + } else { + // All nodes are on one version, but we're in mixed state (i.e. cluster version is on a different version) + var issueNodeID int + var dialNodeID int + // All nodes on current version + if len(h.Context().ToVersionNodes) == 4 { + issueNodeID = h.Context().ToVersionNodes[0] + dialNodeID = h.Context().ToVersionNodes[1] + } else { + // All nodes on previous version + issueNodeID = h.Context().FromVersionNodes[0] + dialNodeID = h.Context().FromVersionNodes[1] + } + // Dial a node for span stats. + l.Printf("Dial a node for span stats (different cluster version).") + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(issueNodeID), newReqBody(dialNodeID, startKey, endKey)) + if err != nil { + return err + } + // Expect an error in the stdout - mixed version error. + // Ensure the result can be marshalled into a valid error response. + err = json.Unmarshal([]byte(res.Stdout), &errOutput) + if err != nil { + return err + } + // Ensure we get the expected error. 
+ expectedCurrToPrev := assertExpectedError(errOutput.Message, currentToPrevError) + expectedUnknown := assertExpectedError(errOutput.Message, unknownFieldError) + if !expectedCurrToPrev && !expectedUnknown { + return errors.Newf("expected '%s' or '%s' in error message, got: '%v'", currentToPrevError, unknownFieldError, errOutput.Error) + } + } + return nil + }) + + mvt.AfterUpgradeFinalized("fetch span stats - finalized", func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + // Fanout on finalized version, sanity check. + l.Printf("Fanout from current version node (finalized).") + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(1), newReqBody(0, startKey, endKey)) + if err != nil { + return err + } + // Ensure the result can be marshalled into a valid span stats response. + var spanStats roachpb.SpanStatsResponse + return json.Unmarshal([]byte(res.Stdout), &spanStats) + }) + mvt.Run() + }, + } + r.Add(testSpec) +} + +func fetchSpanStatsFromNode( + ctx context.Context, + l *logger.Logger, + c cluster.Cluster, + node option.NodeListOption, + reqBody string, +) (*install.RunResultDetails, error) { + adminAddrs, err := c.InternalAdminUIAddr(ctx, l, node) + if err != nil { + return nil, errors.Wrap(err, "error getting internal admin ui address") + } + res, err := c.RunWithDetailsSingleNode(ctx, l, node, + "curl", "-X", "POST", fmt.Sprintf("http://%s/_status/span", + adminAddrs[0]), "-H", "'Content-Type: application/json'", "-d", fmt.Sprintf("'%s'", reqBody)) + if err != nil { + return nil, errors.Wrap(err, "error getting span statistics") + } + l.Printf("fetchSpanStatsFromNode - curl result", res.Stdout) + return &res, nil +} + +func assertExpectedError(errorMessage string, expectedError string) bool { + return strings.Contains(errorMessage, expectedError) +} + +func oldReqBody(nodeID int, startKey string, endKey string) string { + return fmt.Sprintf(`{"node_id": "%v", "start_key": "%v", "end_key": "%v"}`, nodeID, startKey, endKey) +} + +func newReqBody(nodeID int, startKey string, endKey string) string { + return fmt.Sprintf(`{"node_id": "%v", "spans": [{"key": "%v", "end_key": "%v"}]}`, nodeID, startKey, endKey) +} + +type errorOutput struct { + Error string `json:"error"` + Code int `json:"code"` + Message string `json:"message"` + Details []interface{} `json:"details"` +} diff --git a/pkg/cmd/roachtest/tests/registry.go b/pkg/cmd/roachtest/tests/registry.go index b732a2811c5b..5003808a59c5 100644 --- a/pkg/cmd/roachtest/tests/registry.go +++ b/pkg/cmd/roachtest/tests/registry.go @@ -142,6 +142,7 @@ func RegisterTests(r registry.Registry) { registerVersion(r) registerYCSB(r) registerDeclarativeSchemaChangerJobCompatibilityInMixedVersion(r) + registerTenantSpanStatsMixedVersion(r) } // RegisterBenchmarks registers all benchmarks to the registry. This powers `roachtest bench`. 
diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel index ce9a0bf41a2f..edba34e5a26f 100644 --- a/pkg/roachpb/BUILD.bazel +++ b/pkg/roachpb/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "metadata_replicas.go", "span_config.go", "span_group.go", + "span_stats.go", "tenant.go", "version.go", ], @@ -28,6 +29,7 @@ go_library( "//pkg/kv/kvserver/allocator/load", "//pkg/kv/kvserver/concurrency/isolation", "//pkg/kv/kvserver/concurrency/lock", + "//pkg/settings", "//pkg/storage/enginepb", "//pkg/util", "//pkg/util/bitarray", diff --git a/pkg/roachpb/span_stats.go b/pkg/roachpb/span_stats.go new file mode 100644 index 000000000000..3518820bb9e5 --- /dev/null +++ b/pkg/roachpb/span_stats.go @@ -0,0 +1,39 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package roachpb + +import "github.com/cockroachdb/cockroach/pkg/settings" + +// Put span statistics cluster settings here to avoid import cycle. + +const DefaultSpanStatsSpanLimit = 500 + +// SpanStatsBatchLimit registers the maximum number of spans allowed in a +// span stats request payload. +var SpanStatsBatchLimit = settings.RegisterIntSetting( + settings.TenantWritable, + "server.span_stats.span_batch_limit", + "the maximum number of spans allowed in a request payload for span statistics", + DefaultSpanStatsSpanLimit, + settings.PositiveInt, +) + +const defaultRangeStatsBatchLimit = 100 + +// RangeStatsBatchLimit registers the maximum number of ranges to be batched +// when fetching range stats for a span. +var RangeStatsBatchLimit = settings.RegisterIntSetting( + settings.TenantWritable, + "server.span_stats.range_batch_limit", + "the maximum batch size when fetching ranges statistics for a span", + defaultRangeStatsBatchLimit, + settings.PositiveInt, +) diff --git a/pkg/roachpb/span_stats.proto b/pkg/roachpb/span_stats.proto index 6af485307457..fa699f53ac3c 100644 --- a/pkg/roachpb/span_stats.proto +++ b/pkg/roachpb/span_stats.proto @@ -15,6 +15,7 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/roachpb"; import "storage/enginepb/mvcc.proto"; import "gogoproto/gogo.proto"; import "google/api/annotations.proto"; +import "roachpb/data.proto"; // SpanStatsRequest is used to request a SpanStatsResponse for the given key // span and node id. A node_id value of 0 indicates that the server should @@ -22,17 +23,32 @@ import "google/api/annotations.proto"; // result from across the cluster. message SpanStatsRequest { string node_id = 1 [(gogoproto.customname) = "NodeID"]; + + // Note: start_key and end_key are legacy fields, safe to deprecate (i.e. reserve) in 23.2. + // Callers are intended to use the spans field below. + // Field is not reserved as we use it to detect whether a request is being made from a previous + // versioned node (i.e. in a mixed-version cluster). + // TODO(thomas): reserved these fields in 23.2. bytes start_key = 2 [(gogoproto.casttype) = "RKey"]; bytes end_key = 3 [(gogoproto.casttype) = "RKey"]; + + repeated Span spans = 4 [(gogoproto.nullable) = false]; } -message SpanStatsResponse { +message SpanStats { + cockroach.storage.enginepb.MVCCStats total_stats = 1 [(gogoproto.nullable) = false]; // range_count measures the number of ranges that the request span falls within. 
// A SpanStatsResponse for a span that lies within a range, and whose start // key sorts after the range start, and whose end key sorts before the // range end, will have a range_count value of 1. int32 range_count = 2; uint64 approximate_disk_bytes = 3; - cockroach.storage.enginepb.MVCCStats total_stats = 1 - [(gogoproto.nullable) = false]; +} + +message SpanStatsResponse { + cockroach.storage.enginepb.MVCCStats total_stats = 1 [(gogoproto.nullable) = false]; + // See the range_count comment for the SpanStats proto. + int32 range_count = 2; + uint64 approximate_disk_bytes = 3; + map span_to_stats = 4; } diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go index 01343c81d8b8..95e73e18257e 100644 --- a/pkg/rpc/auth_tenant.go +++ b/pkg/rpc/auth_tenant.go @@ -207,10 +207,13 @@ func (a tenantAuthorizer) authGetRangeDescriptors( func (a tenantAuthorizer) authSpanStats( tenID roachpb.TenantID, args *roachpb.SpanStatsRequest, ) error { - return validateSpan(tenID, roachpb.Span{ - Key: args.StartKey.AsRawKey(), - EndKey: args.EndKey.AsRawKey(), - }) + for _, span := range args.Spans { + err := validateSpan(tenID, span) + if err != nil { + return err + } + } + return nil } // authRangeLookup authorizes the provided tenant to invoke the RangeLookup RPC diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 9e6bfd635648..486a8e395ff8 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1422,9 +1422,8 @@ func (s *adminServer) statsForSpan( if err == nil { client := serverpb.NewStatusClient(conn) req := roachpb.SpanStatsRequest{ - StartKey: rSpan.Key, - EndKey: rSpan.EndKey, - NodeID: nodeID.String(), + Spans: []roachpb.Span{span}, + NodeID: nodeID.String(), } spanResponse, err = client.SpanStats(ctx, &req) } @@ -1459,9 +1458,9 @@ func (s *adminServer) statsForSpan( }, ) } else { - tableStatResponse.Stats.Add(resp.resp.TotalStats) - tableStatResponse.ReplicaCount += int64(resp.resp.RangeCount) - tableStatResponse.ApproximateDiskBytes += resp.resp.ApproximateDiskBytes + tableStatResponse.Stats.Add(resp.resp.SpanToStats[span.String()].TotalStats) + tableStatResponse.ReplicaCount += int64(resp.resp.SpanToStats[span.String()].RangeCount) + tableStatResponse.ApproximateDiskBytes += resp.resp.SpanToStats[span.String()].ApproximateDiskBytes } case <-ctx.Done(): // Caller gave up, stop doing work. 
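Note for reviewers: the pkg/server/span_stats_server.go changes below group the requested spans by the nodes that hold their replicas, so the fan-out dials each node at most once no matter how many spans were requested. The following is a minimal, self-contained sketch of that grouping idea only; Span, NodeID, groupSpansByNode, and replicaNodes are simplified stand-ins, not the actual roachpb types or the patch's implementation.

package main

import "fmt"

// Span and NodeID are simplified stand-ins for roachpb.Span and roachpb.NodeID.
type Span struct{ Key, EndKey string }
type NodeID int

// groupSpansByNode mirrors the idea behind getSpansPerNode in the patch: each
// span is attached to every node that holds one of its replicas, so the later
// fan-out issues a single SpanStats RPC per node carrying all of that node's spans.
func groupSpansByNode(spans []Span, replicaNodes func(Span) []NodeID) map[NodeID][]Span {
	perNode := make(map[NodeID][]Span)
	for _, sp := range spans {
		for _, n := range replicaNodes(sp) {
			perNode[n] = append(perNode[n], sp)
		}
	}
	return perNode
}

func main() {
	spans := []Span{{"a", "c"}, {"b", "e"}, {"e", "i"}}
	// Hypothetical placement: every span has replicas on nodes 1 and 2.
	perNode := groupSpansByNode(spans, func(Span) []NodeID { return []NodeID{1, 2} })
	fmt.Println(len(perNode)) // 2 RPCs in total, instead of one fan-out per span.
}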
diff --git a/pkg/server/span_stats_server.go b/pkg/server/span_stats_server.go index 8305c69198d8..6403f62f4f0d 100644 --- a/pkg/server/span_stats_server.go +++ b/pkg/server/span_stats_server.go @@ -14,7 +14,9 @@ import ( "context" "strconv" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -23,31 +25,32 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) +const MixedVersionErr = "span stats request - unable to service a mixed version request" +const UnexpectedLegacyRequest = "span stats request - unexpected populated legacy fields (StartKey, EndKey)" +const nodeErrorMsgPlaceholder = "could not get span stats sample for node %d: %v" +const exceedSpanLimitPlaceholder = "error getting span statistics - number of spans in request payload (%d) exceeds 'server.span_stats.span_batch_limit' cluster setting limit (%d)" + func (s *systemStatusServer) spanStatsFanOut( ctx context.Context, req *roachpb.SpanStatsRequest, ) (*roachpb.SpanStatsResponse, error) { - res := &roachpb.SpanStatsResponse{} - - rSpan := roachpb.RSpan{ - Key: req.StartKey, - EndKey: req.EndKey, + res := &roachpb.SpanStatsResponse{ + SpanToStats: make(map[string]*roachpb.SpanStats), } - nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, s.distSender, rSpan) + // Response level error + var respErr error + + spansPerNode, err := s.getSpansPerNode(ctx, req, s.distSender) if err != nil { return nil, err } - nodesWithReplica := make(map[roachpb.NodeID]bool) - for _, nodeID := range nodeIDs { - nodesWithReplica[nodeID] = true - } - // We should only fan out to a node if it has replicas for this span. + // We should only fan out to a node if it has replicas of any span. // A blind fan out would be wasteful. smartDial := func( ctx context.Context, nodeID roachpb.NodeID, ) (interface{}, error) { - if _, ok := nodesWithReplica[nodeID]; ok { + if _, ok := spansPerNode[nodeID]; ok { return s.dialNode(ctx, nodeID) } return nil, nil @@ -55,30 +58,42 @@ func (s *systemStatusServer) spanStatsFanOut( nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { // `smartDial` may skip this node, so check to see if the client is nil. + // If it is, return nil response. if client == nil { - return &roachpb.SpanStatsResponse{}, nil + return nil, nil } - stats, err := client.(serverpb.StatusClient).SpanStats(ctx, + + resp, err := client.(serverpb.StatusClient).SpanStats(ctx, &roachpb.SpanStatsRequest{ - NodeID: nodeID.String(), - StartKey: req.StartKey, - EndKey: req.EndKey, + NodeID: nodeID.String(), + Spans: spansPerNode[nodeID], }) - if err != nil { - return nil, err - } - return stats, err + return resp, err } responseFn := func(nodeID roachpb.NodeID, resp interface{}) { + // Noop if nil response (returned from skipped node). 
+ if resp == nil { + return + } + nodeResponse := resp.(*roachpb.SpanStatsResponse) - res.ApproximateDiskBytes += nodeResponse.ApproximateDiskBytes - res.TotalStats.Add(nodeResponse.TotalStats) - res.RangeCount += nodeResponse.RangeCount + + for spanStr, spanStats := range nodeResponse.SpanToStats { + _, exists := res.SpanToStats[spanStr] + if !exists { + res.SpanToStats[spanStr] = spanStats + } else { + res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes + res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats) + res.SpanToStats[spanStr].RangeCount += spanStats.RangeCount + } + } } errorFn := func(nodeID roachpb.NodeID, err error) { - log.Errorf(ctx, "could not get span stats sample for node %d: %v", nodeID, err) + log.Errorf(ctx, nodeErrorMsgPlaceholder, nodeID, err) + respErr = err } if err := s.statusServer.iterateNodes( @@ -92,59 +107,80 @@ func (s *systemStatusServer) spanStatsFanOut( return nil, err } - return res, nil + return res, respErr } func (s *systemStatusServer) getLocalStats( ctx context.Context, req *roachpb.SpanStatsRequest, ) (*roachpb.SpanStatsResponse, error) { - res := &roachpb.SpanStatsResponse{} + var res = &roachpb.SpanStatsResponse{SpanToStats: make(map[string]*roachpb.SpanStats)} + ri := kvcoord.MakeRangeIterator(s.distSender) - sp := roachpb.RSpan{ - Key: req.StartKey, - EndKey: req.EndKey, + // For each span + for _, span := range req.Spans { + rSpan, err := keys.SpanAddr(span) + if err != nil { + return nil, err + } + // Seek to the span's start key. + ri.Seek(ctx, rSpan.Key, kvcoord.Ascending) + spanStats, err := s.statsForSpan(ctx, ri, rSpan) + if err != nil { + return nil, err + } + res.SpanToStats[span.String()] = spanStats } - ri := kvcoord.MakeRangeIterator(s.distSender) - ri.Seek(ctx, sp.Key, kvcoord.Ascending) + return res, nil +} +func (s *systemStatusServer) statsForSpan( + ctx context.Context, ri kvcoord.RangeIterator, rSpan roachpb.RSpan, +) (*roachpb.SpanStats, error) { + // Seek to the span's start key. + ri.Seek(ctx, rSpan.Key, kvcoord.Ascending) + spanStats := &roachpb.SpanStats{} + var fullyContainedKeysBatch []roachpb.Key + var err error + // Iterate through the span's ranges. for { if !ri.Valid() { return nil, ri.Error() } + // Get the descriptor for the current range of the span. desc := ri.Desc() descSpan := desc.RSpan() - res.RangeCount += 1 + spanStats.RangeCount += 1 // Is the descriptor fully contained by the request span? - if sp.ContainsKeyRange(descSpan.Key, desc.EndKey) { - // If so, obtain stats for this range via RangeStats. - rangeStats, err := s.rangeStatsFetcher.RangeStats(ctx, - desc.StartKey.AsRawKey()) - if err != nil { - return nil, err - } - for _, resp := range rangeStats { - res.TotalStats.Add(resp.MVCCStats) + if rSpan.ContainsKeyRange(descSpan.Key, desc.EndKey) { + // Collect into fullyContainedKeys batch. + fullyContainedKeysBatch = append(fullyContainedKeysBatch, desc.StartKey.AsRawKey()) + // If we've exceeded the batch limit, request range stats for the current batch. + if len(fullyContainedKeysBatch) > int(roachpb.RangeStatsBatchLimit.Get(&s.st.SV)) { + // Obtain stats for fully contained ranges via RangeStats. + fullyContainedKeysBatch, err = flushBatchedContainedKeys(ctx, s.rangeStatsFetcher, fullyContainedKeysBatch, spanStats) + if err != nil { + return nil, err + } } - } else { // Otherwise, do an MVCC Scan. // We should only scan the part of the range that our request span // encompasses. 
- scanStart := sp.Key - scanEnd := sp.EndKey + scanStart := rSpan.Key + scanEnd := rSpan.EndKey // If our request span began before the start of this range, // start scanning from this range's start key. - if descSpan.Key.Compare(sp.Key) == 1 { + if descSpan.Key.Compare(rSpan.Key) == 1 { scanStart = descSpan.Key } // If our request span ends after the end of this range, // stop scanning at this range's end key. - if descSpan.EndKey.Compare(sp.EndKey) == -1 { + if descSpan.EndKey.Compare(rSpan.EndKey) == -1 { scanEnd = descSpan.EndKey } - err := s.stores.VisitStores(func(s *kvserver.Store) error { + err = s.stores.VisitStores(func(s *kvserver.Store) error { stats, err := storage.ComputeStats( s.TODOEngine(), scanStart.AsRawKey(), @@ -156,7 +192,7 @@ func (s *systemStatusServer) getLocalStats( return err } - res.TotalStats.Add(stats) + spanStats.TotalStats.Add(stats) return nil }) @@ -165,28 +201,35 @@ func (s *systemStatusServer) getLocalStats( } } - if !ri.NeedAnother(sp) { + if !ri.NeedAnother(rSpan) { break } ri.Next(ctx) } - + // If we still have some remaining ranges, request range stats for the current batch. + if len(fullyContainedKeysBatch) > 0 { + // Obtain stats for fully contained ranges via RangeStats. + _, err = flushBatchedContainedKeys(ctx, s.rangeStatsFetcher, fullyContainedKeysBatch, spanStats) + if err != nil { + return nil, err + } + // Nil the batch. + fullyContainedKeysBatch = nil + } // Finally, get the approximate disk bytes from each store. - err := s.stores.VisitStores(func(store *kvserver.Store) error { - approxDiskBytes, err := store.TODOEngine().ApproximateDiskBytes(req. - StartKey.AsRawKey(), req.EndKey.AsRawKey()) + err = s.stores.VisitStores(func(store *kvserver.Store) error { + approxDiskBytes, err := store.TODOEngine().ApproximateDiskBytes(rSpan.Key.AsRawKey(), rSpan.EndKey.AsRawKey()) if err != nil { return err } - res.ApproximateDiskBytes += approxDiskBytes + spanStats.ApproximateDiskBytes += approxDiskBytes return nil }) if err != nil { return nil, err } - - return res, nil + return spanStats, nil } // getSpanStatsInternal will return span stats according to the nodeID specified @@ -225,3 +268,52 @@ func (s *systemStatusServer) getSpanStatsInternal( } return client.SpanStats(ctx, req) } + +func (s *systemStatusServer) getSpansPerNode( + ctx context.Context, req *roachpb.SpanStatsRequest, ds *kvcoord.DistSender, +) (map[roachpb.NodeID][]roachpb.Span, error) { + // Mapping of node ids to spans with a replica on the node. + spansPerNode := make(map[roachpb.NodeID][]roachpb.Span) + + // Iterate over the request spans. + for _, span := range req.Spans { + rSpan, err := keys.SpanAddr(span) + if err != nil { + return nil, err + } + // Get the node ids belonging to the span. + nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, ds, rSpan) + if err != nil { + return nil, err + } + // Add the span to the map for each of the node IDs it belongs to. + for _, nodeID := range nodeIDs { + spansPerNode[nodeID] = append(spansPerNode[nodeID], span) + } + } + return spansPerNode, nil +} + +func flushBatchedContainedKeys( + ctx context.Context, + fetcher *rangestats.Fetcher, + fullyContainedKeysBatch []roachpb.Key, + spanStats *roachpb.SpanStats, +) ([]roachpb.Key, error) { + // Obtain stats for fully contained ranges via RangeStats. + rangeStats, err := fetcher.RangeStats(ctx, + fullyContainedKeysBatch...) 
+ if err != nil { + return nil, err + } + for _, resp := range rangeStats { + spanStats.TotalStats.Add(resp.MVCCStats) + } + // Reset the keys batch + return fullyContainedKeysBatch[:0], nil +} + +func isLegacyRequest(req *roachpb.SpanStatsRequest) bool { + // If the start/end key fields are not nil, we have a request using the old request format. + return req.StartKey != nil || req.EndKey != nil +} diff --git a/pkg/server/span_stats_test.go b/pkg/server/span_stats_test.go index a05597c8e074..2d5ab3b3ba56 100644 --- a/pkg/server/span_stats_test.go +++ b/pkg/server/span_stats_test.go @@ -11,14 +11,20 @@ package server import ( + "bytes" "context" + "fmt" + "strconv" "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -63,37 +69,167 @@ func TestLocalSpanStats(t *testing.T) { } } - // Verify stats across different spans. - for _, tcase := range []struct { - startKey string - endKey string + // Create spans encompassing different ranges. + spans := []roachpb.Span{ + { + Key: roachpb.Key("a"), + EndKey: roachpb.Key("i"), + }, + { + Key: roachpb.Key("a"), + EndKey: roachpb.Key("c"), + }, + { + Key: roachpb.Key("b"), + EndKey: roachpb.Key("e"), + }, + { + Key: roachpb.Key("e"), + EndKey: roachpb.Key("i"), + }, + { + Key: roachpb.Key("b"), + EndKey: roachpb.Key("d"), + }, + { + Key: roachpb.Key("b"), + EndKey: roachpb.Key("bbb"), + }, + } + + type testCase struct { + span roachpb.Span expectedRanges int32 expectedKeys int64 + } + + testCases := []testCase{ + {spans[0], 4, 6}, + {spans[1], 1, 3}, + {spans[2], 2, 5}, + {spans[3], 2, 1}, + {spans[4], 2, 3}, + {spans[5], 1, 2}, + } + // Multi-span request + multiResult, err := s.status.getLocalStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", + Spans: spans, + }, + ) + require.NoError(t, err) + + // Verify stats across different spans. + for _, tcase := range testCases { + rSpan, err := keys.SpanAddr(tcase.span) + require.NoError(t, err) + + // Assert expected values from multi-span request + spanStats := multiResult.SpanToStats[tcase.span.String()] + require.Equal(t, spanStats.RangeCount, tcase.expectedRanges, fmt.Sprintf( + "Multi-span: expected %d ranges in span [%s - %s], found %d", tcase.expectedRanges, rSpan.Key.String(), rSpan.EndKey.String(), spanStats.RangeCount)) + require.Equal(t, spanStats.TotalStats.LiveCount, tcase.expectedKeys, fmt.Sprintf( + "Multi-span: expected %d keys in span [%s - %s], found %d", tcase.expectedKeys, rSpan.Key.String(), rSpan.EndKey.String(), spanStats.TotalStats.LiveCount)) + } +} + +// BenchmarkSpanStats measures the cost of collecting span statistics. 
+func BenchmarkSpanStats(b *testing.B) { + skip.UnderShort(b) + defer log.Scope(b).Close(b) + + createCluster := func(numNodes int) serverutils.TestClusterInterface { + return serverutils.StartNewTestCluster(b, numNodes, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + }) + } + + clusterSpecs := []struct { + name string + numNodes int }{ - {"a", "i", 4, 6}, - {"a", "c", 1, 3}, - {"b", "e", 2, 5}, - {"e", "i", 2, 1}, - {"b", "d", 2, 3}, - {"b", "bbb", 1, 2}, - } { - start, end := tcase.startKey, tcase.endKey - result, err := s.status.getLocalStats(ctx, - &roachpb.SpanStatsRequest{ - NodeID: "0", - StartKey: roachpb.RKey(start), - EndKey: roachpb.RKey(end), - }, - ) - if err != nil { - t.Fatal(err) - } + { + name: "3node", + numNodes: 3, + }, + } - if a, e := result.RangeCount, tcase.expectedRanges; a != e { - t.Errorf("Expected %d ranges in span [%s - %s], found %d", e, start, end, a) - } - if a, e := result.TotalStats.LiveCount, tcase.expectedKeys; a != e { - t.Errorf("Expected %d keys in span [%s - %s], found %d", e, start, end, a) + type testSpec struct { + numSpans int + numRanges int + } + + var testSpecs []testSpec + numSpanCases := []int{10, 100, 200} + numRangeCases := []int{25, 50, 100} + for _, numSpans := range numSpanCases { + for _, numRanges := range numRangeCases { + testSpecs = append(testSpecs, testSpec{numSpans: numSpans, numRanges: numRanges}) } } + + for _, cluster := range clusterSpecs { + b.Run(cluster.name, func(b *testing.B) { + tc := createCluster(cluster.numNodes) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + for _, ts := range testSpecs { + b.Run(fmt.Sprintf("BenchmarkSpanStats - span stats for %d node cluster, collecting %d spans with %d ranges each", + cluster.numNodes, + ts.numSpans, + ts.numRanges, + ), func(b *testing.B) { + + tenant := tc.Server(0).TenantOrServer() + tenantCodec := keys.MakeSQLCodec(serverutils.TestTenantID()) + tenantPrefix := tenantCodec.TenantPrefix() + + makeKey := func(keys ...[]byte) roachpb.Key { + return bytes.Join(keys, nil) + } + + var spans []roachpb.Span + + // Create a table spans + var spanStartKey roachpb.Key + for i := 0; i < ts.numSpans; i++ { + spanStartKey = nil + // Create ranges. + var key roachpb.Key + for j := 0; j < ts.numRanges; j++ { + key = makeKey(tenantPrefix, []byte(strconv.Itoa(i*j))) + if spanStartKey == nil { + spanStartKey = key + } + _, _, err := tc.Server(0).SplitRange(key) + require.NoError(b, err) + } + spans = append(spans, roachpb.Span{ + Key: spanStartKey, + EndKey: key, + }) + } + + _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING server.span_stats.span_batch_limit = $1`, ts.numSpans) + require.NoError(b, err) + _, err = tc.ServerConn(0).Exec(`SET CLUSTER SETTING server.span_stats.range_batch_limit = $1`, ts.numRanges) + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := tenant.TenantStatusServer().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. + Spans: spans, + }) + require.NoError(b, err) + } + b.StopTimer() + }) + } + }) + } } diff --git a/pkg/server/status.go b/pkg/server/status.go index 6f6e7387e5f1..120454b9d988 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -3469,6 +3469,19 @@ func (s *systemStatusServer) SpanStats( // already returns a proper gRPC error status. return nil, err } + // If we receive a request using the old format. 
+ if isLegacyRequest(req) { + // We want to force 23.1 callers to use the new format (e.g. Spans field). + if req.NodeID == "0" { + return nil, errors.New(UnexpectedLegacyRequest) + } + // We want to error if we receive a legacy request from a 22.2 + // node (e.g. during a mixed-version fanout). + return nil, errors.New(MixedVersionErr) + } + if len(req.Spans) > int(roachpb.SpanStatsBatchLimit.Get(&s.st.SV)) { + return nil, errors.Newf(exceedSpanLimitPlaceholder, len(req.Spans), int(roachpb.SpanStatsBatchLimit.Get(&s.st.SV))) + } return s.getSpanStatsInternal(ctx, req) } diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 74abbdbd7d7f..46335a835425 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -1374,10 +1374,13 @@ func TestSpanStatsResponse(t *testing.T) { } var response roachpb.SpanStatsResponse + span := roachpb.Span{ + Key: roachpb.RKeyMin.AsRawKey(), + EndKey: roachpb.RKeyMax.AsRawKey(), + } request := roachpb.SpanStatsRequest{ - NodeID: "1", - StartKey: []byte(roachpb.RKeyMin), - EndKey: []byte(roachpb.RKeyMax), + NodeID: "1", + Spans: []roachpb.Span{span}, } url := ts.AdminURL() + statusPrefix + "span" @@ -1388,7 +1391,8 @@ func TestSpanStatsResponse(t *testing.T) { if err != nil { t.Fatal(err) } - if a, e := int(response.RangeCount), initialRanges; a != e { + responseSpanStats := response.SpanToStats[span.String()] + if a, e := int(responseSpanStats.RangeCount), initialRanges; a != e { t.Errorf("expected %d ranges, found %d", e, a) } } @@ -1403,10 +1407,13 @@ func TestSpanStatsGRPCResponse(t *testing.T) { rpcStopper := stop.NewStopper() defer rpcStopper.Stop(ctx) rpcContext := newRPCTestContext(ctx, ts, ts.RPCContext().Config) + span := roachpb.Span{ + Key: roachpb.RKeyMin.AsRawKey(), + EndKey: roachpb.RKeyMax.AsRawKey(), + } request := roachpb.SpanStatsRequest{ - NodeID: "1", - StartKey: []byte(roachpb.RKeyMin), - EndKey: []byte(roachpb.RKeyMax), + NodeID: "1", + Spans: []roachpb.Span{span}, } url := ts.ServingRPCAddr() @@ -1425,7 +1432,8 @@ func TestSpanStatsGRPCResponse(t *testing.T) { if err != nil { t.Fatal(err) } - if a, e := int(response.RangeCount), initialRanges; a != e { + responseSpanStats := response.SpanToStats[span.String()] + if a, e := int(responseSpanStats.RangeCount), initialRanges; a != e { t.Fatalf("expected %d ranges, found %d", e, a) } } diff --git a/pkg/sql/faketreeeval/evalctx.go b/pkg/sql/faketreeeval/evalctx.go index cbee54f63656..6c3527d86583 100644 --- a/pkg/sql/faketreeeval/evalctx.go +++ b/pkg/sql/faketreeeval/evalctx.go @@ -485,7 +485,7 @@ func (ep *DummyEvalPlanner) GetRangeDescByID( // SpanStats is part of the eval.Planner interface. func (ep *DummyEvalPlanner) SpanStats( - context.Context, roachpb.RKey, roachpb.RKey, + context.Context, roachpb.Spans, ) (stats *roachpb.SpanStatsResponse, err error) { return } diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index e6076bd1dc27..5523232ab859 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -904,12 +904,11 @@ func (p *planner) GetStreamIngestManager(ctx context.Context) (eval.StreamIngest // SpanStats returns a stats for the given span of keys. 
func (p *planner) SpanStats( - ctx context.Context, startKey roachpb.RKey, endKey roachpb.RKey, + ctx context.Context, spans roachpb.Spans, ) (*roachpb.SpanStatsResponse, error) { req := &roachpb.SpanStatsRequest{ - NodeID: "0", - StartKey: startKey, - EndKey: endKey, + NodeID: "0", + Spans: spans, } return p.ExecCfg().TenantStatusServer.SpanStats(ctx, req) } diff --git a/pkg/sql/sem/builtins/generator_builtins.go b/pkg/sql/sem/builtins/generator_builtins.go index 39a1be8ff685..9b52997b698a 100644 --- a/pkg/sql/sem/builtins/generator_builtins.go +++ b/pkg/sql/sem/builtins/generator_builtins.go @@ -2920,24 +2920,34 @@ func makeIdentGenerator( }, nil } +type spanStatsDetails struct { + dbId int + tableId int +} + type tableSpanStatsIterator struct { - it eval.InternalRows - codec keys.SQLCodec - p eval.Planner - currDbId int - currTableId int + argDbId int + argTableId int + it eval.InternalRows + codec keys.SQLCodec + p eval.Planner + spanStatsBatchLimit int + // Each iter + iterSpanIdx int + spanStatsDetails []spanStatsDetails currStatsResponse *roachpb.SpanStatsResponse - singleTableReq bool } -func newTableSpanStatsIterator(eval *eval.Context, dbId int, tableId int) *tableSpanStatsIterator { - return &tableSpanStatsIterator{codec: eval.Codec, p: eval.Planner, currDbId: dbId, currTableId: tableId, singleTableReq: tableId != 0} +func newTableSpanStatsIterator( + eval *eval.Context, dbId int, tableId int, spanBatchLimit int, +) *tableSpanStatsIterator { + return &tableSpanStatsIterator{codec: eval.Codec, p: eval.Planner, argDbId: dbId, argTableId: tableId, spanStatsBatchLimit: spanBatchLimit} } // Start implements the tree.ValueGenerator interface. func (tssi *tableSpanStatsIterator) Start(ctx context.Context, _ *kv.Txn) error { var err error = nil - tssi.it, err = tssi.p.GetDetailsForSpanStats(ctx, tssi.currDbId, tssi.currTableId) + tssi.it, err = tssi.p.GetDetailsForSpanStats(ctx, tssi.argDbId, tssi.argTableId) return err } @@ -2946,41 +2956,101 @@ func (tssi *tableSpanStatsIterator) Next(ctx context.Context) (bool, error) { if tssi.it == nil { return false, errors.AssertionFailedf("Start must be called before Next") } - next, err := tssi.it.Next(ctx) - if err != nil || !next { + // Check if we can iterate through the span details buffer. + if tssi.iterSpanIdx+1 < len(tssi.spanStatsDetails) { + tssi.iterSpanIdx++ + return true, nil + } + + // There are no more span details to iterate in the buffer. + // Instead, we continue to fetch more span stats (if possible). + hasMoreRows, err := tssi.fetchSpanStats(ctx) + if err != nil { return false, err } - // Pull the current row. - row := tssi.it.Cur() - tssi.currDbId = int(tree.MustBeDInt(row[0])) - tssi.currTableId = int(tree.MustBeDInt(row[1])) + // After fetching new span stats, reset the index. + tssi.iterSpanIdx = 0 + return hasMoreRows || len(tssi.spanStatsDetails) != 0, nil +} + +func (tssi *tableSpanStatsIterator) fetchSpanStats(ctx context.Context) (bool, error) { + // Reset span details. + tssi.spanStatsDetails = tssi.spanStatsDetails[:0] + + var ok bool + var err error + var spans []roachpb.Span + // While we have more rows + for ok, err = tssi.it.Next(ctx); ok; ok, err = tssi.it.Next(ctx) { + + // Pull the current row. 
+ row := tssi.it.Cur() + dbId := int(tree.MustBeDInt(row[0])) + tableId := int(tree.MustBeDInt(row[1])) + + // Add the row data to span stats details + tssi.spanStatsDetails = append(tssi.spanStatsDetails, spanStatsDetails{ + dbId: dbId, + tableId: tableId, + }) + + // Gather the span for the current span stats request. + tableStartKey := tssi.codec.TablePrefix(uint32(tableId)) + spans = append(spans, roachpb.Span{ + Key: tableStartKey, + EndKey: tableStartKey.PrefixEnd(), + }) + + // Exit the loop if we're reached our limit of spans + // for the span stats request. + if len(tssi.spanStatsDetails) >= tssi.spanStatsBatchLimit { + break + } + } - // Set our current stats response. - startKey := roachpb.RKey(tssi.codec.TablePrefix(uint32(tssi.currTableId))) - tssi.currStatsResponse, err = tssi.p.SpanStats(ctx, startKey, startKey.PrefixEnd()) + // If we encounter an error while iterating over rows, + // return error before fetching span stats. if err != nil { return false, err } - return next, err + + // If we have spans, request span stats + if len(spans) > 0 { + tssi.currStatsResponse, err = tssi.p.SpanStats(ctx, spans) + } + + if err != nil { + return false, err + } + return ok, err } // Values implements the tree.ValueGenerator interface. func (tssi *tableSpanStatsIterator) Values() (tree.Datums, error) { - liveBytes := tssi.currStatsResponse.TotalStats.LiveBytes - totalBytes := tssi.currStatsResponse.TotalStats.KeyBytes + - tssi.currStatsResponse.TotalStats.ValBytes + - tssi.currStatsResponse.TotalStats.RangeKeyBytes + - tssi.currStatsResponse.TotalStats.RangeValBytes + // Get the current span details. + spanDetails := tssi.spanStatsDetails[tssi.iterSpanIdx] + startKey := tssi.codec.TablePrefix(uint32(spanDetails.tableId)) + tableSpan := roachpb.Span{Key: startKey, EndKey: startKey.PrefixEnd()} + // Get the current span stats. 
+ spanStats, found := tssi.currStatsResponse.SpanToStats[tableSpan.String()] + if !found { + return nil, errors.Errorf("could not find span stats for table span: %s", tableSpan.String()) + } + + totalBytes := spanStats.TotalStats.KeyBytes + + spanStats.TotalStats.ValBytes + + spanStats.TotalStats.RangeKeyBytes + + spanStats.TotalStats.RangeValBytes livePercentage := float64(0) if totalBytes > 0 { - livePercentage = float64(liveBytes) / float64(totalBytes) + livePercentage = float64(spanStats.TotalStats.LiveBytes) / float64(totalBytes) } return []tree.Datum{ - tree.NewDInt(tree.DInt(tssi.currDbId)), - tree.NewDInt(tree.DInt(tssi.currTableId)), - tree.NewDInt(tree.DInt(tssi.currStatsResponse.RangeCount)), - tree.NewDInt(tree.DInt(tssi.currStatsResponse.ApproximateDiskBytes)), - tree.NewDInt(tree.DInt(liveBytes)), + tree.NewDInt(tree.DInt(spanDetails.dbId)), + tree.NewDInt(tree.DInt(spanDetails.tableId)), + tree.NewDInt(tree.DInt(spanStats.RangeCount)), + tree.NewDInt(tree.DInt(spanStats.ApproximateDiskBytes)), + tree.NewDInt(tree.DInt(spanStats.TotalStats.LiveBytes)), tree.NewDInt(tree.DInt(totalBytes)), tree.NewDFloat(tree.DFloat(livePercentage)), }, nil @@ -3024,5 +3094,7 @@ func makeTableSpanStatsGenerator( return nil, errors.New("provided table id must be greater than or equal to 1") } } - return newTableSpanStatsIterator(evalCtx, dbId, tableId), nil + + spanBatchLimit := roachpb.SpanStatsBatchLimit.Get(&evalCtx.Settings.SV) + return newTableSpanStatsIterator(evalCtx, dbId, tableId, int(spanBatchLimit)), nil } diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go index 329e0d4b7933..8b780d0f9262 100644 --- a/pkg/sql/sem/eval/deps.go +++ b/pkg/sql/sem/eval/deps.go @@ -384,7 +384,7 @@ type Planner interface { // GetRangeDescByID gets the RangeDescriptor by the specified RangeID. GetRangeDescByID(context.Context, roachpb.RangeID) (roachpb.RangeDescriptor, error) - SpanStats(context.Context, roachpb.RKey, roachpb.RKey) (*roachpb.SpanStatsResponse, error) + SpanStats(context.Context, roachpb.Spans) (*roachpb.SpanStatsResponse, error) GetDetailsForSpanStats(ctx context.Context, dbId int, tableId int) (InternalRows, error)
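Note for reviewers: a minimal sketch of the request/response shape introduced by this patch. The helper and package name below are hypothetical and not part of the change; the sketch only assumes the repo's roachpb and serverpb packages and the fields added in this diff. One RPC carries every requested span, and results come back keyed by each span's String() form.

package spanstatsexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// rangeCountsForSpans is a hypothetical helper illustrating the new API shape:
// a single SpanStatsRequest carries all spans, and the response maps each
// span's String() form to its per-span statistics.
func rangeCountsForSpans(
	ctx context.Context, srv serverpb.TenantStatusServer, spans []roachpb.Span,
) (map[string]int32, error) {
	resp, err := srv.SpanStats(ctx, &roachpb.SpanStatsRequest{
		NodeID: "0", // 0 indicates stats should be gathered from all nodes.
		Spans:  spans,
	})
	if err != nil {
		return nil, err
	}
	out := make(map[string]int32, len(spans))
	for _, sp := range spans {
		out[sp.String()] = resp.SpanToStats[sp.String()].RangeCount
	}
	return out, nil
}

Existing single-span callers keep working through the same endpoint by wrapping their span in a one-element Spans slice, which is what the updated pkg/server/admin.go hunk above does.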