Skip to content

Commit

Permalink
sql: add crdb_internal.kv_flow.*v2 tables
Browse files Browse the repository at this point in the history
The existing flow control tables are:

```
crdb_internal.kv_flow_controller
crdb_internal.kv_flow_control_handles
crdb_internal.kv_flow_token_deductions
```

Which are used in testing to assert on flow control stream state and
transitions.

Introduce another set of three tables, most of which are identical to
the existing tables but populating the table data using rac2.

```
crdb_internal.kv_flow_controller_v2
crdb_internal.kv_flow_control_handles_v2
crdb_internal.kv_flow_token_deductions_v2
```

`crdb_internal.kv_flow_controller_v2` has two additional columns,
tracking the amount of available (regular|elastic) send tokens. The
schema is:

```
CREATE TABLE crdb_internal.kv_flow_controller_v2 (
  tenant_id                     INT NOT NULL,
  store_id                      INT NOT NULL,
  available_eval_regular_tokens INT NOT NULL,
  available_eval_elastic_tokens INT NOT NULL,
  available_send_regular_tokens INT NOT NULL,
  available_send_elastic_tokens INT NOT NULL
);
```

Note that unless rac2 is enabled, the tables are unlikely to show
anything interesting. Both tables (v1 and v2) are kept for
compatibility, with an intent to replace the v1 tables with the v2 ones
after v1 replication flow control is removed entirely from the code.

Resolves: cockroachdb#128091
Release note: None
  • Loading branch information
kvoli committed Sep 12, 2024
1 parent 02a5e01 commit c0a5058
Show file tree
Hide file tree
Showing 16 changed files with 588 additions and 406 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ crdb_internal kv_catalog_namespace table node NULL N
crdb_internal kv_catalog_zones table node NULL NULL
crdb_internal kv_dropped_relations view node NULL NULL
crdb_internal kv_flow_control_handles table node NULL NULL
crdb_internal kv_flow_control_handles_v2 table node NULL NULL
crdb_internal kv_flow_controller table node NULL NULL
crdb_internal kv_flow_controller_v2 table node NULL NULL
crdb_internal kv_flow_token_deductions table node NULL NULL
crdb_internal kv_flow_token_deductions_v2 table node NULL NULL
crdb_internal kv_inherited_role_members table node NULL NULL
crdb_internal kv_node_liveness table node NULL NULL
crdb_internal kv_node_status table node NULL NULL
Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ table_name NOT IN (
'kv_dropped_relations',
'kv_inherited_role_members',
'kv_flow_control_handles',
'kv_flow_control_handles_v2',
'kv_flow_controller',
'kv_flow_controller_v2',
'kv_flow_token_deductions',
'kv_flow_token_deductions_v2',
'lost_descriptors_with_data',
'table_columns',
'table_row_statistics',
Expand Down
146 changes: 146 additions & 0 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,11 @@ var crdbInternal = virtualSchema{
catconstants.CrdbInternalInheritedRoleMembersTableID: crdbInternalInheritedRoleMembers,
catconstants.CrdbInternalKVSystemPrivilegesViewID: crdbInternalKVSystemPrivileges,
catconstants.CrdbInternalKVFlowHandlesID: crdbInternalKVFlowHandles,
catconstants.CrdbInternalKVFlowHandlesIDV2: crdbInternalKVFlowHandlesV2,
catconstants.CrdbInternalKVFlowControllerID: crdbInternalKVFlowController,
catconstants.CrdbInternalKVFlowControllerIDV2: crdbInternalKVFlowControllerV2,
catconstants.CrdbInternalKVFlowTokenDeductions: crdbInternalKVFlowTokenDeductions,
catconstants.CrdbInternalKVFlowTokenDeductionsV2: crdbInternalKVFlowTokenDeductionsV2,
catconstants.CrdbInternalRepairableCatalogCorruptionsViewID: crdbInternalRepairableCatalogCorruptions,
catconstants.CrdbInternalKVProtectedTS: crdbInternalKVProtectedTSTable,
catconstants.CrdbInternalKVSessionBasedLeases: crdbInternalSessionBasedLeases,
Expand Down Expand Up @@ -8874,6 +8877,45 @@ CREATE TABLE crdb_internal.kv_flow_controller (
return nil
},
}
var crdbInternalKVFlowControllerV2 = virtualSchemaTable{
comment: `node-level view of the kv flow controller v2, its active streams and available tokens state`,
schema: `
CREATE TABLE crdb_internal.kv_flow_controller_v2 (
tenant_id INT NOT NULL,
store_id INT NOT NULL,
available_eval_regular_tokens INT NOT NULL,
available_eval_elastic_tokens INT NOT NULL,
available_send_regular_tokens INT NOT NULL,
available_send_elastic_tokens INT NOT NULL
);`,
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
hasRoleOption, _, err := p.HasViewActivityOrViewActivityRedactedRole(ctx)
if err != nil {
return err
}
if !hasRoleOption {
return noViewActivityOrViewActivityRedactedRoleError(p.User())
}

resp, err := p.extendedEvalCtx.ExecCfg.InspectzServer.KVFlowControllerV2(ctx, &kvflowinspectpb.ControllerRequest{})
if err != nil {
return err
}
for _, stream := range resp.Streams {
if err := addRow(
tree.NewDInt(tree.DInt(stream.TenantID.ToUint64())),
tree.NewDInt(tree.DInt(stream.StoreID)),
tree.NewDInt(tree.DInt(stream.AvailableEvalRegularTokens)),
tree.NewDInt(tree.DInt(stream.AvailableEvalElasticTokens)),
tree.NewDInt(tree.DInt(stream.AvailableSendRegularTokens)),
tree.NewDInt(tree.DInt(stream.AvailableSendElasticTokens)),
); err != nil {
return err
}
}
return nil
},
}

