Skip to content

Commit

Permalink
sql: Add new generator crdb_internal.scan_storage_internal_keys()
Browse files Browse the repository at this point in the history
This new builtin is used to gather specific pebble metrics for a node
and store id (within a given key span). The builtin returns information
about the different types of keys (including snapshot pinned keys) as
well as their sizes in bytes.

Informs: cockroachdb#94659
Release-note: None
  • Loading branch information
craig[bot] authored and raggar committed Jul 31, 2023
1 parent e57e974 commit 0b19e7b
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3311,6 +3311,8 @@ table. Returns an error if validation fails.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.round_decimal_values"></a><code>crdb_internal.round_decimal_values(val: <a href="decimal.html">decimal</a>[], scale: <a href="int.html">int</a>) &rarr; <a href="decimal.html">decimal</a>[]</code></td><td><span class="funcdesc"><p>This function is used internally to round decimal array values during mutations.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.scan_storage_internal_keys"></a><code>crdb_internal.scan_storage_internal_keys(node_id: <a href="int.html">int</a>, store_id: <a href="int.html">int</a>, start_key: <a href="bytes.html">bytes</a>, end_key: <a href="bytes.html">bytes</a>) &rarr; tuple{int AS node_id, int AS store_id, int AS level, int AS snapshot_pinned_keys, int AS snapshot_pinned_keys_<a href="bytes.html">bytes</a>, int AS point_key_delete_count, int AS point_key_delete_<a href="bytes.html">bytes</a>, int AS point_key_set_count, int AS point_key_set_<a href="bytes.html">bytes</a>, int AS range_delete_count, int AS range_delete_<a href="bytes.html">bytes</a>, int AS range_key_set_count, int AS range_key_set_<a href="bytes.html">bytes</a>, int AS range_key_delete_count, int AS range_key_delete_bytes}</code></td><td><span class="funcdesc"><p>Scans a store’s storage engine, computing statistics describing the internal keys within the span [start_key, end_key).</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.schedule_sql_stats_compaction"></a><code>crdb_internal.schedule_sql_stats_compaction() &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>This function is used to start a SQL stats compaction job.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.serialize_session"></a><code>crdb_internal.serialize_session() &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>This function serializes the variables in the current session.</p>
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ message GetTableMetricsResponse{
repeated storage.enginepb.SSTableMetricsInfo table_metrics = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "TableMetrics"];
}

// ScanStorageInternalKeysRequest is the request message of the PerStore
// ScanStorageInternalKeys RPC.
message ScanStorageInternalKeysRequest {
// header carries the node ID and store ID the request is addressed to.
StoreRequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
// span is the key span whose internal keys should be scanned.
roachpb.Span span = 2 [(gogoproto.nullable) = false];
}

// ScanStorageInternalKeysResponse is the response message of the PerStore
// ScanStorageInternalKeys RPC.
message ScanStorageInternalKeysResponse {
// advanced_metrics holds the internal-key metrics returned by the store's
// engine; it is exposed in Go as AdvancedPebbleMetrics.
repeated storage.enginepb.StorageInternalKeysMetrics advanced_metrics = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "AdvancedPebbleMetrics"];
}

message CompactEngineSpanResponse {
}

Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/storage_engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,32 @@ func (c *StorageEngineClient) GetTableMetrics(
return resp.TableMetrics, nil
}

// ScanStorageInternalKeys is an eval.ScanStorageInternalKeysFunc. It dials the
// node identified by nodeID and issues a PerStore ScanStorageInternalKeys RPC
// addressed to storeID for the span from startKey to endKey, returning the
// metrics carried in the response.
//
// On failure the returned slice is nil and the error is non-nil; a dial
// failure is wrapped with the node ID that could not be reached.
func (c *StorageEngineClient) ScanStorageInternalKeys(
	ctx context.Context, nodeID, storeID int32, startKey, endKey []byte,
) ([]enginepb.StorageInternalKeysMetrics, error) {
	conn, err := c.nd.Dial(ctx, roachpb.NodeID(nodeID), rpc.DefaultClass)
	if err != nil {
		return nil, errors.Wrapf(err, "could not dial node ID %d", nodeID)
	}

	client := NewPerStoreClient(conn)
	req := &ScanStorageInternalKeysRequest{
		StoreRequestHeader: StoreRequestHeader{
			NodeID:  roachpb.NodeID(nodeID),
			StoreID: roachpb.StoreID(storeID),
		},
		Span: roachpb.Span{Key: roachpb.Key(startKey), EndKey: roachpb.Key(endKey)},
	}

	resp, err := client.ScanStorageInternalKeys(ctx, req)
	if err != nil {
		return nil, err
	}
	return resp.AdvancedPebbleMetrics, nil
}

