Skip to content

Commit

Permalink
kvflowinspectpb: add send tokens and rename existing as eval
Browse files Browse the repository at this point in the history
The `kvflowinspectpb.Stream` represents a replication stream state and
is used in tests and `inspectz` related observability.

`kvflowinspectpb.Stream` used to contain two available token fields,
broken into work class. As rac2 is being added, two additional token
fields are required, accounting for send tokens.

Rename the existing fields with `Eval` added and introduce two more
fields, mirroring the existing ones as `Send`.

Part of: cockroachdb#128091
Release note: None
  • Loading branch information
kvoli committed Sep 12, 2024
1 parent a463cb3 commit f604957
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 47 deletions.
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2357,24 +2357,24 @@ func (h *flowControlTestHelper) checkAllTokensReturned(
return fmt.Errorf("expected %d replication streams, got %d", expStreamCount, len(streams))
}
for _, stream := range streams {
if stream.AvailableRegularTokens != 16<<20 {
if stream.AvailableEvalRegularTokens != 16<<20 {
return fmt.Errorf("expected %s of regular flow tokens for %s, got %s",
humanize.IBytes(16<<20),
kvflowcontrol.Stream{
TenantID: stream.TenantID,
StoreID: stream.StoreID,
},
humanize.IBytes(uint64(stream.AvailableRegularTokens)),
humanize.IBytes(uint64(stream.AvailableEvalRegularTokens)),
)
}
if stream.AvailableElasticTokens != 8<<20 {
if stream.AvailableEvalElasticTokens != 8<<20 {
return fmt.Errorf("expected %s of elastic flow tokens for %s, got %s",
humanize.IBytes(8<<20),
kvflowcontrol.Stream{
TenantID: stream.TenantID,
StoreID: stream.StoreID,
},
humanize.IBytes(uint64(stream.AvailableElasticTokens)),
humanize.IBytes(uint64(stream.AvailableEvalElasticTokens)),
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ func (c *Controller) Inspect(ctx context.Context) []kvflowinspectpb.Stream {
c.mu.buckets.Range(func(stream kvflowcontrol.Stream, b *bucket) bool {
b.mu.RLock()
streams = append(streams, kvflowinspectpb.Stream{
TenantID: stream.TenantID,
StoreID: stream.StoreID,
AvailableRegularTokens: int64(b.tokensLocked(regular)),
AvailableElasticTokens: int64(b.tokensLocked(elastic)),
TenantID: stream.TenantID,
StoreID: stream.StoreID,
AvailableEvalRegularTokens: int64(b.tokensLocked(regular)),
AvailableEvalElasticTokens: int64(b.tokensLocked(elastic)),
})
b.mu.RUnlock()
return true
Expand All @@ -253,10 +253,10 @@ func (c *Controller) InspectStream(
) kvflowinspectpb.Stream {
tokens := c.getTokensForStream(stream)
return kvflowinspectpb.Stream{
TenantID: stream.TenantID,
StoreID: stream.StoreID,
AvailableRegularTokens: int64(tokens.regular),
AvailableElasticTokens: int64(tokens.elastic),
TenantID: stream.TenantID,
StoreID: stream.StoreID,
AvailableEvalRegularTokens: int64(tokens.regular),
AvailableEvalElasticTokens: int64(tokens.elastic),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ func TestBucketSignalingBug(t *testing.T) {
controller.DeductTokens(ctx, admissionpb.NormalPri, 10, stream)
controller.DeductTokens(ctx, admissionpb.BulkNormalPri, 10, stream)
streamState := controller.InspectStream(ctx, stream)
require.Equal(t, int64(0), streamState.AvailableRegularTokens)
require.Equal(t, int64(-15), streamState.AvailableElasticTokens)
require.Equal(t, int64(0), streamState.AvailableEvalRegularTokens)
require.Equal(t, int64(-15), streamState.AvailableEvalElasticTokens)

connectedStream := &mockConnectedStream{
stream: stream,
Expand Down Expand Up @@ -357,8 +357,8 @@ func TestBucketSignalingBug(t *testing.T) {

controller.ReturnTokens(ctx, admissionpb.NormalPri, 1, stream)
streamState = controller.InspectStream(ctx, stream)
require.Equal(t, int64(1), streamState.AvailableRegularTokens)
require.Equal(t, int64(-14), streamState.AvailableElasticTokens)
require.Equal(t, int64(1), streamState.AvailableEvalRegularTokens)
require.Equal(t, int64(-14), streamState.AvailableEvalElasticTokens)

// Sleep to give enough time for regular work to get admitted.
time.Sleep(2 * time.Second)
Expand All @@ -373,13 +373,13 @@ func TestBucketSignalingBug(t *testing.T) {
// Return enough tokens that the elastic work gets admitted.
controller.ReturnTokens(ctx, admissionpb.NormalPri, 9, stream)
streamState = controller.InspectStream(ctx, stream)
require.Equal(t, int64(10), streamState.AvailableRegularTokens)
require.Equal(t, int64(-5), streamState.AvailableElasticTokens)
require.Equal(t, int64(10), streamState.AvailableEvalRegularTokens)
require.Equal(t, int64(-5), streamState.AvailableEvalElasticTokens)

controller.ReturnTokens(ctx, admissionpb.BulkNormalPri, 7, stream)
streamState = controller.InspectStream(ctx, stream)
require.Equal(t, int64(10), streamState.AvailableRegularTokens)
require.Equal(t, int64(2), streamState.AvailableElasticTokens)
require.Equal(t, int64(10), streamState.AvailableEvalRegularTokens)
require.Equal(t, int64(2), streamState.AvailableEvalElasticTokens)
<-lowPriAdmitted
}

Expand All @@ -398,10 +398,10 @@ func TestInspectController(t *testing.T) {
}
makeInspectStream := func(id uint64, availableElastic, availableRegular int64) kvflowinspectpb.Stream {
return kvflowinspectpb.Stream{
TenantID: roachpb.MustMakeTenantID(id),
StoreID: roachpb.StoreID(id),
AvailableElasticTokens: availableElastic,
AvailableRegularTokens: availableRegular,
TenantID: roachpb.MustMakeTenantID(id),
StoreID: roachpb.StoreID(id),
AvailableEvalRegularTokens: availableRegular,
AvailableEvalElasticTokens: availableElastic,
}
}
makeConnectedStream := func(id uint64) kvflowcontrol.ConnectedStream {
Expand Down
54 changes: 36 additions & 18 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/testdata/handle_inspect
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ echo
"id": "1"
},
"store_id": 1,
"available_regular_tokens": "16777216",
"available_elastic_tokens": "8388608"
"available_eval_regular_tokens": "16777216",
"available_eval_elastic_tokens": "8388608",
"available_send_regular_tokens": "0",
"available_send_elastic_tokens": "0"
},
"tracked_deductions": [
]
Expand All @@ -37,8 +39,10 @@ echo
"id": "1"
},
"store_id": 1,
"available_regular_tokens": "15728640",
"available_elastic_tokens": "6291456"
"available_eval_regular_tokens": "15728640",
"available_eval_elastic_tokens": "6291456",
"available_send_regular_tokens": "0",
"available_send_elastic_tokens": "0"
},
"tracked_deductions": [
{
Expand Down Expand Up @@ -72,8 +76,10 @@ echo
"id": "1"
},
"store_id": 1,
"available_regular_tokens": "16777216",
"available_elastic_tokens": "6291456"
"available_eval_regular_tokens": "16777216",
"available_eval_elastic_tokens": "6291456",
"available_send_regular_tokens": "0",
"available_send_elastic_tokens": "0"
},
"tracked_deductions": [
{
Expand All @@ -100,8 +106,10 @@ echo
"id": "1"
},
"store_id": 2,
"available_regular_tokens": "16777216",
"available_elastic_tokens": "7340032"
"available_eval_regular_tokens": "16777216",
"available_eval_elastic_tokens": "7340032",
"available_send_regular_tokens": "0",
"available_send_elastic_tokens": "0"
},
"tracked_deductions": [
{
Expand All @@ -120,8 +128,10 @@ echo
"id": "1"
},
"store_id": 3,
"available_regular_tokens": "16777216",
"available_elastic_tokens": "7340032"
"available_eval_regular_tokens": "16777216",
"available_eval_elastic_tokens": "7340032",
"available_send_regular_tokens": "0",
"available_send_elastic_tokens": "0"
},
"tracked_deductions": [
{
Expand All @@ -147,8 +157,10 @@ echo
"id": "1"
},
"store_id": 1,
"available_regular_tokens": "16777216",
"available_elastic_tokens": "6291456"
"available_eval_regular_tokens": "16777216",
"available_eval_elastic_tokens": "6291456",
"available_send_regular_tokens": "0",
"available_send_elastic_tokens": "0"
},
"tracked_deductions": [
{
Expand All @@ -175,8 +187,10 @@ echo
"id": "1"
},
"store_id": 3,
"available_regular_tokens": "16777216",
"available_elastic_tokens": "7340032"
"available_eval_regular_tokens": "16777216",
"available_eval_elastic_tokens": "7340032",
"available_send_regular_tokens": "0",
"available_send_elastic_tokens": "0"
},
"tracked_deductions": [
{
Expand All @@ -202,8 +216,10 @@ echo
"id": "1"
},
"store_id": 1,
"available_regular_tokens": "16777216",
"available_elastic_tokens": "8388608"
"available_eval_regular_tokens": "16777216",
"available_eval_elastic_tokens": "8388608",
"available_send_regular_tokens": "0",
"available_send_elastic_tokens": "0"
},
"tracked_deductions": [
]
Expand All @@ -214,8 +230,10 @@ echo
"id": "1"
},
"store_id": 3,
"available_regular_tokens": "16777216",
"available_elastic_tokens": "8388608"
"available_eval_regular_tokens": "16777216",
"available_eval_elastic_tokens": "8388608",
"available_send_regular_tokens": "0",
"available_send_elastic_tokens": "0"
},
"tracked_deductions": [
]
Expand Down
17 changes: 14 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb/kvflowinspect.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,27 @@ message ConnectedStream {
}

// Stream represents a given kvflowcontrol.Stream and the number of tokens
// available for it (as maintained by the node-level kvflowcontrol.Controller).
// available for it (as maintained by the node-level kvflowcontrol.Controller,
// or in >= v24.3, by the node level StreamTokenCounterProvider).
message Stream {
roachpb.TenantID tenant_id = 1 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "TenantID"];
uint64 store_id = 2 [
(gogoproto.customname) = "StoreID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"];
int64 available_regular_tokens = 3;
int64 available_elastic_tokens = 4;
// AvailableEvalRegularTokens represents the currently available tokens for
// regular replication traffic, deducted for evaluation.
int64 available_eval_regular_tokens = 3;
// AvailableEvalElasticTokens represents the currently available tokens for
// elastic replication traffic, deducted for evaluation.
int64 available_eval_elastic_tokens = 4;
// AvailableSendRegularTokens represents the currently available tokens for
// regular replication traffic, deducted before sending.
int64 available_send_regular_tokens = 5;
// AvailableSendElasticTokens represents the currently available tokens for
// elastic replication traffic, deducted before sending.
int64 available_send_elastic_tokens = 6;
}

// TrackedDeductions represents a tracked token deduction, as typically done by
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8865,8 +8865,8 @@ CREATE TABLE crdb_internal.kv_flow_controller (
if err := addRow(
tree.NewDInt(tree.DInt(stream.TenantID.ToUint64())),
tree.NewDInt(tree.DInt(stream.StoreID)),
tree.NewDInt(tree.DInt(stream.AvailableRegularTokens)),
tree.NewDInt(tree.DInt(stream.AvailableElasticTokens)),
tree.NewDInt(tree.DInt(stream.AvailableEvalRegularTokens)),
tree.NewDInt(tree.DInt(stream.AvailableEvalElasticTokens)),
); err != nil {
return err
}
Expand Down

0 comments on commit f604957

Please sign in to comment.