var crdbInternalKVFlowHandles = virtualSchemaTable{
comment: `node-level view of active kv flow control handles, their underlying streams, and tracked state`,
Expand Down Expand Up @@ -8925,6 +8967,56 @@ CREATE TABLE crdb_internal.kv_flow_control_handles (
return populateFlowHandlesResponse(resp, addRow)
},
}
var crdbInternalKVFlowHandlesV2 = virtualSchemaTable{
comment: `node-level view of active kv flow range controllers, their underlying streams, and tracked state`,
schema: `
CREATE TABLE crdb_internal.kv_flow_control_handles_v2 (
range_id INT NOT NULL,
tenant_id INT NOT NULL,
store_id INT NOT NULL,
total_tracked_tokens INT NOT NULL,
INDEX(range_id)
);`,

indexes: []virtualIndex{
{
populate: func(ctx context.Context, constraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
hasRoleOption, _, err := p.HasViewActivityOrViewActivityRedactedRole(ctx)
if err != nil {
return false, err
}
if !hasRoleOption {
return false, noViewActivityOrViewActivityRedactedRoleError(p.User())
}

rangeID := roachpb.RangeID(tree.MustBeDInt(constraint))
resp, err := p.extendedEvalCtx.ExecCfg.InspectzServer.KVFlowHandlesV2(
ctx, &kvflowinspectpb.HandlesRequest{
RangeIDs: []roachpb.RangeID{rangeID},
})
if err != nil {
return false, err
}
return true, populateFlowHandlesResponse(resp, addRow)
},
},
},
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
hasRoleOption, _, err := p.HasViewActivityOrViewActivityRedactedRole(ctx)
if err != nil {
return err
}
if !hasRoleOption {
return noViewActivityOrViewActivityRedactedRoleError(p.User())
}

resp, err := p.extendedEvalCtx.ExecCfg.InspectzServer.KVFlowHandlesV2(ctx, &kvflowinspectpb.HandlesRequest{})
if err != nil {
return err
}
return populateFlowHandlesResponse(resp, addRow)
},
}

func populateFlowHandlesResponse(
resp *kvflowinspectpb.HandlesResponse, addRow func(...tree.Datum) error,
Expand Down Expand Up @@ -9002,6 +9094,60 @@ CREATE TABLE crdb_internal.kv_flow_token_deductions (
},
}

var crdbInternalKVFlowTokenDeductionsV2 = virtualSchemaTable{
comment: `node-level view of tracked kv flow tokens`,
schema: `
CREATE TABLE crdb_internal.kv_flow_token_deductions_v2 (
range_id INT NOT NULL,
tenant_id INT NOT NULL,
store_id INT NOT NULL,
priority STRING NOT NULL,
log_term INT NOT NULL,
log_index INT NOT NULL,
tokens INT NOT NULL,
INDEX(range_id)
);`,

indexes: []virtualIndex{
{
populate: func(ctx context.Context, constraint tree.Datum, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
hasRoleOption, _, err := p.HasViewActivityOrViewActivityRedactedRole(ctx)
if err != nil {
return false, err
}
if !hasRoleOption {
return false, noViewActivityOrViewActivityRedactedRoleError(p.User())
}

rangeID := roachpb.RangeID(tree.MustBeDInt(constraint))
resp, err := p.extendedEvalCtx.ExecCfg.InspectzServer.KVFlowHandlesV2(
ctx, &kvflowinspectpb.HandlesRequest{
RangeIDs: []roachpb.RangeID{rangeID},
})
if err != nil {
return false, err
}
return true, populateFlowTokensResponse(resp, addRow)
},
},
},
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
hasRoleOption, _, err := p.HasViewActivityOrViewActivityRedactedRole(ctx)
if err != nil {
return err
}
if !hasRoleOption {
return noViewActivityOrViewActivityRedactedRoleError(p.User())
}

resp, err := p.extendedEvalCtx.ExecCfg.InspectzServer.KVFlowHandlesV2(ctx, &kvflowinspectpb.HandlesRequest{})
if err != nil {
return err
}
return populateFlowTokensResponse(resp, addRow)
},
}

func populateFlowTokensResponse(
resp *kvflowinspectpb.HandlesResponse, addRow func(...tree.Datum) error,
) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/crdb_internal
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ crdb_internal kv_catalog_namespace table node NULL N
crdb_internal kv_catalog_zones table node NULL NULL
crdb_internal kv_dropped_relations view node NULL NULL
crdb_internal kv_flow_control_handles table node NULL NULL
crdb_internal kv_flow_control_handles_v2 table node NULL NULL
crdb_internal kv_flow_controller table node NULL NULL
crdb_internal kv_flow_controller_v2 table node NULL NULL
crdb_internal kv_flow_token_deductions table node NULL NULL
crdb_internal kv_flow_token_deductions_v2 table node NULL NULL
crdb_internal kv_inherited_role_members table node NULL NULL
crdb_internal kv_node_liveness table node NULL NULL
crdb_internal kv_node_status table node NULL NULL
Expand Down
Loading

0 comments on commit c0a5058

Please sign in to comment.