// SetCompactionConcurrency is a tree.CompactionConcurrencyFunc.
func (c *StorageEngineClient) SetCompactionConcurrency(
ctx context.Context, nodeID, storeID int32, compactionConcurrency uint64,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/storage_services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ service PerReplica {
service PerStore {
rpc CompactEngineSpan(cockroach.kv.kvserver.CompactEngineSpanRequest) returns (cockroach.kv.kvserver.CompactEngineSpanResponse) {}
rpc GetTableMetrics(cockroach.kv.kvserver.GetTableMetricsRequest) returns (cockroach.kv.kvserver.GetTableMetricsResponse) {}
rpc ScanStorageInternalKeys(cockroach.kv.kvserver.ScanStorageInternalKeysRequest) returns (cockroach.kv.kvserver.ScanStorageInternalKeysResponse) {}
rpc SetCompactionConcurrency(cockroach.kv.kvserver.CompactionConcurrencyRequest) returns (cockroach.kv.kvserver.CompactionConcurrencyResponse) {}
}
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/stores_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,24 @@ func (is Server) GetTableMetrics(
return resp, err
}

// ScanStorageInternalKeys implements PerStoreServer. It runs the engine's
// ScanStorageInternalKeys over the request's span on the store selected by the
// request header and places the resulting metrics in the response.
func (is Server) ScanStorageInternalKeys(
	ctx context.Context, req *ScanStorageInternalKeysRequest,
) (*ScanStorageInternalKeysResponse, error) {
	resp := &ScanStorageInternalKeysResponse{}

	// scan is executed against the store resolved from the request header; the
	// response is only populated when the engine scan succeeds.
	scan := func(ctx context.Context, s *Store) error {
		span := req.Span
		scanned, scanErr := s.TODOEngine().ScanStorageInternalKeys(span.Key, span.EndKey)
		if scanErr == nil {
			resp.AdvancedPebbleMetrics = scanned
		}
		return scanErr
	}

	err := is.execStoreCommand(ctx, req.StoreRequestHeader, scan)
	return resp, err
}

// SetCompactionConcurrency implements PerStoreServer. It changes the compaction
// concurrency of a store. While SetCompactionConcurrency is safe for concurrent
// use, it adds uncertainty about the compaction concurrency actually set on
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,10 @@ type ExecutorConfig struct {
// overlap with a key range for a specified node and store.
GetTableMetricsFunc eval.GetTableMetricsFunc

// ScanStorageInternalKeysFunc is used to gather information about the types
// of keys (including snapshot pinned keys) at each level of a node's store.
ScanStorageInternalKeysFunc eval.ScanStorageInternalKeysFunc

// TraceCollector is used to contact all live nodes in the cluster, and
// collect trace spans from their inflight node registries.
TraceCollector *collector.TraceCollector
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) {
evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc
evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc
evalCtx.GetTableMetrics = execCfg.GetTableMetricsFunc
evalCtx.ScanStorageInternalKeys = execCfg.ScanStorageInternalKeysFunc
evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs
evalCtx.ClusterID = execCfg.NodeInfo.LogicalClusterID()
evalCtx.ClusterName = execCfg.RPCContext.ClusterName()
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/fixed_oids.go
Original file line number Diff line number Diff line change
Expand Up @@ -2436,6 +2436,7 @@ var builtinOidsArray = []string{
2463: `workload_index_recs(timestamptz: timestamptz) -> string`,
2464: `workload_index_recs(budget: string) -> string`,
2465: `workload_index_recs(timestamptz: timestamptz, budget: string) -> string`,
2466: `crdb_internal.scan_storage_internal_keys(node_id: int, store_id: int, start_key: bytes, end_key: bytes) -> tuple{int AS node_id, int AS store_id, int AS level, int AS snapshot_pinned_keys, int AS snapshot_pinned_keys_bytes, int AS point_key_delete_count, int AS point_key_delete_bytes, int AS point_key_set_count, int AS point_key_set_bytes, int AS range_delete_count, int AS range_delete_bytes, int AS range_key_set_count, int AS range_key_set_bytes, int AS range_key_delete_count, int AS range_key_delete_bytes}`,
}

var builtinOidsBySignature map[string]oid.Oid
Expand Down
123 changes: 123 additions & 0 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,23 @@ The last argument is a JSONB object containing the following optional fields:
volatility.Stable,
),
),
"crdb_internal.scan_storage_internal_keys": makeBuiltin(
tree.FunctionProperties{
Category: builtinconstants.CategorySystemInfo,
},
makeGeneratorOverload(
tree.ParamTypes{
{Name: "node_id", Typ: types.Int},
{Name: "store_id", Typ: types.Int},
{Name: "start_key", Typ: types.Bytes},
{Name: "end_key", Typ: types.Bytes},
},
storageInternalKeysGeneratorType,
makeStorageInternalKeysGenerator,
"Scans a store's storage engine, computing statistics describing the internal keys within the span [start_key, end_key).",
volatility.Volatile,
),
),
}

