From c52cfc260fbdf715e376dcba72181498c812d247 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 14 Dec 2023 00:11:31 +0000 Subject: [PATCH 01/12] storepool: implement status string method Introduce a string method for `storeStatus`, which previously printed ordinals. Add testing for both the store list and store pool string methods. Part of: #102948 Release note: None --- .../kvserver/allocator/storepool/BUILD.bazel | 1 + .../storepool/override_store_pool_test.go | 11 ++- .../allocator/storepool/store_pool.go | 11 ++- .../allocator/storepool/store_pool_test.go | 97 +++++++++++++++++++ 4 files changed, 114 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel index 629ee3f92546..e242b3695705 100644 --- a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel @@ -43,6 +43,7 @@ go_test( "//pkg/roachpb", "//pkg/settings/cluster", "//pkg/testutils/gossiputil", + "//pkg/util/admission/admissionpb", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go index 9fd8d7396264..5e9c43b3d867 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go @@ -96,11 +96,12 @@ func TestOverrideStorePoolStatusString(t *testing.T) { // Mark node 5 as draining. mnl.SetNodeStatus(5, livenesspb.NodeLivenessStatus_DRAINING) - require.Equal(t, "1: range-count=0 fraction-used=0.00\n"+ - "2 (status=1): range-count=0 fraction-used=0.00\n"+ - "3 (status=5): range-count=0 fraction-used=0.00\n"+ - "4: range-count=0 fraction-used=0.00\n"+ - "5 (status=7): range-count=0 fraction-used=0.00\n", + require.Equal(t, + "1: range-count=0 fraction-used=0.00\n"+ + "2 (status=dead): range-count=0 fraction-used=0.00\n"+ + "3 (status=decommissioning): range-count=0 fraction-used=0.00\n"+ + "4: range-count=0 fraction-used=0.00\n"+ + "5 (status=draining): range-count=0 fraction-used=0.00\n", sp.String(), ) } diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 66c9357d6f66..abaabe861a6e 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -111,6 +111,15 @@ const ( storeStatusDraining ) +func (ss storeStatus) String() string { + if ss < storeStatusDead || ss > storeStatusDraining { + panic(fmt.Sprintf("unknown store status: %d", ss)) + } + return [...]string{"", + "dead", "unknown", "throttled", "available", + "decommissioning", "suspect", "draining"}[ss] +} + func (sd *StoreDetail) status( now hlc.Timestamp, deadThreshold time.Duration, @@ -409,7 +418,7 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string { fmt.Fprintf(&buf, "%d", id) status := detail.status(now, timeUntilNodeDead, nl, timeAfterNodeSuspect) if status != storeStatusAvailable { - fmt.Fprintf(&buf, " (status=%d)", status) + fmt.Fprintf(&buf, " (status=%s)", status) } if detail.Desc != nil { fmt.Fprintf(&buf, ": range-count=%d fraction-used=%.2f", diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go index 080a86ab0d90..f35066b93b03 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -862,3 +863,99 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) { t.Fatalf("expected decommissioning replicas %+v; got %+v", e, a) } } + +// TestStorePoolString tests the string output of the store pool +// implementation. +func TestStorePoolString(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const nodeCount = 9 + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + stopper, g, _, sp, mnl := CreateTestStorePool(ctx, st, + liveness.TestTimeUntilNodeDead, false, /* deterministic */ + func() int { return nodeCount }, + livenesspb.NodeLivenessStatus_DEAD) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + + stores := []*roachpb.StoreDescriptor{} + for i := 1; i <= nodeCount; i++ { + stores = append(stores, &roachpb.StoreDescriptor{ + StoreID: roachpb.StoreID(i), + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}, + Capacity: roachpb.StoreCapacity{ + Capacity: 100, + Available: int64(100 - i*10), + RangeCount: int32(i * 10), + }, + }) + } + + sg.GossipStores(stores, t) + mnl.SetNodeStatus(1, livenesspb.NodeLivenessStatus_UNKNOWN) + mnl.SetNodeStatus(2, livenesspb.NodeLivenessStatus_DEAD) + mnl.SetNodeStatus(3, livenesspb.NodeLivenessStatus_UNAVAILABLE) + mnl.SetNodeStatus(4, livenesspb.NodeLivenessStatus_LIVE) + mnl.SetNodeStatus(5, livenesspb.NodeLivenessStatus_DECOMMISSIONING) + mnl.SetNodeStatus(6, livenesspb.NodeLivenessStatus_DECOMMISSIONED) + mnl.SetNodeStatus(7, livenesspb.NodeLivenessStatus_DRAINING) + mnl.SetNodeStatus(8, livenesspb.NodeLivenessStatus_LIVE) + mnl.SetNodeStatus(9, livenesspb.NodeLivenessStatus_LIVE) + sp.DetailsMu.StoreDetails[8].LastUnavailable = sp.clock.Now() + sp.DetailsMu.StoreDetails[9].ThrottledUntil = sp.clock.Now().AddDuration(time.Second) + + require.Equal(t, "1 (status=unknown): range-count=10 fraction-used=0.10\n"+ + "2 (status=dead): range-count=20 fraction-used=0.20\n"+ + "3 (status=unknown): range-count=30 fraction-used=0.30\n"+ + "4: range-count=40 fraction-used=0.40\n"+ + "5 (status=decommissioning): range-count=50 fraction-used=0.50\n"+ + "6 (status=dead): range-count=60 fraction-used=0.60\n"+ + "7 (status=draining): range-count=70 fraction-used=0.70\n"+ + "8 (status=suspect): range-count=80 fraction-used=0.80\n"+ + "9 (status=throttled): range-count=90 fraction-used=0.90 [throttled=1.0s]\n", + sp.String()) +} + +// TestStoreListString tests the string output of store list. +func TestStoreListString(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const nodeCount = 5 + const scale = 10 + stores := []roachpb.StoreDescriptor{} + for i := 1; i <= nodeCount; i++ { + stores = append(stores, roachpb.StoreDescriptor{ + StoreID: roachpb.StoreID(i), + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}, + Capacity: roachpb.StoreCapacity{ + LogicalBytes: int64(i << scale), + RangeCount: int32(i * scale), + LeaseCount: int32(i * scale), + QueriesPerSecond: float64(i * scale), + CPUPerSecond: float64(i << scale), + IOThreshold: admissionpb.IOThreshold{ + L0NumSubLevels: int64(i), + L0NumSubLevelsThreshold: scale, + L0NumFiles: int64(i), + L0NumFilesThreshold: scale, + }, + }, + }) + } + + require.Equal(t, + " candidate: avg-ranges=0.00 avg-leases=0.00 avg-disk-usage=0 B avg-queries-per-second=0.00 avg-store-cpu-per-second=0µs ", + MakeStoreList([]roachpb.StoreDescriptor{}).String()) + + require.Equal(t, " candidate: avg-ranges=30.00 avg-leases=30.00 avg-disk-usage=3.0 KiB avg-queries-per-second=30.00 avg-store-cpu-per-second=3µs\n"+ + " 1: ranges=10 leases=10 disk-usage=1.0 KiB queries-per-second=10.00 store-cpu-per-second=1µs io-overload=0.10\n"+ + " 2: ranges=20 leases=20 disk-usage=2.0 KiB queries-per-second=20.00 store-cpu-per-second=2µs io-overload=0.20\n"+ + " 3: ranges=30 leases=30 disk-usage=3.0 KiB queries-per-second=30.00 store-cpu-per-second=3µs io-overload=0.30\n"+ + " 4: ranges=40 leases=40 disk-usage=4.0 KiB queries-per-second=40.00 store-cpu-per-second=4µs io-overload=0.40\n"+ + " 5: ranges=50 leases=50 disk-usage=5.0 KiB queries-per-second=50.00 store-cpu-per-second=5µs io-overload=0.50\n", + MakeStoreList(stores).String()) +} From 7bfd79f7a2cd54dddf7f3fe70c40eefbe07b9a93 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 14 Dec 2023 01:03:11 +0000 Subject: [PATCH 02/12] allocator: add string method testing Add testing for the string output of candidates and load. These are later used as regression tests for implementing safe formatting. Part of: #102948 Release note: None --- pkg/BUILD.bazel | 2 ++ .../allocatorimpl/allocator_scorer_test.go | 33 +++++++++++++++++++ pkg/kv/kvserver/allocator/load/BUILD.bazel | 9 ++++- pkg/kv/kvserver/allocator/load/load_test.go | 21 ++++++++++++ 4 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 pkg/kv/kvserver/allocator/load/load_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index db40f0dd506b..a187af464605 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -209,6 +209,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/abortspan:abortspan_test", "//pkg/kv/kvserver/allocator/allocator2:allocator2_test", "//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl_test", + "//pkg/kv/kvserver/allocator/load:load_test", "//pkg/kv/kvserver/allocator/storepool:storepool_test", "//pkg/kv/kvserver/apply:apply_test", "//pkg/kv/kvserver/asim/gossip:gossip_test", @@ -1327,6 +1328,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl", "//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl_test", "//pkg/kv/kvserver/allocator/load:load", + "//pkg/kv/kvserver/allocator/load:load_test", "//pkg/kv/kvserver/allocator/plan:plan", "//pkg/kv/kvserver/allocator/storepool:storepool", "//pkg/kv/kvserver/allocator/storepool:storepool_test", diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go index 417823572a3e..6a7856f25802 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go @@ -1663,3 +1663,36 @@ func TestMaxCapacity(t *testing.T) { } } } + +func TestCandidateListString(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const numCandidates = 3 + + cl := candidateList{} + for i := 1; i <= numCandidates; i++ { + cl = append(cl, candidate{ + store: roachpb.StoreDescriptor{StoreID: roachpb.StoreID(i)}, + valid: i%2 == 0, + fullDisk: i%2 == 0, + necessary: i%2 == 0, + voterNecessary: i%2 == 0, + diversityScore: float64(i / numCandidates), + ioOverloaded: i%2 == 0, + ioOverloadScore: float64(i / numCandidates), + convergesScore: i%3 - 1, + balanceScore: balanceStatus(i%3 - 1), + hasNonVoter: i%2 == 0, + rangeCount: i, + details: fmt.Sprintf("mock detail %d", i), + }) + } + + require.Equal(t, "[]", candidateList{}.String()) + require.Equal(t, "[\n"+ + "s1, valid:false, fulldisk:false, necessary:false, voterNecessary:false, diversity:0.00, ioOverloaded: false, ioOverload: 0.00, converges:0, balance:0, hasNonVoter:false, rangeCount:1, queriesPerSecond:0.00, details:(mock detail 1)\n"+ + "s2, valid:true, fulldisk:true, necessary:true, voterNecessary:true, diversity:0.00, ioOverloaded: true, ioOverload: 0.00, converges:1, balance:1, hasNonVoter:true, rangeCount:2, queriesPerSecond:0.00, details:(mock detail 2)\n"+ + "s3, valid:false, fulldisk:false, necessary:false, voterNecessary:false, diversity:1.00, ioOverloaded: false, ioOverload: 1.00, converges:-1, balance:-1, hasNonVoter:false, rangeCount:3, queriesPerSecond:0.00, details:(mock detail 3)]", + cl.String()) +} diff --git a/pkg/kv/kvserver/allocator/load/BUILD.bazel b/pkg/kv/kvserver/allocator/load/BUILD.bazel index 75ddd2098044..c8304f62a357 100644 --- a/pkg/kv/kvserver/allocator/load/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/load/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "load", @@ -11,3 +11,10 @@ go_library( visibility = ["//visibility:public"], deps = ["//pkg/util/humanizeutil"], ) + +go_test( + name = "load_test", + srcs = ["load_test.go"], + embed = [":load"], + deps = ["@com_github_stretchr_testify//require"], +) diff --git a/pkg/kv/kvserver/allocator/load/load_test.go b/pkg/kv/kvserver/allocator/load/load_test.go new file mode 100644 index 000000000000..30112be7319e --- /dev/null +++ b/pkg/kv/kvserver/allocator/load/load_test.go @@ -0,0 +1,21 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package load + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestVectorLoadString(t *testing.T) { + require.Equal(t, "(queries-per-second=1.0 cpu-per-second=1ms)", Vector{1, 1000000}.String()) +} From 46d2133de12d65988817aad89ecefff4815289ba Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Tue, 12 Dec 2023 22:33:29 +0000 Subject: [PATCH 03/12] allocator: implement safe formatting for transfer outcome Implement `SafeValue` for `LeaseTransferOutcome` to unredact the transfer outcome in logs. Note this is currently used only in draining when shedding leases. Release note: None Part of: #102948 --- pkg/kv/kvserver/allocator/base.go | 3 +++ pkg/testutils/lint/passes/redactcheck/redactcheck.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pkg/kv/kvserver/allocator/base.go b/pkg/kv/kvserver/allocator/base.go index 40e1a9127b6f..1e9c34ccb640 100644 --- a/pkg/kv/kvserver/allocator/base.go +++ b/pkg/kv/kvserver/allocator/base.go @@ -218,3 +218,6 @@ func (o LeaseTransferOutcome) String() string { return fmt.Sprintf("unexpected status value: %d", o) } } + +// SafeValue implements the redact.SafeValue interface. +func (o LeaseTransferOutcome) SafeValue() {} diff --git a/pkg/testutils/lint/passes/redactcheck/redactcheck.go b/pkg/testutils/lint/passes/redactcheck/redactcheck.go index c0b8255670c1..1176a838a3d0 100644 --- a/pkg/testutils/lint/passes/redactcheck/redactcheck.go +++ b/pkg/testutils/lint/passes/redactcheck/redactcheck.go @@ -89,6 +89,9 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "RaftTerm": {}, "PushTxnType": {}, }, + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator": { + "LeaseTransferOutcome": {}, + }, "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb": { "SeqNum": {}, }, From c4503943397ec7211a753f7cc4a1ad100fc30c74 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Tue, 12 Dec 2023 22:45:57 +0000 Subject: [PATCH 04/12] load: implement safe formatting for load Implement `SafeFormat` for `Load` to unredact load messages in logs. Release note: None Part of: #102948 --- pkg/kv/kvserver/allocator/load/BUILD.bazel | 5 +++- pkg/kv/kvserver/allocator/load/dimension.go | 12 +++++--- pkg/kv/kvserver/allocator/load/load.go | 7 ++++- pkg/kv/kvserver/allocator/load/vector.go | 30 +++++++++++-------- .../lint/passes/redactcheck/redactcheck.go | 3 ++ 5 files changed, 39 insertions(+), 18 deletions(-) diff --git a/pkg/kv/kvserver/allocator/load/BUILD.bazel b/pkg/kv/kvserver/allocator/load/BUILD.bazel index c8304f62a357..dfc95ab1703e 100644 --- a/pkg/kv/kvserver/allocator/load/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/load/BUILD.bazel @@ -9,7 +9,10 @@ go_library( ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load", visibility = ["//visibility:public"], - deps = ["//pkg/util/humanizeutil"], + deps = [ + "//pkg/util/humanizeutil", + "@com_github_cockroachdb_redact//:redact", + ], ) go_test( diff --git a/pkg/kv/kvserver/allocator/load/dimension.go b/pkg/kv/kvserver/allocator/load/dimension.go index b6d53c1d6bb1..50d58ca44086 100644 --- a/pkg/kv/kvserver/allocator/load/dimension.go +++ b/pkg/kv/kvserver/allocator/load/dimension.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/redact" ) // Dimension is a singe dimension of load that a component may track. @@ -42,13 +43,16 @@ func (d Dimension) String() string { } } -// Format returns a formatted string for a value. -func (d Dimension) Format(value float64) string { +// SafeValue implements the redact.SafeValue interface. +func (d Dimension) SafeValue() {} + +// format returns a formatted string for a value. +func (d Dimension) format(value float64) redact.SafeString { switch d { case Queries: - return fmt.Sprintf("%.1f", value) + return redact.SafeString(fmt.Sprintf("%.1f", value)) case CPU: - return string(humanizeutil.Duration(time.Duration(int64(value)))) + return humanizeutil.Duration(time.Duration(int64(value))) default: panic(fmt.Sprintf("cannot format value: unknown dimension with ordinal %d", d)) } diff --git a/pkg/kv/kvserver/allocator/load/load.go b/pkg/kv/kvserver/allocator/load/load.go index 3280e628e0ac..a7b70831b0dc 100644 --- a/pkg/kv/kvserver/allocator/load/load.go +++ b/pkg/kv/kvserver/allocator/load/load.go @@ -10,12 +10,17 @@ package load -import "math" +import ( + "math" + + "github.com/cockroachdb/redact" +) // Load represents a named collection of load dimensions. It is used for // performing arithmetic and comparison between comparable objects which have // load. type Load interface { + redact.SafeFormatter // Dim returns the value of the Dimension given. Dim(dim Dimension) float64 // String returns a string representation of Load. diff --git a/pkg/kv/kvserver/allocator/load/vector.go b/pkg/kv/kvserver/allocator/load/vector.go index 1599d23b87b9..e863d4e8aa6f 100644 --- a/pkg/kv/kvserver/allocator/load/vector.go +++ b/pkg/kv/kvserver/allocator/load/vector.go @@ -12,7 +12,8 @@ package load import ( "fmt" - "strings" + + "github.com/cockroachdb/redact" ) // Vector is a static container which implements the Load interface. @@ -21,25 +22,30 @@ type Vector [nDimensions]float64 var _ Load = Vector{} // Dim returns the value of the Dimension given. -func (s Vector) Dim(dim Dimension) float64 { - if int(dim) > len(s) || dim < 0 { +func (v Vector) Dim(dim Dimension) float64 { + if int(dim) > len(v) || dim < 0 { panic(fmt.Sprintf("Unknown load dimension access, %d", dim)) } - return s[dim] + return v[dim] } // String returns a string representation of Load. -func (s Vector) String() string { - var buf strings.Builder +func (v Vector) String() string { + return redact.StringWithoutMarkers(v) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (v Vector) SafeFormat(w redact.SafePrinter, _ rune) { + var buf redact.StringBuilder - fmt.Fprint(&buf, "(") - for i, val := range s { + buf.SafeRune('(') + for i, val := range v { if i > 0 { - fmt.Fprint(&buf, " ") + buf.SafeRune(' ') } dim := Dimension(i) - fmt.Fprintf(&buf, "%s=%s", dim.String(), dim.Format(val)) + buf.Printf("%v=%v", dim, dim.format(val)) } - fmt.Fprint(&buf, ")") - return buf.String() + buf.SafeRune(')') + w.Print(buf) } diff --git a/pkg/testutils/lint/passes/redactcheck/redactcheck.go b/pkg/testutils/lint/passes/redactcheck/redactcheck.go index 1176a838a3d0..48964a23d23d 100644 --- a/pkg/testutils/lint/passes/redactcheck/redactcheck.go +++ b/pkg/testutils/lint/passes/redactcheck/redactcheck.go @@ -92,6 +92,9 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator": { "LeaseTransferOutcome": {}, }, + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load": { + "Dimension": {}, + }, "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb": { "SeqNum": {}, }, From 54faa163121cdf88adc1aa7927e358dd0badcbb3 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Wed, 13 Dec 2023 00:13:16 +0000 Subject: [PATCH 05/12] allocatorimpl: implement safe format/value/error `AllocatorAction`, `TargetReplicaType` and `ReplicaStatus` were previously hidden in redacted logs. Implement `redact.SafeValue` to unredact. Also, implement `SafeFormatError` for allocator errors. Release note: None Part of: #102948 --- .../allocator/allocatorimpl/BUILD.bazel | 1 + .../allocator/allocatorimpl/allocator.go | 74 ++++++++++++------- .../allocatorimpl/allocator_scorer.go | 33 ++++++--- .../lint/passes/redactcheck/redactcheck.go | 5 ++ 4 files changed, 76 insertions(+), 37 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel b/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel index 1d011bfd7170..40188ec61a78 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", "@io_etcd_go_raft_v3//:raft", "@io_etcd_go_raft_v3//tracker", ], diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 10d922d985fe..f9437d745cf7 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -16,7 +16,6 @@ import ( "fmt" "math" "math/rand" - "strings" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -32,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/tracker" ) @@ -233,6 +233,9 @@ func (a AllocatorAction) String() string { return allocatorActionNames[a] } +// SafeValue implements the redact.SafeValue interface. +func (a AllocatorAction) SafeValue() {} + // Priority defines the priorities for various repair operations. // // NB: These priorities only influence the replicateQueue's understanding of @@ -344,6 +347,9 @@ func (t TargetReplicaType) String() string { } } +// SafeValue implements the redact.SafeValue interface. +func (t TargetReplicaType) SafeValue() {} + func (s ReplicaStatus) String() string { switch s { case Alive: @@ -357,6 +363,9 @@ func (s ReplicaStatus) String() string { } } +// SafeValue implements the redact.SafeValue interface. +func (t ReplicaStatus) SafeValue() {} + type transferDecision int const ( @@ -379,64 +388,75 @@ type allocatorError struct { throttledStores int } +var _ errors.SafeFormatter = &allocatorError{} + func (ae *allocatorError) Error() string { - var existingVoterStr string + return redact.Sprint(ae).StripMarkers() +} + +func (ae *allocatorError) SafeFormatError(p errors.Printer) (next error) { + var existingVoterStr redact.RedactableString if ae.existingVoterCount == 1 { existingVoterStr = "1 already has a voter" } else { - existingVoterStr = fmt.Sprintf("%d already have a voter", ae.existingVoterCount) + existingVoterStr = redact.Sprintf("%d already have a voter", + ae.existingVoterCount) } - var existingNonVoterStr string + var existingNonVoterStr redact.RedactableString if ae.existingNonVoterCount == 1 { existingNonVoterStr = "1 already has a non-voter" } else { - existingNonVoterStr = fmt.Sprintf("%d already have a non-voter", ae.existingNonVoterCount) + existingNonVoterStr = redact.Sprintf("%d already have a non-voter", + ae.existingNonVoterCount) } - var baseMsg string + var baseMsg redact.RedactableString if ae.throttledStores != 0 { - baseMsg = fmt.Sprintf( + baseMsg = redact.Sprintf( "0 of %d live stores are able to take a new replica for the range (%d throttled, %s, %s)", - ae.aliveStores, ae.throttledStores, existingVoterStr, existingNonVoterStr) + ae.aliveStores, ae.throttledStores, + existingVoterStr, existingNonVoterStr) } else { - baseMsg = fmt.Sprintf( + baseMsg = redact.Sprintf( "0 of %d live stores are able to take a new replica for the range (%s, %s)", ae.aliveStores, existingVoterStr, existingNonVoterStr) } if len(ae.constraints) == 0 && len(ae.voterConstraints) == 0 { - if ae.throttledStores > 0 { - return baseMsg + p.Print(baseMsg) + if ae.throttledStores == 0 { + p.Printf("; likely not enough nodes in cluster") } - return baseMsg + "; likely not enough nodes in cluster" + return } - var b strings.Builder - b.WriteString(baseMsg) - b.WriteString("; replicas must match constraints [") + var b redact.StringBuilder + b.Print(baseMsg) + b.Printf("; replicas must match constraints [") for i := range ae.constraints { if i > 0 { - b.WriteByte(' ') + b.SafeRune(' ') } - b.WriteByte('{') - b.WriteString(ae.constraints[i].String()) - b.WriteByte('}') + b.SafeRune('{') + b.Print(ae.constraints[i]) + b.SafeRune('}') } - b.WriteString("]") + b.SafeRune(']') - b.WriteString("; voting replicas must match voter_constraints [") + b.Printf("; voting replicas must match voter_constraints [") for i := range ae.voterConstraints { if i > 0 { - b.WriteByte(' ') + b.SafeRune(' ') } - b.WriteByte('{') - b.WriteString(ae.voterConstraints[i].String()) - b.WriteByte('}') + b.SafeRune('{') + b.Print(ae.voterConstraints[i].String()) + b.SafeRune('}') } - b.WriteString("]") + b.SafeRune(']') - return b.String() + p.Print(b) + return nil } func (*allocatorError) AllocationErrorMarker() {} diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go index 7fb57851258d..a1a151c9ba55 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/redact" ) const ( @@ -705,18 +706,24 @@ type candidate struct { } func (c candidate) String() string { - str := fmt.Sprintf("s%d, valid:%t, fulldisk:%t, necessary:%t, "+ + return redact.StringWithoutMarkers(c) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (c candidate) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("s%d, valid:%t, fulldisk:%t, necessary:%t, "+ "voterNecessary:%t, diversity:%.2f, ioOverloaded: %t, ioOverload: %.2f, "+ "converges:%d, balance:%d, hasNonVoter:%t, rangeCount:%d, queriesPerSecond:%.2f", c.store.StoreID, c.valid, c.fullDisk, c.necessary, c.voterNecessary, c.diversityScore, c.ioOverloaded, c.ioOverloadScore, c.convergesScore, c.balanceScore, c.hasNonVoter, c.rangeCount, c.store.Capacity.QueriesPerSecond) if c.details != "" { - return fmt.Sprintf("%s, details:(%s)", str, c.details) + w.Printf(", details:(%s)", c.details) } - return str } +// compactString returns a compact represntation of the candidate. Note this +// method is currently only used to populate the range log via details. func (c candidate) compactString() string { var buf bytes.Buffer fmt.Fprintf(&buf, "s%d", c.store.StoreID) @@ -837,17 +844,23 @@ func (c candidate) compare(o candidate) float64 { type candidateList []candidate func (cl candidateList) String() string { + return redact.StringWithoutMarkers(cl) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (cl candidateList) SafeFormat(w redact.SafePrinter, r rune) { if len(cl) == 0 { - return "[]" + w.Printf("[]") + return } - var buffer bytes.Buffer - buffer.WriteRune('[') + var buf redact.StringBuilder + buf.SafeRune('[') for _, c := range cl { - buffer.WriteRune('\n') - buffer.WriteString(c.String()) + buf.SafeRune('\n') + buf.Print(c) } - buffer.WriteRune(']') - return buffer.String() + buf.SafeRune(']') + w.Print(buf) } // byScore implements sort.Interface to sort by scores. diff --git a/pkg/testutils/lint/passes/redactcheck/redactcheck.go b/pkg/testutils/lint/passes/redactcheck/redactcheck.go index 48964a23d23d..7863fdacaac0 100644 --- a/pkg/testutils/lint/passes/redactcheck/redactcheck.go +++ b/pkg/testutils/lint/passes/redactcheck/redactcheck.go @@ -92,6 +92,11 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator": { "LeaseTransferOutcome": {}, }, + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl": { + "AllocatorAction": {}, + "TargetReplicaType": {}, + "ReplicaStatus": {}, + }, "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load": { "Dimension": {}, }, From f0b55c1454170e761a5bf2c3327362c06ef15aaf Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Wed, 13 Dec 2023 00:47:57 +0000 Subject: [PATCH 06/12] plan: make range progress redactable string `raftRangeProgress` is used to format the raft progress for a range. Update `raftRangeProgress` to return a redactable string to show in redacted logs. Release note: None Part of: #102948 --- pkg/kv/kvserver/allocator/plan/BUILD.bazel | 1 + pkg/kv/kvserver/allocator/plan/util.go | 25 +++++++++++----------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/allocator/plan/BUILD.bazel b/pkg/kv/kvserver/allocator/plan/BUILD.bazel index 80cdaa0bddfa..393aea3f24b9 100644 --- a/pkg/kv/kvserver/allocator/plan/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/plan/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//pkg/util/retry", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", "@io_etcd_go_raft_v3//:raft", ], ) diff --git a/pkg/kv/kvserver/allocator/plan/util.go b/pkg/kv/kvserver/allocator/plan/util.go index cbb8def379eb..13162b1543a0 100644 --- a/pkg/kv/kvserver/allocator/plan/util.go +++ b/pkg/kv/kvserver/allocator/plan/util.go @@ -11,15 +11,14 @@ package plan import ( - "bytes" "context" - "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "go.etcd.io/raft/v3" ) @@ -139,28 +138,30 @@ func ReplicationChangesForRebalance( // rangeRaftStatus pretty-prints the Raft progress (i.e. Raft log position) of // the replicas. -func rangeRaftProgress(raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor) string { +func rangeRaftProgress( + raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor, +) redact.RedactableString { if raftStatus == nil { return "[no raft status]" } else if len(raftStatus.Progress) == 0 { return "[no raft progress]" } - var buf bytes.Buffer - buf.WriteString("[") + var buf redact.StringBuilder + buf.SafeRune('[') for i, r := range replicas { if i > 0 { - buf.WriteString(", ") + buf.Printf(", ") } - fmt.Fprintf(&buf, "%d", r.ReplicaID) + buf.Print(r.ReplicaID) if uint64(r.ReplicaID) == raftStatus.Lead { - buf.WriteString("*") + buf.SafeRune('*') } if progress, ok := raftStatus.Progress[uint64(r.ReplicaID)]; ok { - fmt.Fprintf(&buf, ":%d", progress.Match) + buf.Printf(":%d", progress.Match) } else { - buf.WriteString(":?") + buf.Printf(":?") } } - buf.WriteString("]") - return buf.String() + buf.SafeRune(']') + return buf.RedactableString() } From 6a67abfb277c7429919ccf11a27bdc349b7be94d Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Wed, 13 Dec 2023 01:09:21 +0000 Subject: [PATCH 07/12] storepool: implement safe formatting for store pool(list) Implement `SafeFormat` for `StorePool` and `StoreList` to unredact log messages. Release note: None Part of: #102948 --- .../kvserver/allocator/storepool/BUILD.bazel | 1 + .../storepool/override_store_pool.go | 8 ++- .../allocator/storepool/store_pool.go | 50 ++++++++++++------- .../allocator/storepool/store_pool_test.go | 2 +- .../lint/passes/redactcheck/redactcheck.go | 3 ++ 5 files changed, 45 insertions(+), 19 deletions(-) diff --git a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel index e242b3695705..3bca83ef369f 100644 --- a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", ], ) diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go index 348632fa26a6..9de436b8a4d0 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/redact" ) // OverrideStorePool is an implementation of AllocatorStorePool that allows @@ -92,7 +93,12 @@ func NewOverrideStorePool( } func (o *OverrideStorePool) String() string { - return o.sp.statusString(o.overrideNodeLivenessFn) + return redact.StringWithoutMarkers(o) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (o *OverrideStorePool) SafeFormat(w redact.SafePrinter, _ rune) { + w.Print(o.sp.statusString(o.overrideNodeLivenessFn)) } // IsStoreReadyForRoutineReplicaTransfer implements the AllocatorStorePool interface. diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index abaabe861a6e..9d15b58263ad 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -11,7 +11,6 @@ package storepool import ( - "bytes" "context" "fmt" "sort" @@ -31,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/shuffle" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" ) // FailedReservationsTimeout specifies a duration during which the local @@ -120,6 +120,9 @@ func (ss storeStatus) String() string { "decommissioning", "suspect", "draining"}[ss] } +// SafeValue implements the redact.SafeValue interface. +func (ss storeStatus) SafeValue() {} + func (sd *StoreDetail) status( now hlc.Timestamp, deadThreshold time.Duration, @@ -220,6 +223,7 @@ type CapacityChangeFn func( // of all known stores in the cluster and information on their health. type AllocatorStorePool interface { fmt.Stringer + redact.SafeFormatter // ClusterNodeCount returns the number of nodes that are possible allocation // targets. @@ -395,10 +399,15 @@ func NewStorePool( } func (sp *StorePool) String() string { - return sp.statusString(sp.NodeLivenessFn) + return redact.StringWithoutMarkers(sp) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (sp *StorePool) SafeFormat(w redact.SafePrinter, _ rune) { + w.Print(sp.statusString(sp.NodeLivenessFn)) } -func (sp *StorePool) statusString(nl NodeLivenessFunc) string { +func (sp *StorePool) statusString(nl NodeLivenessFunc) redact.RedactableString { sp.DetailsMu.RLock() defer sp.DetailsMu.RUnlock() @@ -408,28 +417,30 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string { } sort.Sort(ids) - var buf bytes.Buffer + var buf redact.StringBuilder now := sp.clock.Now() timeUntilNodeDead := liveness.TimeUntilNodeDead.Get(&sp.st.SV) timeAfterNodeSuspect := liveness.TimeAfterNodeSuspect.Get(&sp.st.SV) for _, id := range ids { detail := sp.DetailsMu.StoreDetails[id] - fmt.Fprintf(&buf, "%d", id) + buf.Print(id) status := detail.status(now, timeUntilNodeDead, nl, timeAfterNodeSuspect) if status != storeStatusAvailable { - fmt.Fprintf(&buf, " (status=%s)", status) + buf.Printf(" (status=%s)", status) } if detail.Desc != nil { - fmt.Fprintf(&buf, ": range-count=%d fraction-used=%.2f", - detail.Desc.Capacity.RangeCount, detail.Desc.Capacity.FractionUsed()) + buf.Printf(": range-count=%d fraction-used=%.2f", + detail.Desc.Capacity.RangeCount, + detail.Desc.Capacity.FractionUsed()) } if detail.ThrottledUntil.After(now) { - fmt.Fprintf(&buf, " [throttled=%.1fs]", detail.ThrottledUntil.GoTime().Sub(now.GoTime()).Seconds()) + buf.Printf(" [throttled=%v]", humanizeutil.Duration( + detail.ThrottledUntil.GoTime().Sub(now.GoTime()))) } - _, _ = buf.WriteString("\n") + buf.SafeRune('\n') } - return buf.String() + return buf.RedactableString() } // storeGossipUpdate is the Gossip callback used to keep the StorePool up to date. @@ -970,8 +981,13 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { } func (sl StoreList) String() string { - var buf bytes.Buffer - fmt.Fprintf(&buf, + return redact.StringWithoutMarkers(sl) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (sl StoreList) SafeFormat(w redact.SafePrinter, _ rune) { + var buf redact.StringBuilder + buf.Printf( " candidate: avg-ranges=%.2f avg-leases=%.2f avg-disk-usage=%s avg-queries-per-second=%.2f avg-store-cpu-per-second=%s", sl.CandidateRanges.Mean, sl.CandidateLeases.Mean, @@ -980,13 +996,13 @@ func (sl StoreList) String() string { humanizeutil.Duration(time.Duration(int64(sl.CandidateCPU.Mean))), ) if len(sl.Stores) > 0 { - fmt.Fprintf(&buf, "\n") + buf.Printf("\n") } else { - fmt.Fprintf(&buf, " ") + buf.Printf(" ") } for _, desc := range sl.Stores { ioScore, _ := desc.Capacity.IOThreshold.Score() - fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s io-overload=%.2f\n", + buf.Printf(" %v: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s io-overload=%.2f\n", desc.StoreID, desc.Capacity.RangeCount, desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes), desc.Capacity.QueriesPerSecond, @@ -994,7 +1010,7 @@ func (sl StoreList) String() string { ioScore, ) } - return buf.String() + w.Print(buf) } // ExcludeInvalid takes a store list and removes Stores that would be explicitly invalid diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go index f35066b93b03..82d471367e81 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go @@ -915,7 +915,7 @@ func TestStorePoolString(t *testing.T) { "6 (status=dead): range-count=60 fraction-used=0.60\n"+ "7 (status=draining): range-count=70 fraction-used=0.70\n"+ "8 (status=suspect): range-count=80 fraction-used=0.80\n"+ - "9 (status=throttled): range-count=90 fraction-used=0.90 [throttled=1.0s]\n", + "9 (status=throttled): range-count=90 fraction-used=0.90 [throttled=1s]\n", sp.String()) } diff --git a/pkg/testutils/lint/passes/redactcheck/redactcheck.go b/pkg/testutils/lint/passes/redactcheck/redactcheck.go index 7863fdacaac0..e6d870970da9 100644 --- a/pkg/testutils/lint/passes/redactcheck/redactcheck.go +++ b/pkg/testutils/lint/passes/redactcheck/redactcheck.go @@ -100,6 +100,9 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load": { "Dimension": {}, }, + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool": { + "storeStatus": {}, + }, "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb": { "SeqNum": {}, }, From ea43cc478786815e584c6cc61ce561c9a5ed8e88 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Wed, 13 Dec 2023 22:14:54 +0000 Subject: [PATCH 08/12] kvserver: cleanup store rebalancer logging Clean up inconsistent parentheses and make range rebalance logging consistent with lease rebalance logging. Epic: none Release note: None --- pkg/kv/kvserver/store_rebalancer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index 1d359eed9484..e83e4cdea9c3 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -578,7 +578,7 @@ func (sr *StoreRebalancer) TransferToRebalanceRanges( ) bool { if rctx.LessThanMaxThresholds() { log.KvDistribution.Infof(ctx, - "load-based lease transfers successfully brought s%d down to %s load, mean=%s, upperThreshold=%s)", + "load-based lease transfers successfully brought s%d down to %s load, mean=%s, upperThreshold=%s", rctx.LocalDesc.StoreID, rctx.LocalDesc.Capacity.Load(), rctx.allStoresList.LoadMeans(), rctx.maxThresholds) return false @@ -657,7 +657,7 @@ func (sr *StoreRebalancer) applyRangeRebalance( descBeforeRebalance, _ := candidateReplica.DescAndSpanConfig() log.KvDistribution.Infof( ctx, - "rebalancing r%d (%s load) to better balance load: voters from %v to %v; non-voters from %v to %v", + "rebalancing r%d load=%s to better balance load: voters from %v to %v; non-voters from %v to %v", candidateReplica.GetRangeID(), candidateReplica.RangeUsageInfo().Load(), descBeforeRebalance.Replicas().Voters(), From 30da6af5f6299afc1945e82552d2d2ad6b6dd2f3 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Tue, 19 Dec 2023 15:21:29 +0000 Subject: [PATCH 09/12] allocatorimpl: check IO overload in should transfer lease Previously, when the lease IO overload enforcement was set to shed and a store satisfied the IO overload criteria, it would not be guaranteed to have its leases enqueued into the replicate queue. Add an IO overload check in should transfer lease to ensure it does. Epic: none Release note: None --- .../allocator/allocatorimpl/allocator.go | 6 ++ .../allocator/allocatorimpl/allocator_test.go | 100 ++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 10d922d985fe..61ae7b298bd8 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -2522,6 +2522,12 @@ func (a *Allocator) ShouldTransferLease( if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) { return true } + + if a.leaseholderShouldMoveDueToIOOverload( + ctx, storePool, existing, leaseRepl.StoreID(), a.IOOverloadOptions()) { + return true + } + existing = a.ValidLeaseTargets( ctx, storePool, diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 1faffaa6b36f..5541b894e361 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -2842,6 +2842,106 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) { assertShouldTransferLease(true) } +func TestAllocatorShouldTransferLeaseIOOverload(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + floats := func(nums ...float64) []float64 { + return nums + } + + // We want the shed threshold to be 0.9 and the overload threhsold to be 0.5 + // i.e. block transfers at >=0.5 and block transfers + shed leases at >=0.9. + const shedThreshold = 0.9 + const threshold = 0.5 + + testCases := []struct { + name string + leaseCounts, IOScores []float64 + leaseholder roachpb.StoreID + excludeLeaseRepl bool + expected bool + enforcement IOOverloadEnforcementLevel + }{ + { + name: "shouldn't transfer off of store with high io overload when block enforcement", + leaseCounts: floats(100, 100, 100, 100, 100), + IOScores: floats(2.5, 1.5, 0.5, 0, 0), + leaseholder: 1, + expected: false, + enforcement: IOOverloadThresholdBlockTransfers, + }, + { + name: "should transfer off of store with high io overload when shed enforcement", + leaseCounts: floats(100, 100, 100, 100, 100), + IOScores: floats(2.5, 1.5, 0.5, 0, 0), + leaseholder: 1, + // Store 3 is above the threshold (1.0 > 0.8), but equal to the avg (1.0), so + // it is still considered a non-IO-overloaded candidate. + expected: true, + enforcement: IOOverloadThresholdShed, + }, + { + name: "should transfer to io overloaded store(s) when no action enforcement", + leaseCounts: floats(0, 100, 100, 400, 400), + IOScores: floats(2.5, 1.5, 0.5, 0, 0), + leaseholder: 5, + expected: true, + enforcement: IOOverloadThresholdIgnore, + }, + { + name: "dont transfer off of store with high io overload but less than shed threshold with shed enforcement", + leaseCounts: floats(0, 0, 0, 0, 0), + IOScores: floats(0.89, 0, 0, 0, 0), + leaseholder: 1, + expected: false, + enforcement: IOOverloadThresholdShed, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */) + defer stopper.Stop(ctx) + n := len(tc.leaseCounts) + stores := make([]*roachpb.StoreDescriptor, n) + existing := make([]roachpb.ReplicaDescriptor, 0, n) + for i := range tc.leaseCounts { + existing = append(existing, replicas(roachpb.StoreID(i+1))...) + stores[i] = &roachpb.StoreDescriptor{ + StoreID: roachpb.StoreID(i + 1), + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i + 1)}, + Capacity: roachpb.StoreCapacity{ + LeaseCount: int32(tc.leaseCounts[i]), + IOThreshold: TestingIOThresholdWithScore(tc.IOScores[i]), + }, + } + } + + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(stores, t) + LeaseIOOverloadThresholdEnforcement.Override(ctx, &a.st.SV, int64(tc.enforcement)) + LeaseIOOverloadThreshold.Override(ctx, &a.st.SV, threshold) + LeaseIOOverloadShedThreshold.Override(ctx, &a.st.SV, shedThreshold) + + shouldTransfer := a.ShouldTransferLease( + ctx, + sp, + &roachpb.RangeDescriptor{}, + emptySpanConfig(), + existing, + &mockRepl{ + replicationFactor: int32(n), + storeID: tc.leaseholder, + }, + allocator.RangeUsageInfo{}, /* stats */ + ) + require.Equal(t, tc.expected, shouldTransfer) + }) + } + +} + func TestAllocatorLeasePreferences(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) From 5cc562649d36d8e82df8cba9376e7c00b9aef9ed Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 20 Dec 2023 13:51:15 +0000 Subject: [PATCH 10/12] go.mod: bump Pebble to bfc74496212f https://github.com/cockroachdb/pebble/commit/bfc74496 define FormatSyntheticPrefixes version https://github.com/cockroachdb/pebble/commit/a27878fb ingest,manifest,sstable: connect prefix rules in ingest to prefix replacing iterators https://github.com/cockroachdb/pebble/commit/a452c0e5 manifest: add encoding of prefix rules to manifest entries https://github.com/cockroachdb/pebble/commit/e7d75289 sstable: add prefixReplacingIterator https://github.com/cockroachdb/pebble/commit/e40df58f sstable: add prefix arg to buildTestTableWithProvider https://github.com/cockroachdb/pebble/commit/35cfce46 external_iterator: don't double-close Readers in case of err https://github.com/cockroachdb/pebble/commit/69994ddb sstable: update comment about SINGLEDEL, foreign ssts and compactions https://github.com/cockroachdb/pebble/commit/40d5f216 db: code comment about guaranteeing durability of initial state https://github.com/cockroachdb/pebble/commit/5c5ad7ed keyspan: add Assert iterator https://github.com/cockroachdb/pebble/commit/bbf7dc40 db: deprecate older format versions Release note: none. --- DEPS.bzl | 6 ++--- build/bazelutil/distdir_files.bzl | 2 +- go.mod | 2 +- go.sum | 4 +-- pkg/ccl/cliccl/testdata/ear-list | 27 ++++++++----------- .../storageccl/engineccl/encrypted_fs_test.go | 7 +++-- 6 files changed, 21 insertions(+), 27 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 309a40557c51..8a0decb3650e 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -1613,10 +1613,10 @@ def go_deps(): patches = [ "@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch", ], - sha256 = "1878bb40f322c5c93bb7db26b6287219eb56507fc59b82292fcd4d2187639a16", - strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20231218155426-48b54c29d8fe", + sha256 = "40cd8262139752f6c002d98be91763eb30cf5d6e2288f7e2163f47ee9b0ca50e", + strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20231220134654-bfc74496212f", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20231218155426-48b54c29d8fe.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20231220134654-bfc74496212f.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 820feba1f4f7..f6edffe94f4e 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -322,7 +322,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.19.0.zip": "c4d516bcfe8c07b6fc09b8a9a07a95065b36c2855627cb3514e40c98f872b69e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20230118201751-21c54148d20b.zip": "ca7776f47e5fecb4c495490a679036bfc29d95bd7625290cfdb9abb0baf97476", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/metamorphic/com_github_cockroachdb_metamorphic-v0.0.0-20231108215700-4ba948b56895.zip": "28c8cf42192951b69378cf537be5a9a43f2aeb35542908cc4fe5f689505853ea", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20231218155426-48b54c29d8fe.zip": "1878bb40f322c5c93bb7db26b6287219eb56507fc59b82292fcd4d2187639a16", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20231220134654-bfc74496212f.zip": "40cd8262139752f6c002d98be91763eb30cf5d6e2288f7e2163f47ee9b0ca50e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.5.zip": "11b30528eb0dafc8bc1a5ba39d81277c257cbe6946a7564402f588357c164560", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/stress/com_github_cockroachdb_stress-v0.0.0-20220803192808-1806698b1b7b.zip": "3fda531795c600daf25532a4f98be2a1335cd1e5e182c72789bca79f5f69fcc1", diff --git a/go.mod b/go.mod index e0f8c7112d3d..c470db355c4f 100644 --- a/go.mod +++ b/go.mod @@ -113,7 +113,7 @@ require ( github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 github.com/cockroachdb/gostdlib v1.19.0 github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b - github.com/cockroachdb/pebble v0.0.0-20231218155426-48b54c29d8fe + github.com/cockroachdb/pebble v0.0.0-20231220134654-bfc74496212f github.com/cockroachdb/redact v1.1.5 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b diff --git a/go.sum b/go.sum index 2dfa81f22747..03ebcc991ccd 100644 --- a/go.sum +++ b/go.sum @@ -493,8 +493,8 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA= -github.com/cockroachdb/pebble v0.0.0-20231218155426-48b54c29d8fe h1:ZBhPcgWjnfy2PFWlvPlcOXAfAQqOIdpfksijpKiMWcc= -github.com/cockroachdb/pebble v0.0.0-20231218155426-48b54c29d8fe/go.mod h1:BHuaMa/lK7fUe75BlsteiiTu8ptIG+qSAuDtGMArP18= +github.com/cockroachdb/pebble v0.0.0-20231220134654-bfc74496212f h1:9E2dB7cBwxs8bP4NoZiMEOT6bloFH6kSVhaw0CpkVV0= +github.com/cockroachdb/pebble v0.0.0-20231220134654-bfc74496212f/go.mod h1:BHuaMa/lK7fUe75BlsteiiTu8ptIG+qSAuDtGMArP18= github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= diff --git a/pkg/ccl/cliccl/testdata/ear-list b/pkg/ccl/cliccl/testdata/ear-list index 0ca62aaacc3a..7362ef47a79f 100644 --- a/pkg/ccl/cliccl/testdata/ear-list +++ b/pkg/ccl/cliccl/testdata/ear-list @@ -8,23 +8,18 @@ list 000004.log: env type: Data, AES128_CTR keyID: bbb65a9d114c2a18740f27b6933b74f61018bd5adf545c153b48ffe6473336ef - nonce: d1 05 79 53 68 35 a0 f1 44 01 22 79 - counter: 1497766936 + nonce: 71 12 f7 22 9a fb 90 24 4e 58 27 01 + counter: 3082989236 000005.sst: env type: Data, AES128_CTR keyID: bbb65a9d114c2a18740f27b6933b74f61018bd5adf545c153b48ffe6473336ef - nonce: d0 b1 31 4b 08 b9 f6 08 7e e6 af 40 - counter: 2167389540 + nonce: 5a 01 ec 25 29 aa 75 fc 92 76 48 ad + counter: 1294279409 COCKROACHDB_DATA_KEYS_000001_monolith: env type: Store, AES128_CTR keyID: f594229216d81add7811c4360212eb7629b578ef4eab6e5d05679b3c5de48867 nonce: 8f 4c ba 1a a3 4f db 3c db 84 cf f5 counter: 2436226951 -CURRENT: - env type: Data, AES128_CTR - keyID: bbb65a9d114c2a18740f27b6933b74f61018bd5adf545c153b48ffe6473336ef - nonce: 71 12 f7 22 9a fb 90 24 4e 58 27 01 - counter: 3082989236 MANIFEST-000001: env type: Data, AES128_CTR keyID: bbb65a9d114c2a18740f27b6933b74f61018bd5adf545c153b48ffe6473336ef @@ -33,20 +28,20 @@ MANIFEST-000001: OPTIONS-000003: env type: Data, AES128_CTR keyID: bbb65a9d114c2a18740f27b6933b74f61018bd5adf545c153b48ffe6473336ef - nonce: 86 a7 78 ad 4b da 62 56 d5 e2 d1 70 - counter: 798955289 + nonce: 80 18 c0 79 61 c7 cf ef b4 25 4e 78 + counter: 1483615076 marker.datakeys.000001.COCKROACHDB_DATA_KEYS_000001_monolith: env type: Store, AES128_CTR keyID: f594229216d81add7811c4360212eb7629b578ef4eab6e5d05679b3c5de48867 nonce: 55 d7 d4 27 6c 97 9b dd f1 5d 40 c8 counter: 467030050 -marker.format-version.000012.013: +marker.format-version.000001.013: env type: Data, AES128_CTR keyID: bbb65a9d114c2a18740f27b6933b74f61018bd5adf545c153b48ffe6473336ef - nonce: 23 d9 b2 e1 39 b0 87 ed f9 6d 49 20 - counter: 3481614039 + nonce: d3 97 11 b3 1a ed 22 2b 74 fb 02 0c + counter: 1229228536 marker.manifest.000001.MANIFEST-000001: env type: Data, AES128_CTR keyID: bbb65a9d114c2a18740f27b6933b74f61018bd5adf545c153b48ffe6473336ef - nonce: d3 97 11 b3 1a ed 22 2b 74 fb 02 0c - counter: 1229228536 + nonce: 18 c2 a6 23 cc 6e 2e 7c 8e bf 84 77 + counter: 3159373900 diff --git a/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go b/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go index ef1c1d68f57e..46e6d4660f07 100644 --- a/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go +++ b/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go @@ -273,11 +273,10 @@ func TestPebbleEncryption(t *testing.T) { stats, err := db.GetEnvStats() require.NoError(t, err) - // Opening the DB should've created OPTIONS, CURRENT, MANIFEST and the - // WAL. - require.GreaterOrEqual(t, stats.TotalFiles, uint64(4)) + // Opening the DB should've created OPTIONS, MANIFEST, and the WAL. + require.GreaterOrEqual(t, stats.TotalFiles, uint64(3)) // We also created markers for the format version and the manifest. - require.Equal(t, uint64(6), stats.ActiveKeyFiles) + require.Equal(t, uint64(5), stats.ActiveKeyFiles) var s enginepbccl.EncryptionStatus require.NoError(t, protoutil.Unmarshal(stats.EncryptionStatus, &s)) require.Equal(t, "16.key", s.ActiveStoreKey.Source) From e615aec402e042cf8fd280a55bb335bc46e4e89d Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Wed, 20 Dec 2023 13:36:43 -0600 Subject: [PATCH 11/12] application_api: skip more tests under race Epic: CRDB-8308 Release note: None --- pkg/server/application_api/sessions_test.go | 2 +- pkg/server/application_api/sql_stats_test.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/server/application_api/sessions_test.go b/pkg/server/application_api/sessions_test.go index ca943bb35501..539da974591b 100644 --- a/pkg/server/application_api/sessions_test.go +++ b/pkg/server/application_api/sessions_test.go @@ -324,7 +324,7 @@ func TestListClosedSessions(t *testing.T) { defer log.Scope(t).Close(t) // The active sessions might close before the stress race can finish. - skip.UnderStressRace(t, "active sessions") + skip.UnderRace(t, "active sessions") ctx := context.Background() testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{}) diff --git a/pkg/server/application_api/sql_stats_test.go b/pkg/server/application_api/sql_stats_test.go index 2bdf538a6855..93c40e20a1f5 100644 --- a/pkg/server/application_api/sql_stats_test.go +++ b/pkg/server/application_api/sql_stats_test.go @@ -395,6 +395,8 @@ func TestStatusAPIStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRace(t, "probable OOM") + // Increase the timeout for the http client if under stress. additionalTimeout := 0 * time.Second if skip.Stress() { From de430dfb4cbb153543f509b5958fd5bef8c73cbb Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Sun, 17 Dec 2023 15:38:37 -0500 Subject: [PATCH 12/12] kvclient: remove RPCContext from DistSender RPCContext is unnecessary in DistSender. The only two things that are needed are the LatencyFunc, which is extracted from the RemoteClocks, and the Stopper which should have been passed in directly. This change makes DistSender a tiny bit more easily testable. Epic: none Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 39 ++++--- .../kvclient/kvcoord/dist_sender_rangefeed.go | 6 +- .../dist_sender_rangefeed_mock_test.go | 2 +- .../kvcoord/dist_sender_server_test.go | 6 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 104 ++++++++---------- .../kvcoord/local_test_cluster_util.go | 2 +- pkg/kv/kvclient/kvcoord/range_iter_test.go | 8 +- pkg/kv/kvclient/kvcoord/send_test.go | 2 +- .../txn_interceptor_pipeliner_client_test.go | 2 +- pkg/kv/kvserver/store_test.go | 2 +- pkg/server/server.go | 3 +- pkg/server/tenant.go | 3 +- 12 files changed, 84 insertions(+), 95 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 7014fe3b8bd0..af4398ce7f80 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -509,7 +509,8 @@ type FirstRangeProvider interface { type DistSender struct { log.AmbientContext - st *cluster.Settings + st *cluster.Settings + stopper *stop.Stopper // clock is used to set time for some calls. E.g. read-only ops // which span ranges and don't require read consistency. clock *hlc.Clock @@ -529,15 +530,10 @@ type DistSender struct { // This is not required if a RangeDescriptorDB is supplied. firstRangeProvider FirstRangeProvider transportFactory TransportFactory - rpcContext *rpc.Context // nodeDialer allows RPC calls from the SQL layer to the KV layer. nodeDialer *nodedialer.Dialer rpcRetryOptions retry.Options asyncSenderSem *quotapool.IntPool - // clusterID is the logical cluster ID used to verify access to enterprise features. - // It is copied out of the rpcContext at construction time and used in - // testing. - logicalClusterID *base.ClusterIDContainer // batchInterceptor is set for tenants; when set, information about all // BatchRequests and BatchResponses are passed through this interceptor, which @@ -588,6 +584,7 @@ type DistSenderConfig struct { AmbientCtx log.AmbientContext Settings *cluster.Settings + Stopper *stop.Stopper Clock *hlc.Clock NodeDescs NodeDescStore // NodeIDGetter, if set, provides non-gossip based implementation for @@ -595,7 +592,6 @@ type DistSenderConfig struct { // preferentially route requests to a local replica (if one exists). NodeIDGetter func() roachpb.NodeID RPCRetryOptions *retry.Options - RPCContext *rpc.Context // NodeDialer is the dialer from the SQL layer to the KV layer. NodeDialer *nodedialer.Dialer @@ -628,6 +624,8 @@ type DistSenderConfig struct { TestingKnobs ClientTestingKnobs HealthFunc HealthFunc + + LatencyFunc LatencyFunc } // NewDistSender returns a batch.Sender instance which connects to the @@ -648,6 +646,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { } ds := &DistSender{ st: cfg.Settings, + stopper: cfg.Stopper, clock: cfg.Clock, nodeDescs: cfg.NodeDescs, nodeIDGetter: nodeIDGetter, @@ -655,6 +654,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { kvInterceptor: cfg.KVInterceptor, locality: cfg.Locality, healthFunc: cfg.HealthFunc, + latencyFunc: cfg.LatencyFunc, } if ds.st == nil { ds.st = cluster.MakeTestingClusterSettings() @@ -679,7 +679,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { getRangeDescCacheSize := func() int64 { return rangeDescriptorCacheSize.Get(&ds.st.SV) } - ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.RPCContext.Stopper) + ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.Stopper) if tf := cfg.TestingKnobs.TransportFactory; tf != nil { ds.transportFactory = tf } else { @@ -693,21 +693,16 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { if cfg.RPCRetryOptions != nil { ds.rpcRetryOptions = *cfg.RPCRetryOptions } - if cfg.RPCContext == nil { - panic("no RPCContext set in DistSenderConfig") - } - ds.rpcContext = cfg.RPCContext ds.nodeDialer = cfg.NodeDialer if ds.rpcRetryOptions.Closer == nil { - ds.rpcRetryOptions.Closer = ds.rpcContext.Stopper.ShouldQuiesce() + ds.rpcRetryOptions.Closer = cfg.Stopper.ShouldQuiesce() } - ds.logicalClusterID = cfg.RPCContext.LogicalClusterID ds.asyncSenderSem = quotapool.NewIntPool("DistSender async concurrency", uint64(senderConcurrencyLimit.Get(&ds.st.SV))) senderConcurrencyLimit.SetOnChange(&ds.st.SV, func(ctx context.Context) { ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&ds.st.SV))) }) - ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper")) + cfg.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper")) if ds.firstRangeProvider != nil { ctx := ds.AnnotateCtx(context.Background()) @@ -722,8 +717,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { if cfg.TestingKnobs.LatencyFunc != nil { ds.latencyFunc = cfg.TestingKnobs.LatencyFunc - } else { - ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency + } + // Some tests don't set the latencyFunc. + if ds.latencyFunc == nil { + ds.latencyFunc = func(roachpb.NodeID) (time.Duration, bool) { + return time.Millisecond, true + } } if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil { @@ -1233,9 +1232,9 @@ func (ds *DistSender) divideAndSendParallelCommit( qiBatchIdx := batchIdx + 1 qiResponseCh := make(chan response, 1) - runTask := ds.rpcContext.Stopper.RunAsyncTask + runTask := ds.stopper.RunAsyncTask if ds.disableParallelBatches { - runTask = ds.rpcContext.Stopper.RunTask + runTask = ds.stopper.RunTask } if err := runTask(ctx, "kv.DistSender: sending pre-commit query intents", func(ctx context.Context) { // Map response index to the original un-swapped batch index. @@ -1776,7 +1775,7 @@ func (ds *DistSender) sendPartialBatchAsync( responseCh chan response, positions []int, ) bool { - if err := ds.rpcContext.Stopper.RunAsyncTaskEx( + if err := ds.stopper.RunAsyncTaskEx( ctx, stop.TaskOpts{ TaskName: "kv.DistSender: sending partial batch", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index e89be712c8c7..43d243f4acbc 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -756,15 +756,11 @@ func (a *activeRangeFeed) acquireCatchupScanQuota( func newTransportForRange( ctx context.Context, desc *roachpb.RangeDescriptor, ds *DistSender, ) (Transport, error) { - var latencyFn LatencyFunc - if ds.rpcContext != nil { - latencyFn = ds.rpcContext.RemoteClocks.Latency - } replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas) if err != nil { return nil, err } - replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, latencyFn, ds.locality) + replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality) opts := SendOptions{class: connectionClass(&ds.st.SV)} return ds.transportFactory(opts, ds.nodeDialer, replicas) } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index f967a97521a3..6b09b8ffec95 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -155,7 +155,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { Clock: clock, NodeDescs: g, RPCRetryOptions: &retry.Options{MaxRetries: 10}, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { return transport, nil diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 140c22a57475..725686864fb5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -92,7 +92,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), Clock: s.Clock(), NodeDescs: gs, - RPCContext: s.RPCContext(), + Stopper: s.Stopper(), NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)), FirstRangeProvider: gs, }) @@ -1128,7 +1128,7 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) { Settings: s.ClusterSettings(), Clock: clock, NodeDescs: gs, - RPCContext: s.RPCContext(), + Stopper: s.Stopper(), NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)), FirstRangeProvider: gs, }) @@ -1656,7 +1656,7 @@ func TestBatchPutWithConcurrentSplit(t *testing.T) { AmbientCtx: s.AmbientCtx(), Clock: s.Clock(), NodeDescs: gs, - RPCContext: s.RPCContext(), + Stopper: s.Stopper(), NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)), Settings: cluster.MakeTestingClusterSettings(), FirstRangeProvider: gs, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 4dd2d15b6368..27e9c0516a5a 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -449,7 +449,7 @@ func TestSendRPCOrder(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: transportFactory, }, @@ -603,7 +603,7 @@ func TestImmutableBatchArgs(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -750,7 +750,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -843,7 +843,7 @@ func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -934,7 +934,7 @@ func TestNoBackoffOnNotLeaseHolderErrorFromFollowerRead(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -1004,7 +1004,7 @@ func TestNoBackoffOnNotLeaseHolderErrorWithoutLease(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendFn), }, @@ -1101,7 +1101,7 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendFn), }, @@ -1223,7 +1223,7 @@ func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) { AmbientCtx: log.AmbientContext{Tracer: tracer}, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendFn), }, @@ -1332,7 +1332,7 @@ func TestDistSenderRetryOnTransportErrors(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendFn), }, @@ -1436,7 +1436,7 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transport), }, @@ -1494,7 +1494,7 @@ func TestRetryOnDescriptorLookupError(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(stubRPCSendFn), }, @@ -1570,7 +1570,7 @@ func TestEvictOnFirstRangeGossip(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: SenderTransportFactory( tracing.NewTracer(), @@ -1716,7 +1716,7 @@ func TestEvictCacheOnError(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -1792,7 +1792,7 @@ func TestEvictCacheOnUnknownLeaseHolder(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -1892,7 +1892,7 @@ func TestRetryOnWrongReplicaError(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -1993,7 +1993,7 @@ func TestRetryOnWrongReplicaErrorWithSuggestion(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -2027,7 +2027,7 @@ func TestGetFirstRangeDescriptor(t *testing.T) { ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), NodeDescs: n.Nodes[0].Gossip, - RPCContext: n.RPCContext, + Stopper: stopper, NodeDialer: nodedialer.New(n.RPCContext, gossip.AddressResolver(n.Nodes[0].Gossip)), FirstRangeProvider: n.Nodes[0].Gossip, Settings: cluster.MakeTestingClusterSettings(), @@ -2117,7 +2117,7 @@ func TestSendRPCRetry(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -2240,7 +2240,7 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -2356,7 +2356,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -2437,7 +2437,7 @@ func TestMultiRangeGapReverse(t *testing.T) { AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: rdb, TestingKnobs: ClientTestingKnobs{ TransportFactory: SenderTransportFactory(tr, sender), @@ -2545,7 +2545,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -2594,7 +2594,7 @@ func TestRangeLookupOptionOnReverseScan(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(stubRPCSendFn), }, @@ -2634,7 +2634,7 @@ func TestClockUpdateOnResponse(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: defaultMockRangeDescriptorDB, NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), Settings: cluster.MakeTestingClusterSettings(), @@ -2768,7 +2768,7 @@ func TestTruncateWithSpanAndDescriptor(t *testing.T) { AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendStub), }, @@ -2895,7 +2895,7 @@ func TestTruncateWithLocalSpanAndDescriptor(t *testing.T) { AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendStub), }, @@ -3086,7 +3086,7 @@ func TestMultiRangeWithEndTxn(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3219,7 +3219,7 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3348,7 +3348,7 @@ func TestParallelCommitsDetectIntentMissingCause(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3434,7 +3434,7 @@ func TestCountRanges(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(stubRPCSendFn), }, @@ -3521,7 +3521,7 @@ func TestPProfLabelsAppliedToBatchRequestHeader(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3575,7 +3575,7 @@ func TestGatewayNodeID(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3786,7 +3786,7 @@ func TestMultipleErrorsMerged(t *testing.T) { AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3924,7 +3924,7 @@ func TestErrorIndexAlignment(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -4004,7 +4004,7 @@ func TestCanSendToFollower(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -4055,7 +4055,6 @@ func TestCanSendToFollower(t *testing.T) { sentTo = roachpb.ReplicaDescriptor{} canSend = c.canSendToFollower ds := NewDistSender(cfg) - ds.logicalClusterID = &base.ClusterIDContainer{} // Make store 2 the leaseholder. lease := roachpb.Lease{ Replica: testUserRangeDescriptor3Replicas.InternalReplicas[1], @@ -4228,7 +4227,7 @@ func TestEvictMetaRange(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -4334,7 +4333,7 @@ func TestConnectionClass(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: transportFactory, }, @@ -4491,7 +4490,7 @@ func TestEvictionTokenCoalesce(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RPCRetryOptions: &retry.Options{ MaxRetries: 1, }, @@ -4688,7 +4687,7 @@ func TestErrorIndexOnRangeSplit(t *testing.T) { AmbientCtx: log.AmbientContext{Tracer: tr}, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: initialRDB, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transportFn), @@ -4824,7 +4823,7 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) { AmbientCtx: log.AmbientContext{Tracer: tr}, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: initialRDB, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transportFn), @@ -4917,7 +4916,7 @@ func TestRequestSubdivisionAfterDescriptorChangeWithUnavailableReplicasTerminate Clock: clock, NodeDescs: g, RPCRetryOptions: rpcRetryOptions, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: splitRDB, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transportFn), @@ -5084,7 +5083,7 @@ func TestDescriptorChangeAfterRequestSubdivision(t *testing.T) { AmbientCtx: log.AmbientContext{Tracer: tr}, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: initialRDB, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transportFn), @@ -5122,7 +5121,6 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { defer stopper.Stop(ctx) clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) ns := &mockNodeStore{ nodes: []roachpb.NodeDescriptor{ @@ -5310,7 +5308,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { AmbientCtx: log.MakeTestingAmbientContext(tr), Clock: clock, NodeDescs: ns, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, ) { @@ -5356,8 +5354,6 @@ func TestDistSenderComputeNetworkCost(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) rddb := MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, ) { @@ -5605,7 +5601,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) { for _, isWrite := range []bool{true, false} { t.Run(fmt.Sprintf("isWrite=%t/%s", isWrite, tc.name), func(t *testing.T) { tc.cfg.AmbientCtx = log.MakeTestingAmbientContext(tracing.NewTracer()) - tc.cfg.RPCContext = rpcContext + tc.cfg.Stopper = stopper tc.cfg.RangeDescriptorDB = rddb tc.cfg.Settings = st ds := NewDistSender(*tc.cfg) @@ -5640,7 +5636,6 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) { // success. clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ {NodeID: 1, Address: util.UnresolvedAddr{}}, {NodeID: 2, Address: util.UnresolvedAddr{}}, @@ -5699,7 +5694,7 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: ns, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, ) { @@ -5742,7 +5737,6 @@ func TestDistSenderRPCMetrics(t *testing.T) { defer stopper.Stop(ctx) clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ {NodeID: 1, Address: util.UnresolvedAddr{}}, {NodeID: 2, Address: util.UnresolvedAddr{}}, @@ -5778,7 +5772,7 @@ func TestDistSenderRPCMetrics(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: ns, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, ) { @@ -5913,7 +5907,6 @@ func TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff(t *t // TODO(arul): remove the speculative lease version of this test in 23.1. clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ {NodeID: 1, Address: util.UnresolvedAddr{}}, {NodeID: 2, Address: util.UnresolvedAddr{}}, @@ -5983,7 +5976,7 @@ func TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff(t *t AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: ns, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, ) { @@ -6044,7 +6037,6 @@ func TestOptimisticRangeDescriptorLookups(t *testing.T) { stopper := stop.NewStopper() manualC := timeutil.NewManualTime(timeutil.Unix(0, 1)) clock := hlc.NewClockForTesting(manualC) - rpcContext := rpc.NewInsecureTestingContext(context.Background(), clock, stopper) ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ {NodeID: 1, Address: util.UnresolvedAddr{}}, @@ -6076,7 +6068,7 @@ func TestOptimisticRangeDescriptorLookups(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: ns, - RPCContext: rpcContext, + Stopper: stopper, FirstRangeProvider: fr, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transportFn), diff --git a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go index 9a3488e6dcc7..9eca0dd128fb 100644 --- a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go +++ b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go @@ -90,7 +90,7 @@ func NewDistSenderForLocalTestCluster( Settings: st, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RPCRetryOptions: &retryOpts, NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), FirstRangeProvider: g, diff --git a/pkg/kv/kvclient/kvcoord/range_iter_test.go b/pkg/kv/kvclient/kvcoord/range_iter_test.go index 9c94503ac3d2..798945ab94d7 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter_test.go +++ b/pkg/kv/kvclient/kvcoord/range_iter_test.go @@ -63,7 +63,7 @@ func TestRangeIterForward(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), }) @@ -99,7 +99,7 @@ func TestRangeIterSeekForward(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), }) @@ -138,7 +138,7 @@ func TestRangeIterReverse(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), }) @@ -174,7 +174,7 @@ func TestRangeIterSeekReverse(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), }) diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 48d7849a7d43..37a1f2a6542a 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -372,7 +372,7 @@ func sendBatch( AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Settings: cluster.MakeTestingClusterSettings(), NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, NodeDialer: nodeDialer, FirstRangeProvider: g, TestingKnobs: ClientTestingKnobs{ diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_client_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_client_test.go index f474d01bc7c0..1bfc4de739d2 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_client_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_client_test.go @@ -109,7 +109,7 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) { AmbientCtx: ambient, Clock: s.Clock, NodeDescs: s.Gossip, - RPCContext: s.Cfg.RPCContext, + Stopper: s.Stopper(), TestingKnobs: kvcoord.ClientTestingKnobs{ TransportFactory: kvcoord.TestingAdaptSimpleTransport(sendFn), }, diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index ef5376fc2625..fae579ff1523 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -256,7 +256,7 @@ func createTestStoreWithoutStart( Settings: cfg.Settings, Clock: cfg.Clock, NodeDescs: mockNodeStore{desc: nodeDesc}, - RPCContext: rpcContext, + Stopper: stopper, RPCRetryOptions: &retry.Options{}, NodeDialer: cfg.NodeDialer, FirstRangeProvider: rangeProv, diff --git a/pkg/server/server.go b/pkg/server/server.go index 4ee3fbb3a19b..d8eda103c525 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -452,7 +452,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf Settings: st, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, + LatencyFunc: rpcContext.RemoteClocks.Latency, RPCRetryOptions: &retryOpts, NodeDialer: kvNodeDialer, FirstRangeProvider: g, diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index b8f7bd5905f2..6df184b7f6c5 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -1191,7 +1191,8 @@ func makeTenantSQLServerArgs( NodeDescs: tenantConnect, NodeIDGetter: deps.nodeIDGetter, RPCRetryOptions: &rpcRetryOptions, - RPCContext: rpcContext, + Stopper: stopper, + LatencyFunc: rpcContext.RemoteClocks.Latency, NodeDialer: kvNodeDialer, RangeDescriptorDB: tenantConnect, Locality: baseCfg.Locality,