From de430dfb4cbb153543f509b5958fd5bef8c73cbb Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Sun, 17 Dec 2023 15:38:37 -0500 Subject: [PATCH] kvclient: remove RPCContext from DistSender RPCContext is unnecessary in DistSender. The only two things that are needed are the LatencyFunc, which is extracted from the RemoteClocks, and the Stopper which should have been passed in directly. This change makes DistSender a tiny bit more easily testable. Epic: none Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 39 ++++--- .../kvclient/kvcoord/dist_sender_rangefeed.go | 6 +- .../dist_sender_rangefeed_mock_test.go | 2 +- .../kvcoord/dist_sender_server_test.go | 6 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 104 ++++++++---------- .../kvcoord/local_test_cluster_util.go | 2 +- pkg/kv/kvclient/kvcoord/range_iter_test.go | 8 +- pkg/kv/kvclient/kvcoord/send_test.go | 2 +- .../txn_interceptor_pipeliner_client_test.go | 2 +- pkg/kv/kvserver/store_test.go | 2 +- pkg/server/server.go | 3 +- pkg/server/tenant.go | 3 +- 12 files changed, 84 insertions(+), 95 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 7014fe3b8bd0..af4398ce7f80 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -509,7 +509,8 @@ type FirstRangeProvider interface { type DistSender struct { log.AmbientContext - st *cluster.Settings + st *cluster.Settings + stopper *stop.Stopper // clock is used to set time for some calls. E.g. read-only ops // which span ranges and don't require read consistency. clock *hlc.Clock @@ -529,15 +530,10 @@ type DistSender struct { // This is not required if a RangeDescriptorDB is supplied. firstRangeProvider FirstRangeProvider transportFactory TransportFactory - rpcContext *rpc.Context // nodeDialer allows RPC calls from the SQL layer to the KV layer. nodeDialer *nodedialer.Dialer rpcRetryOptions retry.Options asyncSenderSem *quotapool.IntPool - // clusterID is the logical cluster ID used to verify access to enterprise features. - // It is copied out of the rpcContext at construction time and used in - // testing. - logicalClusterID *base.ClusterIDContainer // batchInterceptor is set for tenants; when set, information about all // BatchRequests and BatchResponses are passed through this interceptor, which @@ -588,6 +584,7 @@ type DistSenderConfig struct { AmbientCtx log.AmbientContext Settings *cluster.Settings + Stopper *stop.Stopper Clock *hlc.Clock NodeDescs NodeDescStore // NodeIDGetter, if set, provides non-gossip based implementation for @@ -595,7 +592,6 @@ type DistSenderConfig struct { // preferentially route requests to a local replica (if one exists). NodeIDGetter func() roachpb.NodeID RPCRetryOptions *retry.Options - RPCContext *rpc.Context // NodeDialer is the dialer from the SQL layer to the KV layer. NodeDialer *nodedialer.Dialer @@ -628,6 +624,8 @@ type DistSenderConfig struct { TestingKnobs ClientTestingKnobs HealthFunc HealthFunc + + LatencyFunc LatencyFunc } // NewDistSender returns a batch.Sender instance which connects to the @@ -648,6 +646,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { } ds := &DistSender{ st: cfg.Settings, + stopper: cfg.Stopper, clock: cfg.Clock, nodeDescs: cfg.NodeDescs, nodeIDGetter: nodeIDGetter, @@ -655,6 +654,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { kvInterceptor: cfg.KVInterceptor, locality: cfg.Locality, healthFunc: cfg.HealthFunc, + latencyFunc: cfg.LatencyFunc, } if ds.st == nil { ds.st = cluster.MakeTestingClusterSettings() @@ -679,7 +679,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { getRangeDescCacheSize := func() int64 { return rangeDescriptorCacheSize.Get(&ds.st.SV) } - ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.RPCContext.Stopper) + ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.Stopper) if tf := cfg.TestingKnobs.TransportFactory; tf != nil { ds.transportFactory = tf } else { @@ -693,21 +693,16 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { if cfg.RPCRetryOptions != nil { ds.rpcRetryOptions = *cfg.RPCRetryOptions } - if cfg.RPCContext == nil { - panic("no RPCContext set in DistSenderConfig") - } - ds.rpcContext = cfg.RPCContext ds.nodeDialer = cfg.NodeDialer if ds.rpcRetryOptions.Closer == nil { - ds.rpcRetryOptions.Closer = ds.rpcContext.Stopper.ShouldQuiesce() + ds.rpcRetryOptions.Closer = cfg.Stopper.ShouldQuiesce() } - ds.logicalClusterID = cfg.RPCContext.LogicalClusterID ds.asyncSenderSem = quotapool.NewIntPool("DistSender async concurrency", uint64(senderConcurrencyLimit.Get(&ds.st.SV))) senderConcurrencyLimit.SetOnChange(&ds.st.SV, func(ctx context.Context) { ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&ds.st.SV))) }) - ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper")) + cfg.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper")) if ds.firstRangeProvider != nil { ctx := ds.AnnotateCtx(context.Background()) @@ -722,8 +717,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { if cfg.TestingKnobs.LatencyFunc != nil { ds.latencyFunc = cfg.TestingKnobs.LatencyFunc - } else { - ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency + } + // Some tests don't set the latencyFunc. + if ds.latencyFunc == nil { + ds.latencyFunc = func(roachpb.NodeID) (time.Duration, bool) { + return time.Millisecond, true + } } if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil { @@ -1233,9 +1232,9 @@ func (ds *DistSender) divideAndSendParallelCommit( qiBatchIdx := batchIdx + 1 qiResponseCh := make(chan response, 1) - runTask := ds.rpcContext.Stopper.RunAsyncTask + runTask := ds.stopper.RunAsyncTask if ds.disableParallelBatches { - runTask = ds.rpcContext.Stopper.RunTask + runTask = ds.stopper.RunTask } if err := runTask(ctx, "kv.DistSender: sending pre-commit query intents", func(ctx context.Context) { // Map response index to the original un-swapped batch index. @@ -1776,7 +1775,7 @@ func (ds *DistSender) sendPartialBatchAsync( responseCh chan response, positions []int, ) bool { - if err := ds.rpcContext.Stopper.RunAsyncTaskEx( + if err := ds.stopper.RunAsyncTaskEx( ctx, stop.TaskOpts{ TaskName: "kv.DistSender: sending partial batch", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index e89be712c8c7..43d243f4acbc 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -756,15 +756,11 @@ func (a *activeRangeFeed) acquireCatchupScanQuota( func newTransportForRange( ctx context.Context, desc *roachpb.RangeDescriptor, ds *DistSender, ) (Transport, error) { - var latencyFn LatencyFunc - if ds.rpcContext != nil { - latencyFn = ds.rpcContext.RemoteClocks.Latency - } replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas) if err != nil { return nil, err } - replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, latencyFn, ds.locality) + replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality) opts := SendOptions{class: connectionClass(&ds.st.SV)} return ds.transportFactory(opts, ds.nodeDialer, replicas) } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index f967a97521a3..6b09b8ffec95 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -155,7 +155,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { Clock: clock, NodeDescs: g, RPCRetryOptions: &retry.Options{MaxRetries: 10}, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { return transport, nil diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 140c22a57475..725686864fb5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -92,7 +92,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) { Settings: cluster.MakeTestingClusterSettings(), Clock: s.Clock(), NodeDescs: gs, - RPCContext: s.RPCContext(), + Stopper: s.Stopper(), NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)), FirstRangeProvider: gs, }) @@ -1128,7 +1128,7 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) { Settings: s.ClusterSettings(), Clock: clock, NodeDescs: gs, - RPCContext: s.RPCContext(), + Stopper: s.Stopper(), NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)), FirstRangeProvider: gs, }) @@ -1656,7 +1656,7 @@ func TestBatchPutWithConcurrentSplit(t *testing.T) { AmbientCtx: s.AmbientCtx(), Clock: s.Clock(), NodeDescs: gs, - RPCContext: s.RPCContext(), + Stopper: s.Stopper(), NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)), Settings: cluster.MakeTestingClusterSettings(), FirstRangeProvider: gs, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 4dd2d15b6368..27e9c0516a5a 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -449,7 +449,7 @@ func TestSendRPCOrder(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: transportFactory, }, @@ -603,7 +603,7 @@ func TestImmutableBatchArgs(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -750,7 +750,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -843,7 +843,7 @@ func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -934,7 +934,7 @@ func TestNoBackoffOnNotLeaseHolderErrorFromFollowerRead(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -1004,7 +1004,7 @@ func TestNoBackoffOnNotLeaseHolderErrorWithoutLease(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendFn), }, @@ -1101,7 +1101,7 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendFn), }, @@ -1223,7 +1223,7 @@ func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) { AmbientCtx: log.AmbientContext{Tracer: tracer}, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendFn), }, @@ -1332,7 +1332,7 @@ func TestDistSenderRetryOnTransportErrors(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendFn), }, @@ -1436,7 +1436,7 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transport), }, @@ -1494,7 +1494,7 @@ func TestRetryOnDescriptorLookupError(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(stubRPCSendFn), }, @@ -1570,7 +1570,7 @@ func TestEvictOnFirstRangeGossip(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: SenderTransportFactory( tracing.NewTracer(), @@ -1716,7 +1716,7 @@ func TestEvictCacheOnError(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -1792,7 +1792,7 @@ func TestEvictCacheOnUnknownLeaseHolder(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -1892,7 +1892,7 @@ func TestRetryOnWrongReplicaError(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -1993,7 +1993,7 @@ func TestRetryOnWrongReplicaErrorWithSuggestion(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -2027,7 +2027,7 @@ func TestGetFirstRangeDescriptor(t *testing.T) { ds := NewDistSender(DistSenderConfig{ AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), NodeDescs: n.Nodes[0].Gossip, - RPCContext: n.RPCContext, + Stopper: stopper, NodeDialer: nodedialer.New(n.RPCContext, gossip.AddressResolver(n.Nodes[0].Gossip)), FirstRangeProvider: n.Nodes[0].Gossip, Settings: cluster.MakeTestingClusterSettings(), @@ -2117,7 +2117,7 @@ func TestSendRPCRetry(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -2240,7 +2240,7 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -2356,7 +2356,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -2437,7 +2437,7 @@ func TestMultiRangeGapReverse(t *testing.T) { AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: rdb, TestingKnobs: ClientTestingKnobs{ TransportFactory: SenderTransportFactory(tr, sender), @@ -2545,7 +2545,7 @@ func TestMultiRangeMergeStaleDescriptor(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -2594,7 +2594,7 @@ func TestRangeLookupOptionOnReverseScan(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(stubRPCSendFn), }, @@ -2634,7 +2634,7 @@ func TestClockUpdateOnResponse(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: defaultMockRangeDescriptorDB, NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), Settings: cluster.MakeTestingClusterSettings(), @@ -2768,7 +2768,7 @@ func TestTruncateWithSpanAndDescriptor(t *testing.T) { AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendStub), }, @@ -2895,7 +2895,7 @@ func TestTruncateWithLocalSpanAndDescriptor(t *testing.T) { AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(sendStub), }, @@ -3086,7 +3086,7 @@ func TestMultiRangeWithEndTxn(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3219,7 +3219,7 @@ func TestParallelCommitSplitFromQueryIntents(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3348,7 +3348,7 @@ func TestParallelCommitsDetectIntentMissingCause(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3434,7 +3434,7 @@ func TestCountRanges(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(stubRPCSendFn), }, @@ -3521,7 +3521,7 @@ func TestPProfLabelsAppliedToBatchRequestHeader(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3575,7 +3575,7 @@ func TestGatewayNodeID(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3786,7 +3786,7 @@ func TestMultipleErrorsMerged(t *testing.T) { AmbientCtx: log.MakeTestingAmbientContext(stopper.Tracer()), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -3924,7 +3924,7 @@ func TestErrorIndexAlignment(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -4004,7 +4004,7 @@ func TestCanSendToFollower(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -4055,7 +4055,6 @@ func TestCanSendToFollower(t *testing.T) { sentTo = roachpb.ReplicaDescriptor{} canSend = c.canSendToFollower ds := NewDistSender(cfg) - ds.logicalClusterID = &base.ClusterIDContainer{} // Make store 2 the leaseholder. lease := roachpb.Lease{ Replica: testUserRangeDescriptor3Replicas.InternalReplicas[1], @@ -4228,7 +4227,7 @@ func TestEvictMetaRange(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(testFn), }, @@ -4334,7 +4333,7 @@ func TestConnectionClass(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, TestingKnobs: ClientTestingKnobs{ TransportFactory: transportFactory, }, @@ -4491,7 +4490,7 @@ func TestEvictionTokenCoalesce(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RPCRetryOptions: &retry.Options{ MaxRetries: 1, }, @@ -4688,7 +4687,7 @@ func TestErrorIndexOnRangeSplit(t *testing.T) { AmbientCtx: log.AmbientContext{Tracer: tr}, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: initialRDB, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transportFn), @@ -4824,7 +4823,7 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) { AmbientCtx: log.AmbientContext{Tracer: tr}, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: initialRDB, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transportFn), @@ -4917,7 +4916,7 @@ func TestRequestSubdivisionAfterDescriptorChangeWithUnavailableReplicasTerminate Clock: clock, NodeDescs: g, RPCRetryOptions: rpcRetryOptions, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: splitRDB, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transportFn), @@ -5084,7 +5083,7 @@ func TestDescriptorChangeAfterRequestSubdivision(t *testing.T) { AmbientCtx: log.AmbientContext{Tracer: tr}, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: initialRDB, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transportFn), @@ -5122,7 +5121,6 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { defer stopper.Stop(ctx) clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) ns := &mockNodeStore{ nodes: []roachpb.NodeDescriptor{ @@ -5310,7 +5308,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { AmbientCtx: log.MakeTestingAmbientContext(tr), Clock: clock, NodeDescs: ns, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, ) { @@ -5356,8 +5354,6 @@ func TestDistSenderComputeNetworkCost(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) - clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) rddb := MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, ) { @@ -5605,7 +5601,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) { for _, isWrite := range []bool{true, false} { t.Run(fmt.Sprintf("isWrite=%t/%s", isWrite, tc.name), func(t *testing.T) { tc.cfg.AmbientCtx = log.MakeTestingAmbientContext(tracing.NewTracer()) - tc.cfg.RPCContext = rpcContext + tc.cfg.Stopper = stopper tc.cfg.RangeDescriptorDB = rddb tc.cfg.Settings = st ds := NewDistSender(*tc.cfg) @@ -5640,7 +5636,6 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) { // success. clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ {NodeID: 1, Address: util.UnresolvedAddr{}}, {NodeID: 2, Address: util.UnresolvedAddr{}}, @@ -5699,7 +5694,7 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: ns, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, ) { @@ -5742,7 +5737,6 @@ func TestDistSenderRPCMetrics(t *testing.T) { defer stopper.Stop(ctx) clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ {NodeID: 1, Address: util.UnresolvedAddr{}}, {NodeID: 2, Address: util.UnresolvedAddr{}}, @@ -5778,7 +5772,7 @@ func TestDistSenderRPCMetrics(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: ns, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, ) { @@ -5913,7 +5907,6 @@ func TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff(t *t // TODO(arul): remove the speculative lease version of this test in 23.1. clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ {NodeID: 1, Address: util.UnresolvedAddr{}}, {NodeID: 2, Address: util.UnresolvedAddr{}}, @@ -5983,7 +5976,7 @@ func TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff(t *t AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: ns, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, ) { @@ -6044,7 +6037,6 @@ func TestOptimisticRangeDescriptorLookups(t *testing.T) { stopper := stop.NewStopper() manualC := timeutil.NewManualTime(timeutil.Unix(0, 1)) clock := hlc.NewClockForTesting(manualC) - rpcContext := rpc.NewInsecureTestingContext(context.Background(), clock, stopper) ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ {NodeID: 1, Address: util.UnresolvedAddr{}}, @@ -6076,7 +6068,7 @@ func TestOptimisticRangeDescriptorLookups(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: ns, - RPCContext: rpcContext, + Stopper: stopper, FirstRangeProvider: fr, TestingKnobs: ClientTestingKnobs{ TransportFactory: adaptSimpleTransport(transportFn), diff --git a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go index 9a3488e6dcc7..9eca0dd128fb 100644 --- a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go +++ b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go @@ -90,7 +90,7 @@ func NewDistSenderForLocalTestCluster( Settings: st, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RPCRetryOptions: &retryOpts, NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), FirstRangeProvider: g, diff --git a/pkg/kv/kvclient/kvcoord/range_iter_test.go b/pkg/kv/kvclient/kvcoord/range_iter_test.go index 9c94503ac3d2..798945ab94d7 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter_test.go +++ b/pkg/kv/kvclient/kvcoord/range_iter_test.go @@ -63,7 +63,7 @@ func TestRangeIterForward(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), }) @@ -99,7 +99,7 @@ func TestRangeIterSeekForward(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), }) @@ -138,7 +138,7 @@ func TestRangeIterReverse(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), }) @@ -174,7 +174,7 @@ func TestRangeIterSeekReverse(t *testing.T) { AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, RangeDescriptorDB: alphaRangeDescriptorDB, Settings: cluster.MakeTestingClusterSettings(), }) diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 48d7849a7d43..37a1f2a6542a 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -372,7 +372,7 @@ func sendBatch( AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), Settings: cluster.MakeTestingClusterSettings(), NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, NodeDialer: nodeDialer, FirstRangeProvider: g, TestingKnobs: ClientTestingKnobs{ diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_client_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_client_test.go index f474d01bc7c0..1bfc4de739d2 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_client_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_client_test.go @@ -109,7 +109,7 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) { AmbientCtx: ambient, Clock: s.Clock, NodeDescs: s.Gossip, - RPCContext: s.Cfg.RPCContext, + Stopper: s.Stopper(), TestingKnobs: kvcoord.ClientTestingKnobs{ TransportFactory: kvcoord.TestingAdaptSimpleTransport(sendFn), }, diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index ef5376fc2625..fae579ff1523 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -256,7 +256,7 @@ func createTestStoreWithoutStart( Settings: cfg.Settings, Clock: cfg.Clock, NodeDescs: mockNodeStore{desc: nodeDesc}, - RPCContext: rpcContext, + Stopper: stopper, RPCRetryOptions: &retry.Options{}, NodeDialer: cfg.NodeDialer, FirstRangeProvider: rangeProv, diff --git a/pkg/server/server.go b/pkg/server/server.go index 4ee3fbb3a19b..d8eda103c525 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -452,7 +452,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf Settings: st, Clock: clock, NodeDescs: g, - RPCContext: rpcContext, + Stopper: stopper, + LatencyFunc: rpcContext.RemoteClocks.Latency, RPCRetryOptions: &retryOpts, NodeDialer: kvNodeDialer, FirstRangeProvider: g, diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index b8f7bd5905f2..6df184b7f6c5 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -1191,7 +1191,8 @@ func makeTenantSQLServerArgs( NodeDescs: tenantConnect, NodeIDGetter: deps.nodeIDGetter, RPCRetryOptions: &rpcRetryOptions, - RPCContext: rpcContext, + Stopper: stopper, + LatencyFunc: rpcContext.RemoteClocks.Latency, NodeDialer: kvNodeDialer, RangeDescriptorDB: tenantConnect, Locality: baseCfg.Locality,