var decodePlanGistGeneratorType = types.String
Expand Down Expand Up @@ -3272,6 +3289,112 @@ func makeTableMetricsGenerator(
return newTableMetricsIterator(evalCtx, nodeID, storeID, start, end), nil
}

// storageInternalKeysIterator is the value generator behind
// crdb_internal.scan_storage_internal_keys. It fetches all metrics in Start
// and then emits one row per metrics entry.
type storageInternalKeysIterator struct {
// metrics holds the entries fetched by Start.
metrics []enginepb.StorageInternalKeysMetrics
// evalCtx provides the ScanStorageInternalKeys function called by Start.
evalCtx *eval.Context

// iterIdx is advanced by Next; Values reads metrics[iterIdx-1], so it is the
// 1-based position of the current row.
iterIdx int
// nodeID and storeID identify the store to scan and are echoed in every row.
nodeID int32
storeID int32
// start and end are the key span bounds passed to ScanStorageInternalKeys.
start []byte
end []byte
}

// storageInternalKeysGeneratorType is the labeled tuple type of the rows
// produced by crdb_internal.scan_storage_internal_keys: fifteen int columns,
// labeled in the order listed below.
var storageInternalKeysGeneratorType = types.MakeLabeledTuple(
[]*types.T{types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Int,
types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Int},
[]string{
"node_id",
"store_id",
"level",
"snapshot_pinned_keys",
"snapshot_pinned_keys_bytes",
"point_key_delete_count",
"point_key_delete_bytes",
"point_key_set_count",
"point_key_set_bytes",
"range_delete_count",
"range_delete_bytes",
"range_key_set_count",
"range_key_set_bytes",
"range_key_delete_count",
"range_key_delete_bytes",
},
)

var _ eval.ValueGenerator = (*storageInternalKeysIterator)(nil)

// newStorageInternalKeysGenerator builds an iterator that will scan the key
// span from start to end on the given node and store once it is started. No
// work is done here beyond recording the arguments.
func newStorageInternalKeysGenerator(
	evalCtx *eval.Context, nodeID, storeID int32, start, end []byte,
) *storageInternalKeysIterator {
	it := &storageInternalKeysIterator{
		evalCtx: evalCtx,
		nodeID:  nodeID,
		storeID: storeID,
		start:   start,
		end:     end,
	}
	return it
}

// Start implements the eval.ValueGenerator interface. It fetches the metrics
// for the configured node, store and key span through the eval context and
// stores them for Next and Values to iterate over.
//
// A failure is returned wrapped with the node and store IDs of the scan.
func (s *storageInternalKeysIterator) Start(ctx context.Context, _ *kv.Txn) error {
	metrics, err := s.evalCtx.ScanStorageInternalKeys(ctx, s.nodeID, s.storeID, s.start, s.end)
	if err != nil {
		return errors.Wrapf(
			err, "scanning storage internal keys for node %d store %d", s.nodeID, s.storeID,
		)
	}
	s.metrics = metrics
	return nil
}

// Next implements the eval.ValueGenerator interface. It advances the iterator
// by one position and reports whether that position still refers to a fetched
// metrics entry.
func (s *storageInternalKeysIterator) Next(_ context.Context) (bool, error) {
	s.iterIdx++
	hasRow := s.iterIdx <= len(s.metrics)
	return hasRow, nil
}

