From f604957ff21fd1e5065df745ea8c4b981291089f Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Fri, 6 Sep 2024 15:40:50 -0400 Subject: [PATCH] kvflowinspectpb: add send tokens and rename existing as eval The `kvflowinspectpb.Stream` represents a replication stream state and is used in tests and `inspectz` related observability. `kvflowinspectpb.Stream` used to contain two available token fields, broken into work class. As rac2 is being added, two additional token fields are required, accounting for send tokens. Rename the existing fields with `Eval` added and introduce two more fields, mirroring the existing ones as `Send`. Part of: #128091 Release note: None --- .../kvserver/flow_control_integration_test.go | 8 +-- .../kvflowcontroller/kvflowcontroller.go | 16 +++--- .../kvflowcontroller/kvflowcontroller_test.go | 24 ++++----- .../kvflowhandle/testdata/handle_inspect | 54 ++++++++++++------- .../kvflowinspectpb/kvflowinspect.proto | 17 ++++-- pkg/sql/crdb_internal.go | 4 +- 6 files changed, 76 insertions(+), 47 deletions(-) diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 4991072a2f77..0c9fc216260a 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -2357,24 +2357,24 @@ func (h *flowControlTestHelper) checkAllTokensReturned( return fmt.Errorf("expected %d replication streams, got %d", expStreamCount, len(streams)) } for _, stream := range streams { - if stream.AvailableRegularTokens != 16<<20 { + if stream.AvailableEvalRegularTokens != 16<<20 { return fmt.Errorf("expected %s of regular flow tokens for %s, got %s", humanize.IBytes(16<<20), kvflowcontrol.Stream{ TenantID: stream.TenantID, StoreID: stream.StoreID, }, - humanize.IBytes(uint64(stream.AvailableRegularTokens)), + humanize.IBytes(uint64(stream.AvailableEvalRegularTokens)), ) } - if stream.AvailableElasticTokens != 8<<20 { + if stream.AvailableEvalElasticTokens != 8<<20 { return fmt.Errorf("expected %s of elastic flow tokens for %s, got %s", humanize.IBytes(8<<20), kvflowcontrol.Stream{ TenantID: stream.TenantID, StoreID: stream.StoreID, }, - humanize.IBytes(uint64(stream.AvailableElasticTokens)), + humanize.IBytes(uint64(stream.AvailableEvalElasticTokens)), ) } } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index f731590fac51..bb5af1f7f49a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -230,10 +230,10 @@ func (c *Controller) Inspect(ctx context.Context) []kvflowinspectpb.Stream { c.mu.buckets.Range(func(stream kvflowcontrol.Stream, b *bucket) bool { b.mu.RLock() streams = append(streams, kvflowinspectpb.Stream{ - TenantID: stream.TenantID, - StoreID: stream.StoreID, - AvailableRegularTokens: int64(b.tokensLocked(regular)), - AvailableElasticTokens: int64(b.tokensLocked(elastic)), + TenantID: stream.TenantID, + StoreID: stream.StoreID, + AvailableEvalRegularTokens: int64(b.tokensLocked(regular)), + AvailableEvalElasticTokens: int64(b.tokensLocked(elastic)), }) b.mu.RUnlock() return true @@ -253,10 +253,10 @@ func (c *Controller) InspectStream( ) kvflowinspectpb.Stream { tokens := c.getTokensForStream(stream) return kvflowinspectpb.Stream{ - TenantID: stream.TenantID, - StoreID: stream.StoreID, - AvailableRegularTokens: int64(tokens.regular), - AvailableElasticTokens: int64(tokens.elastic), + TenantID: stream.TenantID, + StoreID: stream.StoreID, + AvailableEvalRegularTokens: int64(tokens.regular), + AvailableEvalElasticTokens: int64(tokens.elastic), } } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go index 41e3b3bb164f..72f89e6eee6c 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go @@ -317,8 +317,8 @@ func TestBucketSignalingBug(t *testing.T) { controller.DeductTokens(ctx, admissionpb.NormalPri, 10, stream) controller.DeductTokens(ctx, admissionpb.BulkNormalPri, 10, stream) streamState := controller.InspectStream(ctx, stream) - require.Equal(t, int64(0), streamState.AvailableRegularTokens) - require.Equal(t, int64(-15), streamState.AvailableElasticTokens) + require.Equal(t, int64(0), streamState.AvailableEvalRegularTokens) + require.Equal(t, int64(-15), streamState.AvailableEvalElasticTokens) connectedStream := &mockConnectedStream{ stream: stream, @@ -357,8 +357,8 @@ func TestBucketSignalingBug(t *testing.T) { controller.ReturnTokens(ctx, admissionpb.NormalPri, 1, stream) streamState = controller.InspectStream(ctx, stream) - require.Equal(t, int64(1), streamState.AvailableRegularTokens) - require.Equal(t, int64(-14), streamState.AvailableElasticTokens) + require.Equal(t, int64(1), streamState.AvailableEvalRegularTokens) + require.Equal(t, int64(-14), streamState.AvailableEvalElasticTokens) // Sleep to give enough time for regular work to get admitted. time.Sleep(2 * time.Second) @@ -373,13 +373,13 @@ func TestBucketSignalingBug(t *testing.T) { // Return enough tokens that the elastic work gets admitted. controller.ReturnTokens(ctx, admissionpb.NormalPri, 9, stream) streamState = controller.InspectStream(ctx, stream) - require.Equal(t, int64(10), streamState.AvailableRegularTokens) - require.Equal(t, int64(-5), streamState.AvailableElasticTokens) + require.Equal(t, int64(10), streamState.AvailableEvalRegularTokens) + require.Equal(t, int64(-5), streamState.AvailableEvalElasticTokens) controller.ReturnTokens(ctx, admissionpb.BulkNormalPri, 7, stream) streamState = controller.InspectStream(ctx, stream) - require.Equal(t, int64(10), streamState.AvailableRegularTokens) - require.Equal(t, int64(2), streamState.AvailableElasticTokens) + require.Equal(t, int64(10), streamState.AvailableEvalRegularTokens) + require.Equal(t, int64(2), streamState.AvailableEvalElasticTokens) <-lowPriAdmitted } @@ -398,10 +398,10 @@ func TestInspectController(t *testing.T) { } makeInspectStream := func(id uint64, availableElastic, availableRegular int64) kvflowinspectpb.Stream { return kvflowinspectpb.Stream{ - TenantID: roachpb.MustMakeTenantID(id), - StoreID: roachpb.StoreID(id), - AvailableElasticTokens: availableElastic, - AvailableRegularTokens: availableRegular, + TenantID: roachpb.MustMakeTenantID(id), + StoreID: roachpb.StoreID(id), + AvailableEvalRegularTokens: availableRegular, + AvailableEvalElasticTokens: availableElastic, } } makeConnectedStream := func(id uint64) kvflowcontrol.ConnectedStream { diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/testdata/handle_inspect b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/testdata/handle_inspect index 5e8b4225726e..1af94c6380e6 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/testdata/handle_inspect +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/testdata/handle_inspect @@ -18,8 +18,10 @@ echo "id": "1" }, "store_id": 1, - "available_regular_tokens": "16777216", - "available_elastic_tokens": "8388608" + "available_eval_regular_tokens": "16777216", + "available_eval_elastic_tokens": "8388608", + "available_send_regular_tokens": "0", + "available_send_elastic_tokens": "0" }, "tracked_deductions": [ ] @@ -37,8 +39,10 @@ echo "id": "1" }, "store_id": 1, - "available_regular_tokens": "15728640", - "available_elastic_tokens": "6291456" + "available_eval_regular_tokens": "15728640", + "available_eval_elastic_tokens": "6291456", + "available_send_regular_tokens": "0", + "available_send_elastic_tokens": "0" }, "tracked_deductions": [ { @@ -72,8 +76,10 @@ echo "id": "1" }, "store_id": 1, - "available_regular_tokens": "16777216", - "available_elastic_tokens": "6291456" + "available_eval_regular_tokens": "16777216", + "available_eval_elastic_tokens": "6291456", + "available_send_regular_tokens": "0", + "available_send_elastic_tokens": "0" }, "tracked_deductions": [ { @@ -100,8 +106,10 @@ echo "id": "1" }, "store_id": 2, - "available_regular_tokens": "16777216", - "available_elastic_tokens": "7340032" + "available_eval_regular_tokens": "16777216", + "available_eval_elastic_tokens": "7340032", + "available_send_regular_tokens": "0", + "available_send_elastic_tokens": "0" }, "tracked_deductions": [ { @@ -120,8 +128,10 @@ echo "id": "1" }, "store_id": 3, - "available_regular_tokens": "16777216", - "available_elastic_tokens": "7340032" + "available_eval_regular_tokens": "16777216", + "available_eval_elastic_tokens": "7340032", + "available_send_regular_tokens": "0", + "available_send_elastic_tokens": "0" }, "tracked_deductions": [ { @@ -147,8 +157,10 @@ echo "id": "1" }, "store_id": 1, - "available_regular_tokens": "16777216", - "available_elastic_tokens": "6291456" + "available_eval_regular_tokens": "16777216", + "available_eval_elastic_tokens": "6291456", + "available_send_regular_tokens": "0", + "available_send_elastic_tokens": "0" }, "tracked_deductions": [ { @@ -175,8 +187,10 @@ echo "id": "1" }, "store_id": 3, - "available_regular_tokens": "16777216", - "available_elastic_tokens": "7340032" + "available_eval_regular_tokens": "16777216", + "available_eval_elastic_tokens": "7340032", + "available_send_regular_tokens": "0", + "available_send_elastic_tokens": "0" }, "tracked_deductions": [ { @@ -202,8 +216,10 @@ echo "id": "1" }, "store_id": 1, - "available_regular_tokens": "16777216", - "available_elastic_tokens": "8388608" + "available_eval_regular_tokens": "16777216", + "available_eval_elastic_tokens": "8388608", + "available_send_regular_tokens": "0", + "available_send_elastic_tokens": "0" }, "tracked_deductions": [ ] @@ -214,8 +230,10 @@ echo "id": "1" }, "store_id": 3, - "available_regular_tokens": "16777216", - "available_elastic_tokens": "8388608" + "available_eval_regular_tokens": "16777216", + "available_eval_elastic_tokens": "8388608", + "available_send_regular_tokens": "0", + "available_send_elastic_tokens": "0" }, "tracked_deductions": [ ] diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb/kvflowinspect.proto b/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb/kvflowinspect.proto index fbc808de3cbc..af1aad352a55 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb/kvflowinspect.proto +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb/kvflowinspect.proto @@ -80,7 +80,8 @@ message ConnectedStream { } // Stream represents a given kvflowcontrol.Stream and the number of tokens -// available for it (as maintained by the node-level kvflowcontrol.Controller). +// available for it (as maintained by the node-level kvflowcontrol.Controller, +// or in >= v24.3, by the node level StreamTokenCounterProvider). message Stream { roachpb.TenantID tenant_id = 1 [ (gogoproto.nullable) = false, @@ -88,8 +89,18 @@ message Stream { uint64 store_id = 2 [ (gogoproto.customname) = "StoreID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"]; - int64 available_regular_tokens = 3; - int64 available_elastic_tokens = 4; + // AvailableEvalRegularTokens represents the currently available tokens for + // regular replication traffic, deducted for evaluation. + int64 available_eval_regular_tokens = 3; + // AvailableEvalElasticTokens represents the currently available tokens for + // elastic replication traffic, deducted for evaluation. + int64 available_eval_elastic_tokens = 4; + // AvailableSendRegularTokens represents the currently available tokens for + // regular replication traffic, deducted before sending. + int64 available_send_regular_tokens = 5; + // AvailableSendElasticTokens represents the currently available tokens for + // elastic replication traffic, deducted before sending. + int64 available_send_elastic_tokens = 6; } // TrackedDeductions represents a tracked token deduction, as typically done by diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 0de0c2374cb5..4e9fbfcba234 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -8865,8 +8865,8 @@ CREATE TABLE crdb_internal.kv_flow_controller ( if err := addRow( tree.NewDInt(tree.DInt(stream.TenantID.ToUint64())), tree.NewDInt(tree.DInt(stream.StoreID)), - tree.NewDInt(tree.DInt(stream.AvailableRegularTokens)), - tree.NewDInt(tree.DInt(stream.AvailableElasticTokens)), + tree.NewDInt(tree.DInt(stream.AvailableEvalRegularTokens)), + tree.NewDInt(tree.DInt(stream.AvailableEvalElasticTokens)), ); err != nil { return err }