diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go index 1492cb73ec58..95f59a2e998e 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -149,9 +149,8 @@ func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.Ten t.Run("test tenant permissioning", func(t *testing.T) { req := roachpb.SpanStatsRequest{ - NodeID: "0", - StartKey: roachpb.RKey(aSpan.Key), - EndKey: roachpb.RKey(aSpan.EndKey), + NodeID: "0", + Spans: []roachpb.Span{aSpan}, } resp := roachpb.SpanStatsResponse{} @@ -162,19 +161,44 @@ func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.Ten adminClient := helper.TestCluster().TenantHTTPClient(t, 1, true) adminClient.PostJSON("/_status/span", &req, &resp) - require.Greaterf(t, resp.RangeCount, int32(0), "postive range count") + require.Greaterf(t, resp.SpanToStats[aSpan.String()].RangeCount, int32(0), "positive range count") }) t.Run("test tenant isolation", func(t *testing.T) { _, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, &roachpb.SpanStatsRequest{ - NodeID: "0", // 0 indicates we want stats from all nodes. - StartKey: roachpb.RKey(bSpan.Key), - EndKey: roachpb.RKey(bSpan.EndKey), + NodeID: "0", // 0 indicates we want stats from all nodes. + Spans: []roachpb.Span{bSpan}, }) require.Error(t, err) }) + t.Run("test invalid request payload", func(t *testing.T) { + _, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. + StartKey: roachpb.RKey(aSpan.Key), + EndKey: roachpb.RKey(aSpan.EndKey), + }) + require.ErrorContains(t, err, `span stats request - unexpected populated legacy fields (StartKey, EndKey)`) + }) + + t.Run("test exceed span request limit", func(t *testing.T) { + // Set the span batch limit to 1. + _, err := helper.HostCluster().ServerConn(0).Exec(`SET CLUSTER SETTING server.span_stats.span_batch_limit = 1`) + require.NoError(t, err) + _, err = tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. + Spans: []roachpb.Span{aSpan, aSpan}, + }) + require.ErrorContains(t, err, `error getting span statistics - number of spans in request payload (2) exceeds`+ + ` 'server.span_stats.span_batch_limit' cluster setting limit (1)`) + // Reset the span batch limit to default. + _, err = helper.HostCluster().ServerConn(0).Exec(`SET CLUSTER SETTING server.span_stats.span_batch_limit = $1`, roachpb.DefaultSpanStatsSpanLimit) + require.NoError(t, err) + }) + t.Run("test KV node fan-out", func(t *testing.T) { _, tID, err := keys.DecodeTenantPrefix(aSpan.Key) require.NoError(t, err) @@ -186,9 +210,8 @@ func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.Ten controlStats, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, &roachpb.SpanStatsRequest{ - NodeID: "0", // 0 indicates we want stats from all nodes. - StartKey: roachpb.RKey(aSpan.Key), - EndKey: roachpb.RKey(aSpan.EndKey), + NodeID: "0", // 0 indicates we want stats from all nodes. 
+ Spans: []roachpb.Span{aSpan}, }) require.NoError(t, err) @@ -212,14 +235,81 @@ func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.Ten stats, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, &roachpb.SpanStatsRequest{ - NodeID: "0", // 0 indicates we want stats from all nodes. - StartKey: roachpb.RKey(aSpan.Key), - EndKey: roachpb.RKey(aSpan.EndKey), + NodeID: "0", // 0 indicates we want stats from all nodes. + Spans: []roachpb.Span{aSpan}, }) require.NoError(t, err) - require.Equal(t, controlStats.RangeCount+1, stats.RangeCount) - require.Equal(t, controlStats.TotalStats.LiveCount+int64(len(incKeys)), stats.TotalStats.LiveCount) + + controlSpanStats := controlStats.SpanToStats[aSpan.String()] + testSpanStats := stats.SpanToStats[aSpan.String()] + require.Equal(t, controlSpanStats.RangeCount+1, testSpanStats.RangeCount) + require.Equal(t, controlSpanStats.TotalStats.LiveCount+int64(len(incKeys)), testSpanStats.TotalStats.LiveCount) + + // Make a multi-span call + type spanCase struct { + span roachpb.Span + expectedRangeCount int32 + expectedLiveCount int64 + } + spanCases := []spanCase{ + { + // "a", "b" - single range, single key + span: roachpb.Span{ + Key: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[0])), + EndKey: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[1])), + }, + expectedRangeCount: 1, + expectedLiveCount: 1, + }, + { + // "d", "f" - single range, multiple keys + span: roachpb.Span{ + Key: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[3])), + EndKey: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[5])), + }, + expectedRangeCount: 1, + expectedLiveCount: 2, + }, + { + // "bb", "e" - multiple ranges, multiple keys + span: roachpb.Span{ + Key: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[2])), + EndKey: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[4])), + }, + expectedRangeCount: 2, + expectedLiveCount: 2, + }, + + { + // "a", "d" - multiple ranges, multiple keys + span: roachpb.Span{ + Key: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[0])), + EndKey: makeKey(keys.MakeTenantPrefix(tID), []byte(incKeys[3])), + }, + expectedRangeCount: 2, + expectedLiveCount: 3, + }, + } + + var spans []roachpb.Span + for _, sc := range spanCases { + spans = append(spans, sc.span) + } + + stats, err = tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. + Spans: spans, + }) + + require.NoError(t, err) + // Check each span has their expected values. 
+ for _, sc := range spanCases { + spanStats := stats.SpanToStats[sc.span.String()] + require.Equal(t, spanStats.RangeCount, sc.expectedRangeCount, fmt.Sprintf("mismatch on expected range count for span case with span %v", sc.span.String())) + require.Equal(t, spanStats.TotalStats.LiveCount, sc.expectedLiveCount, fmt.Sprintf("mismatch on expected live count for span case with span %v", sc.span.String())) + } }) } diff --git a/pkg/cmd/roachtest/registry/owners.go b/pkg/cmd/roachtest/registry/owners.go index 62db1eac17ce..5fdda3dd2a6c 100644 --- a/pkg/cmd/roachtest/registry/owners.go +++ b/pkg/cmd/roachtest/registry/owners.go @@ -30,4 +30,5 @@ const ( OwnerTestEng Owner = `test-eng` OwnerDevInf Owner = `dev-inf` OwnerMultiTenant Owner = `multi-tenant` + OwnerClusterObs Owner = `cluster-observability` ) diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 74329cd5a953..7c3bb2eb7058 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -96,6 +96,7 @@ go_library( "mixed_version_job_compatibility_in_declarative_schema_changer.go", "mixed_version_jobs.go", "mixed_version_schemachange.go", + "mixed_version_tenant_span_stats.go", "multitenant.go", "multitenant_distsql.go", "multitenant_shared_process.go", diff --git a/pkg/cmd/roachtest/tests/mixed_version_tenant_span_stats.go b/pkg/cmd/roachtest/tests/mixed_version_tenant_span_stats.go new file mode 100644 index 000000000000..7e59bc4bd551 --- /dev/null +++ b/pkg/cmd/roachtest/tests/mixed_version_tenant_span_stats.go @@ -0,0 +1,235 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package tests + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/util/version" + "github.com/cockroachdb/errors" +) + +func registerTenantSpanStatsMixedVersion(r registry.Registry) { + testSpec := registry.TestSpec{ + Name: "tenant-span-stats/mixed-version", + Owner: registry.OwnerClusterObs, + Cluster: r.MakeClusterSpec(4), + EncryptionSupport: registry.EncryptionMetamorphic, + Timeout: 30 * time.Minute, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + v, err := version.Parse("v23.2.0") + if err != nil { + t.Fatalf("unable to parse version correctly %v", err) + } + if t.BuildVersion().AtLeast(v) { + t.Fatal("predecessor binary already includes RPC protobuf changes, this test is no longer needed -- remove me") + } + + mvt := mixedversion.NewTest(ctx, t, t.L(), c, c.All()) + + tableName := "test" + var startKey string + var endKey string + var res *install.RunResultDetails + var errOutput errorOutput + + mvt.OnStartup("fetch span stats - start", func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + // Create a test table. + err = h.Exec(rng, fmt.Sprintf(`CREATE TABLE %s (num int)`, tableName)) + if err != nil { + return err + } + // Get the table's span keys. + row := h.QueryRow(rng, + fmt.Sprintf(`SELECT crdb_internal.pretty_key(crdb_internal.table_span('%s'::regclass::oid::int)[1], 0)`, tableName)) + err = row.Scan(&startKey) + if err != nil { + return err + } + row = h.QueryRow(rng, + fmt.Sprintf(`SELECT crdb_internal.pretty_key(crdb_internal.table_span('%s'::regclass::oid::int)[2], 0)`, tableName)) + err = row.Scan(&endKey) + if err != nil { + return err + } + // Dial a node for span stats on startup. All nodes will be on the previous version, this is a sanity check. + l.Printf("Fetch span stats from node (start).") + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(1), oldReqBody(2, startKey, endKey)) + if err != nil { + return err + } + // Ensure the result can be marshalled into a valid span stats response. + var spanStats roachpb.SpanStatsResponse + return json.Unmarshal([]byte(res.Stdout), &spanStats) + }) + + mvt.InMixedVersion("fetch span stats - mixed", func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error { + prevToCurrentError := "unable to service a mixed version request" + currentToPrevError := "An internal server error has occurred" + unknownFieldError := "unknown field" + + // If we have nodes in mixed versions. + if len(h.Context().FromVersionNodes) > 0 && len(h.Context().ToVersionNodes) > 0 { + prevVersNodeID := h.Context().FromVersionNodes[0] + currVersNodeID := h.Context().ToVersionNodes[0] + + // Fetch span stats from previous version node, dialing to a current version node. 
+ l.Printf("Fetch span stats from previous version node (%v), dialing to a current version node (%v).", prevVersNodeID, currVersNodeID) + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(prevVersNodeID), oldReqBody(currVersNodeID, startKey, endKey)) + if err != nil { + return err + } + // Expect an error in the stdout - mixed version error. + // Ensure the result can be marshalled into a valid error response. + err = json.Unmarshal([]byte(res.Stdout), &errOutput) + if err != nil { + return err + } + // Ensure we get the expected error. + expected := assertExpectedError(errOutput.Error, prevToCurrentError) + if !expected { + return errors.Newf("expected '%s' in error message, got: '%v'", prevToCurrentError, errOutput.Error) + } + + // Fetch span stats from current version node, dialing to a previous version node. + l.Printf("Fetch span stats from current version node (%v), dialing to a previous version node (%v).", currVersNodeID, prevVersNodeID) + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(currVersNodeID), newReqBody(prevVersNodeID, startKey, endKey)) + if err != nil { + return err + } + // Expect an error in the stdout - mixed version error. + // Ensure the result can be marshalled into a valid error response. + err = json.Unmarshal([]byte(res.Stdout), &errOutput) + if err != nil { + return err + } + // Ensure we get the expected error. + expectedCurrToPrev := assertExpectedError(errOutput.Message, currentToPrevError) + expectedUnknown := assertExpectedError(errOutput.Message, unknownFieldError) + if !expectedCurrToPrev && !expectedUnknown { + return errors.Newf("expected '%s' or '%s' in error message, got: '%v'", currentToPrevError, unknownFieldError, errOutput.Error) + } + + // Fanout from current version node. + l.Printf("Fanout from current version node (mixed).") + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(currVersNodeID), newReqBody(0, startKey, endKey)) + if err != nil { + return err + } + // Expect an error in the stdout - mixed version error. + // Ensure the result can be marshalled into a valid error response. + err = json.Unmarshal([]byte(res.Stdout), &errOutput) + if err != nil { + return err + } + // Ensure we get the expected error. + expectedCurrToPrev = assertExpectedError(errOutput.Message, currentToPrevError) + expectedUnknown = assertExpectedError(errOutput.Message, unknownFieldError) + if !expectedCurrToPrev && !expectedUnknown { + return errors.Newf("expected '%s' or '%s' in error message, got: '%v'", currentToPrevError, unknownFieldError, errOutput.Error) + } + } + + // If all nodes are on 23.1, but we're in mixed state (i.e. cluster version is at 22.2) + l.Printf("Fanout on old cluster version.") + if len(h.Context().ToVersionNodes) == 4 { + currVersNodeID := h.Context().ToVersionNodes[0] + // Fanout from current version node. + l.Printf("Fanout from current version node (mixed).") + res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(currVersNodeID), newReqBody(0, startKey, endKey)) + if err != nil { + return err + } + // Expect an error in the stdout - mixed version error. + // Ensure the result can be marshalled into a valid error response. + err = json.Unmarshal([]byte(res.Stdout), &errOutput) + if err != nil { + return err + } + // Ensure we get the expected error. 
+					expectedCurrToPrev := assertExpectedError(errOutput.Message, currentToPrevError)
+					expectedUnknown := assertExpectedError(errOutput.Message, unknownFieldError)
+					if !expectedCurrToPrev && !expectedUnknown {
+						return errors.Newf("expected '%s' or '%s' in error message, got: '%v'", currentToPrevError, unknownFieldError, errOutput.Error)
+					}
+				}
+				return nil
+			})
+
+			mvt.AfterUpgradeFinalized("fetch span stats - finalized", func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
+				// Fanout on finalized version, sanity check.
+				l.Printf("Fanout from current version node (finalized).")
+				res, err = fetchSpanStatsFromNode(ctx, l, c, c.Node(1), newReqBody(0, startKey, endKey))
+				if err != nil {
+					return err
+				}
+				// Ensure the result can be marshalled into a valid span stats response.
+				var spanStats roachpb.SpanStatsResponse
+				return json.Unmarshal([]byte(res.Stdout), &spanStats)
+			})
+			mvt.Run()
+		},
+	}
+	r.Add(testSpec)
+}
+
+func fetchSpanStatsFromNode(
+	ctx context.Context,
+	l *logger.Logger,
+	c cluster.Cluster,
+	node option.NodeListOption,
+	reqBody string,
+) (*install.RunResultDetails, error) {
+	adminAddrs, err := c.InternalAdminUIAddr(ctx, l, node)
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting internal admin ui address")
+	}
+	res, err := c.RunWithDetailsSingleNode(ctx, l, node,
+		"curl", "-X", "POST", fmt.Sprintf("http://%s/_status/span",
+			adminAddrs[0]), "-H", "'Content-Type: application/json'", "-d", fmt.Sprintf("'%s'", reqBody))
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting span statistics")
+	}
+	l.Printf("fetchSpanStatsFromNode - curl result: %v", res.Stdout)
+	return &res, nil
+}
+
+func assertExpectedError(errorMessage string, expectedError string) bool {
+	return strings.Contains(errorMessage, expectedError)
+}
+
+func oldReqBody(nodeID int, startKey string, endKey string) string {
+	return fmt.Sprintf(`{"node_id": "%v", "start_key": "%v", "end_key": "%v"}`, nodeID, startKey, endKey)
+}
+
+func newReqBody(nodeID int, startKey string, endKey string) string {
+	return fmt.Sprintf(`{"node_id": "%v", "spans": [{"key": "%v", "end_key": "%v"}]}`, nodeID, startKey, endKey)
+}
+
+type errorOutput struct {
+	Error   string        `json:"error"`
+	Code    int           `json:"code"`
+	Message string        `json:"message"`
+	Details []interface{} `json:"details"`
+}
diff --git a/pkg/cmd/roachtest/tests/registry.go b/pkg/cmd/roachtest/tests/registry.go
index b732a2811c5b..5003808a59c5 100644
--- a/pkg/cmd/roachtest/tests/registry.go
+++ b/pkg/cmd/roachtest/tests/registry.go
@@ -142,6 +142,7 @@ func RegisterTests(r registry.Registry) {
 	registerVersion(r)
 	registerYCSB(r)
 	registerDeclarativeSchemaChangerJobCompatibilityInMixedVersion(r)
+	registerTenantSpanStatsMixedVersion(r)
 }
 
 // RegisterBenchmarks registers all benchmarks to the registry. This powers `roachtest bench`.
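For context on the two payload shapes the mixed-version test sends: the legacy request carries a single start_key/end_key pair, while the 23.1 request carries a spans list, and the response keys per-span stats by the span's string form. The sketch below is a minimal, self-contained illustration (plain net/http and encoding/json, independent of the roachtest harness); the endpoint address and the trimmed response structs are illustrative assumptions, and the JSON field names simply mirror the snake_case names used by oldReqBody/newReqBody above.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// legacyBody mirrors oldReqBody above: one start/end key pair.
func legacyBody(nodeID int, startKey, endKey string) string {
	return fmt.Sprintf(`{"node_id": "%v", "start_key": "%v", "end_key": "%v"}`, nodeID, startKey, endKey)
}

// multiSpanBody mirrors newReqBody above: a list of spans.
func multiSpanBody(nodeID int, startKey, endKey string) string {
	return fmt.Sprintf(`{"node_id": "%v", "spans": [{"key": "%v", "end_key": "%v"}]}`, nodeID, startKey, endKey)
}

// trimmedSpanStats/trimmedResponse keep only the fields read here; the
// response field names are assumed to match the proto field names.
type trimmedSpanStats struct {
	RangeCount           int32  `json:"range_count"`
	ApproximateDiskBytes uint64 `json:"approximate_disk_bytes"`
}

type trimmedResponse struct {
	SpanToStats map[string]trimmedSpanStats `json:"span_to_stats"`
}

func main() {
	url := "http://127.0.0.1:26258/_status/span"         // hypothetical admin UI address
	_ = legacyBody(1, "/Min", "/Max")                     // legacy shape, shown for comparison
	body := multiSpanBody(0, "/Table/104", "/Table/105")  // illustrative keys

	resp, err := http.Post(url, "application/json", bytes.NewBufferString(body))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	var out trimmedResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	// Per-span stats are keyed by the span's string form.
	for span, stats := range out.SpanToStats {
		fmt.Printf("%s: ranges=%d, approx_disk_bytes=%d\n", span, stats.RangeCount, stats.ApproximateDiskBytes)
	}
}
```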
diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel index ce9a0bf41a2f..edba34e5a26f 100644 --- a/pkg/roachpb/BUILD.bazel +++ b/pkg/roachpb/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "metadata_replicas.go", "span_config.go", "span_group.go", + "span_stats.go", "tenant.go", "version.go", ], @@ -28,6 +29,7 @@ go_library( "//pkg/kv/kvserver/allocator/load", "//pkg/kv/kvserver/concurrency/isolation", "//pkg/kv/kvserver/concurrency/lock", + "//pkg/settings", "//pkg/storage/enginepb", "//pkg/util", "//pkg/util/bitarray", diff --git a/pkg/roachpb/span_stats.go b/pkg/roachpb/span_stats.go new file mode 100644 index 000000000000..3518820bb9e5 --- /dev/null +++ b/pkg/roachpb/span_stats.go @@ -0,0 +1,39 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package roachpb + +import "github.com/cockroachdb/cockroach/pkg/settings" + +// Put span statistics cluster settings here to avoid import cycle. + +const DefaultSpanStatsSpanLimit = 500 + +// SpanStatsBatchLimit registers the maximum number of spans allowed in a +// span stats request payload. +var SpanStatsBatchLimit = settings.RegisterIntSetting( + settings.TenantWritable, + "server.span_stats.span_batch_limit", + "the maximum number of spans allowed in a request payload for span statistics", + DefaultSpanStatsSpanLimit, + settings.PositiveInt, +) + +const defaultRangeStatsBatchLimit = 100 + +// RangeStatsBatchLimit registers the maximum number of ranges to be batched +// when fetching range stats for a span. +var RangeStatsBatchLimit = settings.RegisterIntSetting( + settings.TenantWritable, + "server.span_stats.range_batch_limit", + "the maximum batch size when fetching ranges statistics for a span", + defaultRangeStatsBatchLimit, + settings.PositiveInt, +) diff --git a/pkg/roachpb/span_stats.proto b/pkg/roachpb/span_stats.proto index 6af485307457..fa699f53ac3c 100644 --- a/pkg/roachpb/span_stats.proto +++ b/pkg/roachpb/span_stats.proto @@ -15,6 +15,7 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/roachpb"; import "storage/enginepb/mvcc.proto"; import "gogoproto/gogo.proto"; import "google/api/annotations.proto"; +import "roachpb/data.proto"; // SpanStatsRequest is used to request a SpanStatsResponse for the given key // span and node id. A node_id value of 0 indicates that the server should @@ -22,17 +23,32 @@ import "google/api/annotations.proto"; // result from across the cluster. message SpanStatsRequest { string node_id = 1 [(gogoproto.customname) = "NodeID"]; + + // Note: start_key and end_key are legacy fields, safe to deprecate (i.e. reserve) in 23.2. + // Callers are intended to use the spans field below. + // Field is not reserved as we use it to detect whether a request is being made from a previous + // versioned node (i.e. in a mixed-version cluster). + // TODO(thomas): reserved these fields in 23.2. bytes start_key = 2 [(gogoproto.casttype) = "RKey"]; bytes end_key = 3 [(gogoproto.casttype) = "RKey"]; + + repeated Span spans = 4 [(gogoproto.nullable) = false]; } -message SpanStatsResponse { +message SpanStats { + cockroach.storage.enginepb.MVCCStats total_stats = 1 [(gogoproto.nullable) = false]; // range_count measures the number of ranges that the request span falls within. 
  // A SpanStatsResponse for a span that lies within a range, and whose start
  // key sorts after the range start, and whose end key sorts before the
  // range end, will have a range_count value of 1.
  int32 range_count = 2;
  uint64 approximate_disk_bytes = 3;
-  cockroach.storage.enginepb.MVCCStats total_stats = 1
-    [(gogoproto.nullable) = false];
+}
+
+message SpanStatsResponse {
+  cockroach.storage.enginepb.MVCCStats total_stats = 1 [(gogoproto.nullable) = false];
+  // See the range_count comment for the SpanStats proto.
+  int32 range_count = 2;
+  uint64 approximate_disk_bytes = 3;
+  map<string, SpanStats> span_to_stats = 4;
 }
diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go
index 01343c81d8b8..95e73e18257e 100644
--- a/pkg/rpc/auth_tenant.go
+++ b/pkg/rpc/auth_tenant.go
@@ -207,10 +207,13 @@ func (a tenantAuthorizer) authGetRangeDescriptors(
 func (a tenantAuthorizer) authSpanStats(
 	tenID roachpb.TenantID, args *roachpb.SpanStatsRequest,
 ) error {
-	return validateSpan(tenID, roachpb.Span{
-		Key:    args.StartKey.AsRawKey(),
-		EndKey: args.EndKey.AsRawKey(),
-	})
+	for _, span := range args.Spans {
+		err := validateSpan(tenID, span)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 // authRangeLookup authorizes the provided tenant to invoke the RangeLookup RPC
diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index 9e6bfd635648..486a8e395ff8 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -1422,9 +1422,8 @@ func (s *adminServer) statsForSpan(
 			if err == nil {
 				client := serverpb.NewStatusClient(conn)
 				req := roachpb.SpanStatsRequest{
-					StartKey: rSpan.Key,
-					EndKey:   rSpan.EndKey,
-					NodeID:   nodeID.String(),
+					Spans:  []roachpb.Span{span},
+					NodeID: nodeID.String(),
 				}
 				spanResponse, err = client.SpanStats(ctx, &req)
 			}
@@ -1459,9 +1458,9 @@ func (s *adminServer) statsForSpan(
 					},
 				)
 			} else {
-				tableStatResponse.Stats.Add(resp.resp.TotalStats)
-				tableStatResponse.ReplicaCount += int64(resp.resp.RangeCount)
-				tableStatResponse.ApproximateDiskBytes += resp.resp.ApproximateDiskBytes
+				tableStatResponse.Stats.Add(resp.resp.SpanToStats[span.String()].TotalStats)
+				tableStatResponse.ReplicaCount += int64(resp.resp.SpanToStats[span.String()].RangeCount)
+				tableStatResponse.ApproximateDiskBytes += resp.resp.SpanToStats[span.String()].ApproximateDiskBytes
 			}
 		case <-ctx.Done():
 			// Caller gave up, stop doing work.
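Both the admin server change above and the fan-out path later in this patch reduce per-node results into a single per-span aggregate keyed by span.String(). The following condensed sketch shows the shape of that merge using stand-in types rather than the real roachpb/enginepb messages; only the fields the aggregation touches are modeled, and LiveBytes stands in for the full MVCCStats total.

```go
package main

import "fmt"

// Stand-ins for roachpb.SpanStats / SpanStatsResponse, trimmed to the fields
// that the aggregation in admin.go and the fan-out touch.
type spanStats struct {
	RangeCount           int32
	ApproximateDiskBytes uint64
	LiveBytes            int64 // stands in for the full MVCCStats total
}

type spanStatsResponse struct {
	SpanToStats map[string]*spanStats
}

// mergeResponses folds per-node responses into one aggregate keyed by the
// span's string form, mirroring how per-span stats are summed above.
func mergeResponses(responses ...*spanStatsResponse) *spanStatsResponse {
	out := &spanStatsResponse{SpanToStats: make(map[string]*spanStats)}
	for _, resp := range responses {
		for spanStr, stats := range resp.SpanToStats {
			agg, ok := out.SpanToStats[spanStr]
			if !ok {
				// First contribution for this span: copy it into the aggregate.
				cp := *stats
				out.SpanToStats[spanStr] = &cp
				continue
			}
			agg.RangeCount += stats.RangeCount
			agg.ApproximateDiskBytes += stats.ApproximateDiskBytes
			agg.LiveBytes += stats.LiveBytes
		}
	}
	return out
}

func main() {
	// Two hypothetical per-node responses for the same span.
	n1 := &spanStatsResponse{SpanToStats: map[string]*spanStats{
		"/Table/104{-/2}": {RangeCount: 2, ApproximateDiskBytes: 1 << 20, LiveBytes: 4096},
	}}
	n2 := &spanStatsResponse{SpanToStats: map[string]*spanStats{
		"/Table/104{-/2}": {RangeCount: 1, ApproximateDiskBytes: 1 << 19, LiveBytes: 2048},
	}}
	merged := mergeResponses(n1, n2)
	fmt.Printf("%+v\n", *merged.SpanToStats["/Table/104{-/2}"])
}
```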
diff --git a/pkg/server/span_stats_server.go b/pkg/server/span_stats_server.go index 8305c69198d8..6403f62f4f0d 100644 --- a/pkg/server/span_stats_server.go +++ b/pkg/server/span_stats_server.go @@ -14,7 +14,9 @@ import ( "context" "strconv" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -23,31 +25,32 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) +const MixedVersionErr = "span stats request - unable to service a mixed version request" +const UnexpectedLegacyRequest = "span stats request - unexpected populated legacy fields (StartKey, EndKey)" +const nodeErrorMsgPlaceholder = "could not get span stats sample for node %d: %v" +const exceedSpanLimitPlaceholder = "error getting span statistics - number of spans in request payload (%d) exceeds 'server.span_stats.span_batch_limit' cluster setting limit (%d)" + func (s *systemStatusServer) spanStatsFanOut( ctx context.Context, req *roachpb.SpanStatsRequest, ) (*roachpb.SpanStatsResponse, error) { - res := &roachpb.SpanStatsResponse{} - - rSpan := roachpb.RSpan{ - Key: req.StartKey, - EndKey: req.EndKey, + res := &roachpb.SpanStatsResponse{ + SpanToStats: make(map[string]*roachpb.SpanStats), } - nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, s.distSender, rSpan) + // Response level error + var respErr error + + spansPerNode, err := s.getSpansPerNode(ctx, req, s.distSender) if err != nil { return nil, err } - nodesWithReplica := make(map[roachpb.NodeID]bool) - for _, nodeID := range nodeIDs { - nodesWithReplica[nodeID] = true - } - // We should only fan out to a node if it has replicas for this span. + // We should only fan out to a node if it has replicas of any span. // A blind fan out would be wasteful. smartDial := func( ctx context.Context, nodeID roachpb.NodeID, ) (interface{}, error) { - if _, ok := nodesWithReplica[nodeID]; ok { + if _, ok := spansPerNode[nodeID]; ok { return s.dialNode(ctx, nodeID) } return nil, nil @@ -55,30 +58,42 @@ func (s *systemStatusServer) spanStatsFanOut( nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { // `smartDial` may skip this node, so check to see if the client is nil. + // If it is, return nil response. if client == nil { - return &roachpb.SpanStatsResponse{}, nil + return nil, nil } - stats, err := client.(serverpb.StatusClient).SpanStats(ctx, + + resp, err := client.(serverpb.StatusClient).SpanStats(ctx, &roachpb.SpanStatsRequest{ - NodeID: nodeID.String(), - StartKey: req.StartKey, - EndKey: req.EndKey, + NodeID: nodeID.String(), + Spans: spansPerNode[nodeID], }) - if err != nil { - return nil, err - } - return stats, err + return resp, err } responseFn := func(nodeID roachpb.NodeID, resp interface{}) { + // Noop if nil response (returned from skipped node). 
+ if resp == nil { + return + } + nodeResponse := resp.(*roachpb.SpanStatsResponse) - res.ApproximateDiskBytes += nodeResponse.ApproximateDiskBytes - res.TotalStats.Add(nodeResponse.TotalStats) - res.RangeCount += nodeResponse.RangeCount + + for spanStr, spanStats := range nodeResponse.SpanToStats { + _, exists := res.SpanToStats[spanStr] + if !exists { + res.SpanToStats[spanStr] = spanStats + } else { + res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes + res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats) + res.SpanToStats[spanStr].RangeCount += spanStats.RangeCount + } + } } errorFn := func(nodeID roachpb.NodeID, err error) { - log.Errorf(ctx, "could not get span stats sample for node %d: %v", nodeID, err) + log.Errorf(ctx, nodeErrorMsgPlaceholder, nodeID, err) + respErr = err } if err := s.statusServer.iterateNodes( @@ -92,59 +107,80 @@ func (s *systemStatusServer) spanStatsFanOut( return nil, err } - return res, nil + return res, respErr } func (s *systemStatusServer) getLocalStats( ctx context.Context, req *roachpb.SpanStatsRequest, ) (*roachpb.SpanStatsResponse, error) { - res := &roachpb.SpanStatsResponse{} + var res = &roachpb.SpanStatsResponse{SpanToStats: make(map[string]*roachpb.SpanStats)} + ri := kvcoord.MakeRangeIterator(s.distSender) - sp := roachpb.RSpan{ - Key: req.StartKey, - EndKey: req.EndKey, + // For each span + for _, span := range req.Spans { + rSpan, err := keys.SpanAddr(span) + if err != nil { + return nil, err + } + // Seek to the span's start key. + ri.Seek(ctx, rSpan.Key, kvcoord.Ascending) + spanStats, err := s.statsForSpan(ctx, ri, rSpan) + if err != nil { + return nil, err + } + res.SpanToStats[span.String()] = spanStats } - ri := kvcoord.MakeRangeIterator(s.distSender) - ri.Seek(ctx, sp.Key, kvcoord.Ascending) + return res, nil +} +func (s *systemStatusServer) statsForSpan( + ctx context.Context, ri kvcoord.RangeIterator, rSpan roachpb.RSpan, +) (*roachpb.SpanStats, error) { + // Seek to the span's start key. + ri.Seek(ctx, rSpan.Key, kvcoord.Ascending) + spanStats := &roachpb.SpanStats{} + var fullyContainedKeysBatch []roachpb.Key + var err error + // Iterate through the span's ranges. for { if !ri.Valid() { return nil, ri.Error() } + // Get the descriptor for the current range of the span. desc := ri.Desc() descSpan := desc.RSpan() - res.RangeCount += 1 + spanStats.RangeCount += 1 // Is the descriptor fully contained by the request span? - if sp.ContainsKeyRange(descSpan.Key, desc.EndKey) { - // If so, obtain stats for this range via RangeStats. - rangeStats, err := s.rangeStatsFetcher.RangeStats(ctx, - desc.StartKey.AsRawKey()) - if err != nil { - return nil, err - } - for _, resp := range rangeStats { - res.TotalStats.Add(resp.MVCCStats) + if rSpan.ContainsKeyRange(descSpan.Key, desc.EndKey) { + // Collect into fullyContainedKeys batch. + fullyContainedKeysBatch = append(fullyContainedKeysBatch, desc.StartKey.AsRawKey()) + // If we've exceeded the batch limit, request range stats for the current batch. + if len(fullyContainedKeysBatch) > int(roachpb.RangeStatsBatchLimit.Get(&s.st.SV)) { + // Obtain stats for fully contained ranges via RangeStats. + fullyContainedKeysBatch, err = flushBatchedContainedKeys(ctx, s.rangeStatsFetcher, fullyContainedKeysBatch, spanStats) + if err != nil { + return nil, err + } } - } else { // Otherwise, do an MVCC Scan. // We should only scan the part of the range that our request span // encompasses. 
- scanStart := sp.Key - scanEnd := sp.EndKey + scanStart := rSpan.Key + scanEnd := rSpan.EndKey // If our request span began before the start of this range, // start scanning from this range's start key. - if descSpan.Key.Compare(sp.Key) == 1 { + if descSpan.Key.Compare(rSpan.Key) == 1 { scanStart = descSpan.Key } // If our request span ends after the end of this range, // stop scanning at this range's end key. - if descSpan.EndKey.Compare(sp.EndKey) == -1 { + if descSpan.EndKey.Compare(rSpan.EndKey) == -1 { scanEnd = descSpan.EndKey } - err := s.stores.VisitStores(func(s *kvserver.Store) error { + err = s.stores.VisitStores(func(s *kvserver.Store) error { stats, err := storage.ComputeStats( s.TODOEngine(), scanStart.AsRawKey(), @@ -156,7 +192,7 @@ func (s *systemStatusServer) getLocalStats( return err } - res.TotalStats.Add(stats) + spanStats.TotalStats.Add(stats) return nil }) @@ -165,28 +201,35 @@ func (s *systemStatusServer) getLocalStats( } } - if !ri.NeedAnother(sp) { + if !ri.NeedAnother(rSpan) { break } ri.Next(ctx) } - + // If we still have some remaining ranges, request range stats for the current batch. + if len(fullyContainedKeysBatch) > 0 { + // Obtain stats for fully contained ranges via RangeStats. + _, err = flushBatchedContainedKeys(ctx, s.rangeStatsFetcher, fullyContainedKeysBatch, spanStats) + if err != nil { + return nil, err + } + // Nil the batch. + fullyContainedKeysBatch = nil + } // Finally, get the approximate disk bytes from each store. - err := s.stores.VisitStores(func(store *kvserver.Store) error { - approxDiskBytes, err := store.TODOEngine().ApproximateDiskBytes(req. - StartKey.AsRawKey(), req.EndKey.AsRawKey()) + err = s.stores.VisitStores(func(store *kvserver.Store) error { + approxDiskBytes, err := store.TODOEngine().ApproximateDiskBytes(rSpan.Key.AsRawKey(), rSpan.EndKey.AsRawKey()) if err != nil { return err } - res.ApproximateDiskBytes += approxDiskBytes + spanStats.ApproximateDiskBytes += approxDiskBytes return nil }) if err != nil { return nil, err } - - return res, nil + return spanStats, nil } // getSpanStatsInternal will return span stats according to the nodeID specified @@ -225,3 +268,52 @@ func (s *systemStatusServer) getSpanStatsInternal( } return client.SpanStats(ctx, req) } + +func (s *systemStatusServer) getSpansPerNode( + ctx context.Context, req *roachpb.SpanStatsRequest, ds *kvcoord.DistSender, +) (map[roachpb.NodeID][]roachpb.Span, error) { + // Mapping of node ids to spans with a replica on the node. + spansPerNode := make(map[roachpb.NodeID][]roachpb.Span) + + // Iterate over the request spans. + for _, span := range req.Spans { + rSpan, err := keys.SpanAddr(span) + if err != nil { + return nil, err + } + // Get the node ids belonging to the span. + nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, ds, rSpan) + if err != nil { + return nil, err + } + // Add the span to the map for each of the node IDs it belongs to. + for _, nodeID := range nodeIDs { + spansPerNode[nodeID] = append(spansPerNode[nodeID], span) + } + } + return spansPerNode, nil +} + +func flushBatchedContainedKeys( + ctx context.Context, + fetcher *rangestats.Fetcher, + fullyContainedKeysBatch []roachpb.Key, + spanStats *roachpb.SpanStats, +) ([]roachpb.Key, error) { + // Obtain stats for fully contained ranges via RangeStats. + rangeStats, err := fetcher.RangeStats(ctx, + fullyContainedKeysBatch...) 
+ if err != nil { + return nil, err + } + for _, resp := range rangeStats { + spanStats.TotalStats.Add(resp.MVCCStats) + } + // Reset the keys batch + return fullyContainedKeysBatch[:0], nil +} + +func isLegacyRequest(req *roachpb.SpanStatsRequest) bool { + // If the start/end key fields are not nil, we have a request using the old request format. + return req.StartKey != nil || req.EndKey != nil +} diff --git a/pkg/server/span_stats_test.go b/pkg/server/span_stats_test.go index a05597c8e074..2d5ab3b3ba56 100644 --- a/pkg/server/span_stats_test.go +++ b/pkg/server/span_stats_test.go @@ -11,14 +11,20 @@ package server import ( + "bytes" "context" + "fmt" + "strconv" "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -63,37 +69,167 @@ func TestLocalSpanStats(t *testing.T) { } } - // Verify stats across different spans. - for _, tcase := range []struct { - startKey string - endKey string + // Create spans encompassing different ranges. + spans := []roachpb.Span{ + { + Key: roachpb.Key("a"), + EndKey: roachpb.Key("i"), + }, + { + Key: roachpb.Key("a"), + EndKey: roachpb.Key("c"), + }, + { + Key: roachpb.Key("b"), + EndKey: roachpb.Key("e"), + }, + { + Key: roachpb.Key("e"), + EndKey: roachpb.Key("i"), + }, + { + Key: roachpb.Key("b"), + EndKey: roachpb.Key("d"), + }, + { + Key: roachpb.Key("b"), + EndKey: roachpb.Key("bbb"), + }, + } + + type testCase struct { + span roachpb.Span expectedRanges int32 expectedKeys int64 + } + + testCases := []testCase{ + {spans[0], 4, 6}, + {spans[1], 1, 3}, + {spans[2], 2, 5}, + {spans[3], 2, 1}, + {spans[4], 2, 3}, + {spans[5], 1, 2}, + } + // Multi-span request + multiResult, err := s.status.getLocalStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", + Spans: spans, + }, + ) + require.NoError(t, err) + + // Verify stats across different spans. + for _, tcase := range testCases { + rSpan, err := keys.SpanAddr(tcase.span) + require.NoError(t, err) + + // Assert expected values from multi-span request + spanStats := multiResult.SpanToStats[tcase.span.String()] + require.Equal(t, spanStats.RangeCount, tcase.expectedRanges, fmt.Sprintf( + "Multi-span: expected %d ranges in span [%s - %s], found %d", tcase.expectedRanges, rSpan.Key.String(), rSpan.EndKey.String(), spanStats.RangeCount)) + require.Equal(t, spanStats.TotalStats.LiveCount, tcase.expectedKeys, fmt.Sprintf( + "Multi-span: expected %d keys in span [%s - %s], found %d", tcase.expectedKeys, rSpan.Key.String(), rSpan.EndKey.String(), spanStats.TotalStats.LiveCount)) + } +} + +// BenchmarkSpanStats measures the cost of collecting span statistics. 
+func BenchmarkSpanStats(b *testing.B) { + skip.UnderShort(b) + defer log.Scope(b).Close(b) + + createCluster := func(numNodes int) serverutils.TestClusterInterface { + return serverutils.StartNewTestCluster(b, numNodes, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + }) + } + + clusterSpecs := []struct { + name string + numNodes int }{ - {"a", "i", 4, 6}, - {"a", "c", 1, 3}, - {"b", "e", 2, 5}, - {"e", "i", 2, 1}, - {"b", "d", 2, 3}, - {"b", "bbb", 1, 2}, - } { - start, end := tcase.startKey, tcase.endKey - result, err := s.status.getLocalStats(ctx, - &roachpb.SpanStatsRequest{ - NodeID: "0", - StartKey: roachpb.RKey(start), - EndKey: roachpb.RKey(end), - }, - ) - if err != nil { - t.Fatal(err) - } + { + name: "3node", + numNodes: 3, + }, + } - if a, e := result.RangeCount, tcase.expectedRanges; a != e { - t.Errorf("Expected %d ranges in span [%s - %s], found %d", e, start, end, a) - } - if a, e := result.TotalStats.LiveCount, tcase.expectedKeys; a != e { - t.Errorf("Expected %d keys in span [%s - %s], found %d", e, start, end, a) + type testSpec struct { + numSpans int + numRanges int + } + + var testSpecs []testSpec + numSpanCases := []int{10, 100, 200} + numRangeCases := []int{25, 50, 100} + for _, numSpans := range numSpanCases { + for _, numRanges := range numRangeCases { + testSpecs = append(testSpecs, testSpec{numSpans: numSpans, numRanges: numRanges}) } } + + for _, cluster := range clusterSpecs { + b.Run(cluster.name, func(b *testing.B) { + tc := createCluster(cluster.numNodes) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + for _, ts := range testSpecs { + b.Run(fmt.Sprintf("BenchmarkSpanStats - span stats for %d node cluster, collecting %d spans with %d ranges each", + cluster.numNodes, + ts.numSpans, + ts.numRanges, + ), func(b *testing.B) { + + tenant := tc.Server(0).TenantOrServer() + tenantCodec := keys.MakeSQLCodec(serverutils.TestTenantID()) + tenantPrefix := tenantCodec.TenantPrefix() + + makeKey := func(keys ...[]byte) roachpb.Key { + return bytes.Join(keys, nil) + } + + var spans []roachpb.Span + + // Create a table spans + var spanStartKey roachpb.Key + for i := 0; i < ts.numSpans; i++ { + spanStartKey = nil + // Create ranges. + var key roachpb.Key + for j := 0; j < ts.numRanges; j++ { + key = makeKey(tenantPrefix, []byte(strconv.Itoa(i*j))) + if spanStartKey == nil { + spanStartKey = key + } + _, _, err := tc.Server(0).SplitRange(key) + require.NoError(b, err) + } + spans = append(spans, roachpb.Span{ + Key: spanStartKey, + EndKey: key, + }) + } + + _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING server.span_stats.span_batch_limit = $1`, ts.numSpans) + require.NoError(b, err) + _, err = tc.ServerConn(0).Exec(`SET CLUSTER SETTING server.span_stats.range_batch_limit = $1`, ts.numRanges) + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := tenant.TenantStatusServer().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. + Spans: spans, + }) + require.NoError(b, err) + } + b.StopTimer() + }) + } + }) + } } diff --git a/pkg/server/status.go b/pkg/server/status.go index 6f6e7387e5f1..120454b9d988 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -3469,6 +3469,19 @@ func (s *systemStatusServer) SpanStats( // already returns a proper gRPC error status. return nil, err } + // If we receive a request using the old format. 
+ if isLegacyRequest(req) { + // We want to force 23.1 callers to use the new format (e.g. Spans field). + if req.NodeID == "0" { + return nil, errors.New(UnexpectedLegacyRequest) + } + // We want to error if we receive a legacy request from a 22.2 + // node (e.g. during a mixed-version fanout). + return nil, errors.New(MixedVersionErr) + } + if len(req.Spans) > int(roachpb.SpanStatsBatchLimit.Get(&s.st.SV)) { + return nil, errors.Newf(exceedSpanLimitPlaceholder, len(req.Spans), int(roachpb.SpanStatsBatchLimit.Get(&s.st.SV))) + } return s.getSpanStatsInternal(ctx, req) } diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 74abbdbd7d7f..46335a835425 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -1374,10 +1374,13 @@ func TestSpanStatsResponse(t *testing.T) { } var response roachpb.SpanStatsResponse + span := roachpb.Span{ + Key: roachpb.RKeyMin.AsRawKey(), + EndKey: roachpb.RKeyMax.AsRawKey(), + } request := roachpb.SpanStatsRequest{ - NodeID: "1", - StartKey: []byte(roachpb.RKeyMin), - EndKey: []byte(roachpb.RKeyMax), + NodeID: "1", + Spans: []roachpb.Span{span}, } url := ts.AdminURL() + statusPrefix + "span" @@ -1388,7 +1391,8 @@ func TestSpanStatsResponse(t *testing.T) { if err != nil { t.Fatal(err) } - if a, e := int(response.RangeCount), initialRanges; a != e { + responseSpanStats := response.SpanToStats[span.String()] + if a, e := int(responseSpanStats.RangeCount), initialRanges; a != e { t.Errorf("expected %d ranges, found %d", e, a) } } @@ -1403,10 +1407,13 @@ func TestSpanStatsGRPCResponse(t *testing.T) { rpcStopper := stop.NewStopper() defer rpcStopper.Stop(ctx) rpcContext := newRPCTestContext(ctx, ts, ts.RPCContext().Config) + span := roachpb.Span{ + Key: roachpb.RKeyMin.AsRawKey(), + EndKey: roachpb.RKeyMax.AsRawKey(), + } request := roachpb.SpanStatsRequest{ - NodeID: "1", - StartKey: []byte(roachpb.RKeyMin), - EndKey: []byte(roachpb.RKeyMax), + NodeID: "1", + Spans: []roachpb.Span{span}, } url := ts.ServingRPCAddr() @@ -1425,7 +1432,8 @@ func TestSpanStatsGRPCResponse(t *testing.T) { if err != nil { t.Fatal(err) } - if a, e := int(response.RangeCount), initialRanges; a != e { + responseSpanStats := response.SpanToStats[span.String()] + if a, e := int(responseSpanStats.RangeCount), initialRanges; a != e { t.Fatalf("expected %d ranges, found %d", e, a) } } diff --git a/pkg/sql/faketreeeval/evalctx.go b/pkg/sql/faketreeeval/evalctx.go index cbee54f63656..6c3527d86583 100644 --- a/pkg/sql/faketreeeval/evalctx.go +++ b/pkg/sql/faketreeeval/evalctx.go @@ -485,7 +485,7 @@ func (ep *DummyEvalPlanner) GetRangeDescByID( // SpanStats is part of the eval.Planner interface. func (ep *DummyEvalPlanner) SpanStats( - context.Context, roachpb.RKey, roachpb.RKey, + context.Context, roachpb.Spans, ) (stats *roachpb.SpanStatsResponse, err error) { return } diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index e6076bd1dc27..5523232ab859 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -904,12 +904,11 @@ func (p *planner) GetStreamIngestManager(ctx context.Context) (eval.StreamIngest // SpanStats returns a stats for the given span of keys. 
func (p *planner) SpanStats( - ctx context.Context, startKey roachpb.RKey, endKey roachpb.RKey, + ctx context.Context, spans roachpb.Spans, ) (*roachpb.SpanStatsResponse, error) { req := &roachpb.SpanStatsRequest{ - NodeID: "0", - StartKey: startKey, - EndKey: endKey, + NodeID: "0", + Spans: spans, } return p.ExecCfg().TenantStatusServer.SpanStats(ctx, req) } diff --git a/pkg/sql/sem/builtins/generator_builtins.go b/pkg/sql/sem/builtins/generator_builtins.go index 39a1be8ff685..9b52997b698a 100644 --- a/pkg/sql/sem/builtins/generator_builtins.go +++ b/pkg/sql/sem/builtins/generator_builtins.go @@ -2920,24 +2920,34 @@ func makeIdentGenerator( }, nil } +type spanStatsDetails struct { + dbId int + tableId int +} + type tableSpanStatsIterator struct { - it eval.InternalRows - codec keys.SQLCodec - p eval.Planner - currDbId int - currTableId int + argDbId int + argTableId int + it eval.InternalRows + codec keys.SQLCodec + p eval.Planner + spanStatsBatchLimit int + // Each iter + iterSpanIdx int + spanStatsDetails []spanStatsDetails currStatsResponse *roachpb.SpanStatsResponse - singleTableReq bool } -func newTableSpanStatsIterator(eval *eval.Context, dbId int, tableId int) *tableSpanStatsIterator { - return &tableSpanStatsIterator{codec: eval.Codec, p: eval.Planner, currDbId: dbId, currTableId: tableId, singleTableReq: tableId != 0} +func newTableSpanStatsIterator( + eval *eval.Context, dbId int, tableId int, spanBatchLimit int, +) *tableSpanStatsIterator { + return &tableSpanStatsIterator{codec: eval.Codec, p: eval.Planner, argDbId: dbId, argTableId: tableId, spanStatsBatchLimit: spanBatchLimit} } // Start implements the tree.ValueGenerator interface. func (tssi *tableSpanStatsIterator) Start(ctx context.Context, _ *kv.Txn) error { var err error = nil - tssi.it, err = tssi.p.GetDetailsForSpanStats(ctx, tssi.currDbId, tssi.currTableId) + tssi.it, err = tssi.p.GetDetailsForSpanStats(ctx, tssi.argDbId, tssi.argTableId) return err } @@ -2946,41 +2956,101 @@ func (tssi *tableSpanStatsIterator) Next(ctx context.Context) (bool, error) { if tssi.it == nil { return false, errors.AssertionFailedf("Start must be called before Next") } - next, err := tssi.it.Next(ctx) - if err != nil || !next { + // Check if we can iterate through the span details buffer. + if tssi.iterSpanIdx+1 < len(tssi.spanStatsDetails) { + tssi.iterSpanIdx++ + return true, nil + } + + // There are no more span details to iterate in the buffer. + // Instead, we continue to fetch more span stats (if possible). + hasMoreRows, err := tssi.fetchSpanStats(ctx) + if err != nil { return false, err } - // Pull the current row. - row := tssi.it.Cur() - tssi.currDbId = int(tree.MustBeDInt(row[0])) - tssi.currTableId = int(tree.MustBeDInt(row[1])) + // After fetching new span stats, reset the index. + tssi.iterSpanIdx = 0 + return hasMoreRows || len(tssi.spanStatsDetails) != 0, nil +} + +func (tssi *tableSpanStatsIterator) fetchSpanStats(ctx context.Context) (bool, error) { + // Reset span details. + tssi.spanStatsDetails = tssi.spanStatsDetails[:0] + + var ok bool + var err error + var spans []roachpb.Span + // While we have more rows + for ok, err = tssi.it.Next(ctx); ok; ok, err = tssi.it.Next(ctx) { + + // Pull the current row. 
+ row := tssi.it.Cur() + dbId := int(tree.MustBeDInt(row[0])) + tableId := int(tree.MustBeDInt(row[1])) + + // Add the row data to span stats details + tssi.spanStatsDetails = append(tssi.spanStatsDetails, spanStatsDetails{ + dbId: dbId, + tableId: tableId, + }) + + // Gather the span for the current span stats request. + tableStartKey := tssi.codec.TablePrefix(uint32(tableId)) + spans = append(spans, roachpb.Span{ + Key: tableStartKey, + EndKey: tableStartKey.PrefixEnd(), + }) + + // Exit the loop if we're reached our limit of spans + // for the span stats request. + if len(tssi.spanStatsDetails) >= tssi.spanStatsBatchLimit { + break + } + } - // Set our current stats response. - startKey := roachpb.RKey(tssi.codec.TablePrefix(uint32(tssi.currTableId))) - tssi.currStatsResponse, err = tssi.p.SpanStats(ctx, startKey, startKey.PrefixEnd()) + // If we encounter an error while iterating over rows, + // return error before fetching span stats. if err != nil { return false, err } - return next, err + + // If we have spans, request span stats + if len(spans) > 0 { + tssi.currStatsResponse, err = tssi.p.SpanStats(ctx, spans) + } + + if err != nil { + return false, err + } + return ok, err } // Values implements the tree.ValueGenerator interface. func (tssi *tableSpanStatsIterator) Values() (tree.Datums, error) { - liveBytes := tssi.currStatsResponse.TotalStats.LiveBytes - totalBytes := tssi.currStatsResponse.TotalStats.KeyBytes + - tssi.currStatsResponse.TotalStats.ValBytes + - tssi.currStatsResponse.TotalStats.RangeKeyBytes + - tssi.currStatsResponse.TotalStats.RangeValBytes + // Get the current span details. + spanDetails := tssi.spanStatsDetails[tssi.iterSpanIdx] + startKey := tssi.codec.TablePrefix(uint32(spanDetails.tableId)) + tableSpan := roachpb.Span{Key: startKey, EndKey: startKey.PrefixEnd()} + // Get the current span stats. 
+ spanStats, found := tssi.currStatsResponse.SpanToStats[tableSpan.String()] + if !found { + return nil, errors.Errorf("could not find span stats for table span: %s", tableSpan.String()) + } + + totalBytes := spanStats.TotalStats.KeyBytes + + spanStats.TotalStats.ValBytes + + spanStats.TotalStats.RangeKeyBytes + + spanStats.TotalStats.RangeValBytes livePercentage := float64(0) if totalBytes > 0 { - livePercentage = float64(liveBytes) / float64(totalBytes) + livePercentage = float64(spanStats.TotalStats.LiveBytes) / float64(totalBytes) } return []tree.Datum{ - tree.NewDInt(tree.DInt(tssi.currDbId)), - tree.NewDInt(tree.DInt(tssi.currTableId)), - tree.NewDInt(tree.DInt(tssi.currStatsResponse.RangeCount)), - tree.NewDInt(tree.DInt(tssi.currStatsResponse.ApproximateDiskBytes)), - tree.NewDInt(tree.DInt(liveBytes)), + tree.NewDInt(tree.DInt(spanDetails.dbId)), + tree.NewDInt(tree.DInt(spanDetails.tableId)), + tree.NewDInt(tree.DInt(spanStats.RangeCount)), + tree.NewDInt(tree.DInt(spanStats.ApproximateDiskBytes)), + tree.NewDInt(tree.DInt(spanStats.TotalStats.LiveBytes)), tree.NewDInt(tree.DInt(totalBytes)), tree.NewDFloat(tree.DFloat(livePercentage)), }, nil @@ -3024,5 +3094,7 @@ func makeTableSpanStatsGenerator( return nil, errors.New("provided table id must be greater than or equal to 1") } } - return newTableSpanStatsIterator(evalCtx, dbId, tableId), nil + + spanBatchLimit := roachpb.SpanStatsBatchLimit.Get(&evalCtx.Settings.SV) + return newTableSpanStatsIterator(evalCtx, dbId, tableId, int(spanBatchLimit)), nil } diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go index 329e0d4b7933..8b780d0f9262 100644 --- a/pkg/sql/sem/eval/deps.go +++ b/pkg/sql/sem/eval/deps.go @@ -384,7 +384,7 @@ type Planner interface { // GetRangeDescByID gets the RangeDescriptor by the specified RangeID. GetRangeDescByID(context.Context, roachpb.RangeID) (roachpb.RangeDescriptor, error) - SpanStats(context.Context, roachpb.RKey, roachpb.RKey) (*roachpb.SpanStatsResponse, error) + SpanStats(context.Context, roachpb.Spans) (*roachpb.SpanStatsResponse, error) GetDetailsForSpanStats(ctx context.Context, dbId int, tableId int) (InternalRows, error)
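The generator changes above replace one SpanStats call per table with one call per batch of up to server.span_stats.span_batch_limit tables, with Next serving rows out of the buffered batch before fetching again. Below is a stand-alone sketch of that batch-then-iterate pattern; the types and helper names are illustrative stand-ins for the real eval.Planner/roachpb interfaces, not the actual implementation.

```go
package main

import "fmt"

// fetchFn stands in for planner.SpanStats: it takes a batch of table IDs
// (spans in the real code) and returns one result per ID in a single call.
type fetchFn func(batch []int) map[int]string

// batchIterator yields (tableID, stats) pairs while issuing at most one
// fetch per batchLimit IDs, mirroring tableSpanStatsIterator.Next/fetchSpanStats.
type batchIterator struct {
	ids        []int
	batchLimit int
	fetch      fetchFn

	buf     []int          // current batch of IDs (the "span details" buffer)
	results map[int]string // stats for the current batch
	idx     int            // position within buf
}

func (it *batchIterator) next() (int, string, bool) {
	// Serve the next element of the current batch if one remains.
	if it.idx+1 < len(it.buf) {
		it.idx++
		id := it.buf[it.idx]
		return id, it.results[id], true
	}
	// Otherwise pull the next batch and issue a single fetch for it.
	if len(it.ids) == 0 {
		return 0, "", false
	}
	n := it.batchLimit
	if n > len(it.ids) {
		n = len(it.ids)
	}
	it.buf, it.ids = it.ids[:n], it.ids[n:]
	it.results = it.fetch(it.buf)
	it.idx = 0
	return it.buf[0], it.results[it.buf[0]], true
}

func main() {
	fetch := func(batch []int) map[int]string {
		fmt.Println("one stats request for batch:", batch)
		out := make(map[int]string, len(batch))
		for _, id := range batch {
			out[id] = fmt.Sprintf("stats(table=%d)", id)
		}
		return out
	}
	it := &batchIterator{ids: []int{101, 102, 103, 104, 105}, batchLimit: 2, fetch: fetch}
	for id, stats, ok := it.next(); ok; id, stats, ok = it.next() {
		fmt.Println(id, "->", stats)
	}
}
```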