// Values implements the eval.ValueGenerator interface. It returns the row for
// the metrics entry selected by the most recent call to Next. The datums are
// emitted in the same order as the labels of storageInternalKeysGeneratorType,
// so the row has one datum per declared column.
func (s *storageInternalKeysIterator) Values() (tree.Datums, error) {
	// iterIdx is 1-based after Next, hence the -1.
	metricsInfo := s.metrics[s.iterIdx-1]

	return tree.Datums{
		tree.NewDInt(tree.DInt(s.nodeID)),
		tree.NewDInt(tree.DInt(s.storeID)),
		// The "level" column sits between store_id and snapshot_pinned_keys in
		// the tuple type; omitting it would shift every later column.
		tree.NewDInt(tree.DInt(metricsInfo.Level)),
		tree.NewDInt(tree.DInt(metricsInfo.SnapshotPinnedKeys)),
		tree.NewDInt(tree.DInt(metricsInfo.SnapshotPinnedKeysBytes)),
		tree.NewDInt(tree.DInt(metricsInfo.PointKeyDeleteCount)),
		tree.NewDInt(tree.DInt(metricsInfo.PointKeyDeleteBytes)),
		tree.NewDInt(tree.DInt(metricsInfo.PointKeySetCount)),
		tree.NewDInt(tree.DInt(metricsInfo.PointKeySetBytes)),
		tree.NewDInt(tree.DInt(metricsInfo.RangeDeleteCount)),
		tree.NewDInt(tree.DInt(metricsInfo.RangeDeleteBytes)),
		tree.NewDInt(tree.DInt(metricsInfo.RangeKeySetCount)),
		tree.NewDInt(tree.DInt(metricsInfo.RangeKeySetBytes)),
		tree.NewDInt(tree.DInt(metricsInfo.RangeKeyDeleteCount)),
		tree.NewDInt(tree.DInt(metricsInfo.RangeKeyDeleteBytes)),
	}, nil
}

// Close implements the eval.ValueGenerator interface. It does nothing.
func (s *storageInternalKeysIterator) Close(context.Context) {}

// ResolvedType implements the eval.ValueGenerator interface. It returns the
// labeled tuple type describing the rows this generator emits, which is the
// same type the crdb_internal.scan_storage_internal_keys overload is
// registered with.
func (s *storageInternalKeysIterator) ResolvedType() *types.T {
	return storageInternalKeysGeneratorType
}

// makeStorageInternalKeysGenerator is the generator factory for
// crdb_internal.scan_storage_internal_keys. It rejects callers without the
// admin role, then decodes the four arguments (node ID, store ID, start key,
// end key) and hands them to a new storageInternalKeysIterator.
func makeStorageInternalKeysGenerator(
	ctx context.Context, evalCtx *eval.Context, args tree.Datums,
) (eval.ValueGenerator, error) {
	isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
	switch {
	case err != nil:
		return nil, err
	case !isAdmin:
		return nil, errInsufficientPriv
	}

	var (
		nodeID   = int32(tree.MustBeDInt(args[0]))
		storeID  = int32(tree.MustBeDInt(args[1]))
		startKey = []byte(tree.MustBeDBytes(args[2]))
		endKey   = []byte(tree.MustBeDBytes(args[3]))
	)
	return newStorageInternalKeysGenerator(evalCtx, nodeID, storeID, startKey, endKey), nil
}

