kvclient: remove RPCContext from DistSender
RPCContext is unnecessary in DistSender. The only two things it actually
needs are the LatencyFunc, which was previously extracted from the
RemoteClocks, and the Stopper, which should have been passed in directly
all along. This change makes DistSender slightly easier to test.

Epic: none

Release note: None
andrewbaptist committed Dec 20, 2023
1 parent 2c4c877 commit de430df
Showing 12 changed files with 84 additions and 95 deletions.
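
To make the new wiring concrete, here is a minimal sketch of how a test might construct a DistSender after this change. Every config field shown appears in the diffs below; the clock, g, and stopper variables (an *hlc.Clock, a node-descriptor store, and a *stop.Stopper) are hypothetical stand-ins for whatever the enclosing test sets up, much as in dist_sender_rangefeed_mock_test.go.

    // Sketch only: a DistSender built without an rpc.Context.
    ds := NewDistSender(DistSenderConfig{
        Settings:        cluster.MakeTestingClusterSettings(),
        Clock:           clock,   // *hlc.Clock, assumed in scope
        NodeDescs:       g,       // node-descriptor store, assumed in scope
        RPCRetryOptions: &retry.Options{MaxRetries: 10},
        Stopper:         stopper, // previously reached through cfg.RPCContext.Stopper
        // A deterministic latency function for tests; if left nil,
        // NewDistSender now falls back to a fixed 1ms stub (see below).
        LatencyFunc: func(roachpb.NodeID) (time.Duration, bool) {
            return time.Millisecond, true
        },
    })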
39 changes: 19 additions & 20 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -509,7 +509,8 @@ type FirstRangeProvider interface {
 type DistSender struct {
     log.AmbientContext
 
-    st      *cluster.Settings
+    st      *cluster.Settings
+    stopper *stop.Stopper
     // clock is used to set time for some calls. E.g. read-only ops
     // which span ranges and don't require read consistency.
     clock *hlc.Clock
@@ -529,15 +530,10 @@ type DistSender struct {
     // This is not required if a RangeDescriptorDB is supplied.
     firstRangeProvider FirstRangeProvider
     transportFactory   TransportFactory
-    rpcContext         *rpc.Context
     // nodeDialer allows RPC calls from the SQL layer to the KV layer.
     nodeDialer      *nodedialer.Dialer
     rpcRetryOptions retry.Options
     asyncSenderSem  *quotapool.IntPool
-    // clusterID is the logical cluster ID used to verify access to enterprise features.
-    // It is copied out of the rpcContext at construction time and used in
-    // testing.
-    logicalClusterID *base.ClusterIDContainer
 
     // batchInterceptor is set for tenants; when set, information about all
     // BatchRequests and BatchResponses are passed through this interceptor, which
@@ -588,14 +584,14 @@ type DistSenderConfig struct {
     AmbientCtx log.AmbientContext
 
     Settings  *cluster.Settings
+    Stopper   *stop.Stopper
     Clock     *hlc.Clock
     NodeDescs NodeDescStore
     // NodeIDGetter, if set, provides non-gossip based implementation for
     // obtaining the local KV node ID. The DistSender uses the node ID to
     // preferentially route requests to a local replica (if one exists).
     NodeIDGetter    func() roachpb.NodeID
     RPCRetryOptions *retry.Options
-    RPCContext      *rpc.Context
     // NodeDialer is the dialer from the SQL layer to the KV layer.
     NodeDialer *nodedialer.Dialer
 
@@ -628,6 +624,8 @@ type DistSenderConfig struct {
     TestingKnobs ClientTestingKnobs
 
     HealthFunc HealthFunc
+
+    LatencyFunc LatencyFunc
 }
 
 // NewDistSender returns a batch.Sender instance which connects to the
@@ -648,13 +646,15 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
     }
     ds := &DistSender{
         st:            cfg.Settings,
+        stopper:       cfg.Stopper,
         clock:         cfg.Clock,
         nodeDescs:     cfg.NodeDescs,
         nodeIDGetter:  nodeIDGetter,
         metrics:       makeDistSenderMetrics(),
         kvInterceptor: cfg.KVInterceptor,
         locality:      cfg.Locality,
         healthFunc:    cfg.HealthFunc,
+        latencyFunc:   cfg.LatencyFunc,
     }
     if ds.st == nil {
         ds.st = cluster.MakeTestingClusterSettings()
@@ -679,7 +679,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
     getRangeDescCacheSize := func() int64 {
         return rangeDescriptorCacheSize.Get(&ds.st.SV)
     }
-    ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.RPCContext.Stopper)
+    ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.Stopper)
     if tf := cfg.TestingKnobs.TransportFactory; tf != nil {
         ds.transportFactory = tf
     } else {
@@ -693,21 +693,16 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
     if cfg.RPCRetryOptions != nil {
         ds.rpcRetryOptions = *cfg.RPCRetryOptions
     }
-    if cfg.RPCContext == nil {
-        panic("no RPCContext set in DistSenderConfig")
-    }
-    ds.rpcContext = cfg.RPCContext
     ds.nodeDialer = cfg.NodeDialer
     if ds.rpcRetryOptions.Closer == nil {
-        ds.rpcRetryOptions.Closer = ds.rpcContext.Stopper.ShouldQuiesce()
+        ds.rpcRetryOptions.Closer = cfg.Stopper.ShouldQuiesce()
     }
-    ds.logicalClusterID = cfg.RPCContext.LogicalClusterID
     ds.asyncSenderSem = quotapool.NewIntPool("DistSender async concurrency",
         uint64(senderConcurrencyLimit.Get(&ds.st.SV)))
     senderConcurrencyLimit.SetOnChange(&ds.st.SV, func(ctx context.Context) {
         ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&ds.st.SV)))
     })
-    ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper"))
+    cfg.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper"))
 
     if ds.firstRangeProvider != nil {
         ctx := ds.AnnotateCtx(context.Background())
@@ -722,8 +717,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
 
     if cfg.TestingKnobs.LatencyFunc != nil {
         ds.latencyFunc = cfg.TestingKnobs.LatencyFunc
-    } else {
-        ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency
     }
+    // Some tests don't set the latencyFunc.
+    if ds.latencyFunc == nil {
+        ds.latencyFunc = func(roachpb.NodeID) (time.Duration, bool) {
+            return time.Millisecond, true
+        }
+    }
 
     if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil {
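
With the else branch gone, DistSender never reads RemoteClocks itself, so a caller that wants real latency-based replica ordering must inject the function through the config. A plausible production call site (hypothetical; not part of this diff) passes exactly the function the old code fetched internally:

    // Hypothetical wiring at a caller that still owns an rpc.Context:
    // hand DistSender only the pieces it actually needs.
    cfg := DistSenderConfig{
        // ... other fields elided ...
        Stopper:     rpcContext.Stopper,
        LatencyFunc: rpcContext.RemoteClocks.Latency,
    }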
@@ -1233,9 +1232,9 @@ func (ds *DistSender) divideAndSendParallelCommit(
     qiBatchIdx := batchIdx + 1
     qiResponseCh := make(chan response, 1)
 
-    runTask := ds.rpcContext.Stopper.RunAsyncTask
+    runTask := ds.stopper.RunAsyncTask
     if ds.disableParallelBatches {
-        runTask = ds.rpcContext.Stopper.RunTask
+        runTask = ds.stopper.RunTask
     }
     if err := runTask(ctx, "kv.DistSender: sending pre-commit query intents", func(ctx context.Context) {
         // Map response index to the original un-swapped batch index.
@@ -1776,7 +1775,7 @@ func (ds *DistSender) sendPartialBatchAsync(
     responseCh chan response,
     positions []int,
 ) bool {
-    if err := ds.rpcContext.Stopper.RunAsyncTaskEx(
+    if err := ds.stopper.RunAsyncTaskEx(
         ctx,
         stop.TaskOpts{
             TaskName: "kv.DistSender: sending partial batch",
6 changes: 1 addition & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -756,15 +756,11 @@ func (a *activeRangeFeed) acquireCatchupScanQuota(
 func newTransportForRange(
     ctx context.Context, desc *roachpb.RangeDescriptor, ds *DistSender,
 ) (Transport, error) {
-    var latencyFn LatencyFunc
-    if ds.rpcContext != nil {
-        latencyFn = ds.rpcContext.RemoteClocks.Latency
-    }
     replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas)
     if err != nil {
         return nil, err
     }
-    replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, latencyFn, ds.locality)
+    replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality)
     opts := SendOptions{class: connectionClass(&ds.st.SV)}
     return ds.transportFactory(opts, ds.nodeDialer, replicas)
 }
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go
@@ -155,7 +155,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
         Clock:           clock,
         NodeDescs:       g,
         RPCRetryOptions: &retry.Options{MaxRetries: 10},
-        RPCContext:      rpcContext,
+        Stopper:         stopper,
         TestingKnobs: ClientTestingKnobs{
             TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) {
                 return transport, nil
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -92,7 +92,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
         Settings:   cluster.MakeTestingClusterSettings(),
         Clock:      s.Clock(),
         NodeDescs:  gs,
-        RPCContext: s.RPCContext(),
+        Stopper:    s.Stopper(),
         NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)),
         FirstRangeProvider: gs,
     })
@@ -1128,7 +1128,7 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) {
         Settings:   s.ClusterSettings(),
         Clock:      clock,
         NodeDescs:  gs,
-        RPCContext: s.RPCContext(),
+        Stopper:    s.Stopper(),
         NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)),
         FirstRangeProvider: gs,
     })
@@ -1656,7 +1656,7 @@ func TestBatchPutWithConcurrentSplit(t *testing.T) {
         AmbientCtx: s.AmbientCtx(),
         Clock:      s.Clock(),
         NodeDescs:  gs,
-        RPCContext: s.RPCContext(),
+        Stopper:    s.Stopper(),
         NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)),
         Settings:   cluster.MakeTestingClusterSettings(),
         FirstRangeProvider: gs,
[Diffs for the remaining 8 changed files are not shown.]