var tableSpanStatsGeneratorType = types.MakeLabeledTuple(
[]*types.T{types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Float},
[]string{"database_id", "table_id", "range_count", "approximate_disk_bytes", "live_bytes", "total_bytes", "live_percentage"},
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sem/eval/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ type Context struct {
// GetTableMetrics is used in crdb_internal.sstable_metrics.
GetTableMetrics GetTableMetricsFunc

// ScanStorageInternalKeys is used in crdb_internal.scan_storage_internal_keys.
ScanStorageInternalKeys ScanStorageInternalKeysFunc

// SetCompactionConcurrency is used to change the compaction concurrency of
// a store.
SetCompactionConcurrency SetCompactionConcurrencyFunc
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/sem/eval/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,12 @@ type GetTableMetricsFunc func(
ctx context.Context, nodeID, storeID int32, startKey, endKey []byte,
) ([]enginepb.SSTableMetricsInfo, error)

// ScanStorageInternalKeysFunc is used to retrieve pebble metrics about the
// internal keys in the key span [startKey, endKey) (end-exclusive) on the
// store identified by (nodeID, storeID). It returns the collected metrics
// entries, or an error if they could not be retrieved.
type ScanStorageInternalKeysFunc func(
ctx context.Context, nodeID, storeID int32, startKey, endKey []byte,
) ([]enginepb.StorageInternalKeysMetrics, error)

// SetCompactionConcurrencyFunc is used to change the compaction concurrency of a
// store.
type SetCompactionConcurrencyFunc func(
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,8 @@ type Engine interface {
// CompactRange ensures that the specified range of key value pairs is
// optimized for space efficiency.
CompactRange(start, end roachpb.Key) error
// ScanStorageInternalKeys returns key level statistics for each level of a pebble store (that overlap start and end).
ScanStorageInternalKeys(start, end roachpb.Key) ([]enginepb.StorageInternalKeysMetrics, error)
// GetTableMetrics returns information about sstables that overlap start and end.
GetTableMetrics(start, end roachpb.Key) ([]enginepb.SSTableMetricsInfo, error)
// RegisterFlushCompletedCallback registers a callback that will be run for
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/enginepb/rocksdb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ message SSTableMetricsInfo {
uint64 approximate_span_bytes = 4 [(gogoproto.customname) = "ApproximateSpanBytes"];
}

// StorageInternalKeysMetrics holds count and byte totals for the different
// kinds of internal keys, together with the LSM level they refer to.
//
// NOTE(review): the field name snapshot_pinnedKeys_bytes mixes camelCase into
// an otherwise lower_snake_case message (the SQL column is
// snapshot_pinned_keys_bytes). The Go name is fixed by customname, but the
// proto/JSON name is affected; confirm whether it should be renamed.
message StorageInternalKeysMetrics {
// level is the LSM tree level in which the metrics were found.
int32 level = 1;
uint64 snapshot_pinned_keys = 2 [(gogoproto.customname) = "SnapshotPinnedKeys"];
uint64 snapshot_pinnedKeys_bytes = 3 [(gogoproto.customname) = "SnapshotPinnedKeysBytes"];
uint64 point_key_delete_count = 4 [(gogoproto.customname) = "PointKeyDeleteCount"];
uint64 point_key_delete_bytes = 5 [(gogoproto.customname) = "PointKeyDeleteBytes"];
uint64 point_key_set_count = 6 [(gogoproto.customname) = "PointKeySetCount"];
uint64 point_key_set_bytes = 7 [(gogoproto.customname) = "PointKeySetBytes"];
uint64 range_delete_count = 8 [(gogoproto.customname) = "RangeDeleteCount"];
uint64 range_delete_bytes = 9 [(gogoproto.customname) = "RangeDeleteBytes"];
uint64 range_key_set_count = 10 [(gogoproto.customname) = "RangeKeySetCount"];
uint64 range_key_set_bytes = 11 [(gogoproto.customname) = "RangeKeySetBytes"];
uint64 range_key_delete_count = 12 [(gogoproto.customname) = "RangeKeyDeleteCount"];
uint64 range_key_delete_bytes = 13 [(gogoproto.customname) = "RangeKeyDeleteBytes"];
}

// SSTUserProperties contains the user-added properties of a single sstable.
message SSTUserProperties {
string path = 1;
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -2063,6 +2063,13 @@ func (p *Pebble) GetTableMetrics(start, end roachpb.Key) ([]enginepb.SSTableMetr
return metricsInfo, nil
}

// ScanStorageInternalKeys implements the Engine interface.
//
// This is currently a placeholder: start and end are ignored and an empty,
// non-nil slice is always returned with a nil error.
func (p *Pebble) ScanStorageInternalKeys(
	start, end roachpb.Key,
) ([]enginepb.StorageInternalKeysMetrics, error) {
	noMetrics := make([]enginepb.StorageInternalKeysMetrics, 0)
	return noMetrics, nil
}

// ApproximateDiskBytes implements the Engine interface.
func (p *Pebble) ApproximateDiskBytes(
from, to roachpb.Key,
Expand Down

0 comments on commit 0b19e7b

Please sign in to comment.