kvclient: remove RPCContext from DistSender #116644

Merged

39 changes: 19 additions & 20 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -509,7 +509,8 @@ type FirstRangeProvider interface {
 type DistSender struct {
 	log.AmbientContext
 
-	st *cluster.Settings
+	st      *cluster.Settings
+	stopper *stop.Stopper
 	// clock is used to set time for some calls. E.g. read-only ops
 	// which span ranges and don't require read consistency.
 	clock *hlc.Clock
@@ -529,15 +530,10 @@ type DistSender struct {
 	// This is not required if a RangeDescriptorDB is supplied.
 	firstRangeProvider FirstRangeProvider
 	transportFactory TransportFactory
-	rpcContext *rpc.Context
 	// nodeDialer allows RPC calls from the SQL layer to the KV layer.
 	nodeDialer *nodedialer.Dialer
 	rpcRetryOptions retry.Options
 	asyncSenderSem *quotapool.IntPool
-	// clusterID is the logical cluster ID used to verify access to enterprise features.
-	// It is copied out of the rpcContext at construction time and used in
-	// testing.
-	logicalClusterID *base.ClusterIDContainer
 
 	// batchInterceptor is set for tenants; when set, information about all
 	// BatchRequests and BatchResponses are passed through this interceptor, which
@@ -588,14 +584,14 @@ type DistSenderConfig struct {
 	AmbientCtx log.AmbientContext
 
 	Settings *cluster.Settings
+	Stopper *stop.Stopper
 	Clock *hlc.Clock
 	NodeDescs NodeDescStore
 	// NodeIDGetter, if set, provides non-gossip based implementation for
 	// obtaining the local KV node ID. The DistSender uses the node ID to
 	// preferentially route requests to a local replica (if one exists).
 	NodeIDGetter func() roachpb.NodeID
 	RPCRetryOptions *retry.Options
-	RPCContext *rpc.Context
 	// NodeDialer is the dialer from the SQL layer to the KV layer.
 	NodeDialer *nodedialer.Dialer
 
@@ -628,6 +624,8 @@ type DistSenderConfig struct {
 	TestingKnobs ClientTestingKnobs
 
 	HealthFunc HealthFunc
+
+	LatencyFunc LatencyFunc
 }
 
 // NewDistSender returns a batch.Sender instance which connects to the
@@ -648,13 +646,15 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
 	}
 	ds := &DistSender{
 		st: cfg.Settings,
+		stopper: cfg.Stopper,
 		clock: cfg.Clock,
 		nodeDescs: cfg.NodeDescs,
 		nodeIDGetter: nodeIDGetter,
 		metrics: makeDistSenderMetrics(),
 		kvInterceptor: cfg.KVInterceptor,
 		locality: cfg.Locality,
 		healthFunc: cfg.HealthFunc,
+		latencyFunc: cfg.LatencyFunc,
 	}
 	if ds.st == nil {
 		ds.st = cluster.MakeTestingClusterSettings()
@@ -679,7 +679,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
 	getRangeDescCacheSize := func() int64 {
 		return rangeDescriptorCacheSize.Get(&ds.st.SV)
 	}
-	ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.RPCContext.Stopper)
+	ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.Stopper)
 	if tf := cfg.TestingKnobs.TransportFactory; tf != nil {
 		ds.transportFactory = tf
 	} else {
@@ -693,21 +693,16 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
 	if cfg.RPCRetryOptions != nil {
 		ds.rpcRetryOptions = *cfg.RPCRetryOptions
 	}
-	if cfg.RPCContext == nil {
-		panic("no RPCContext set in DistSenderConfig")
-	}
-	ds.rpcContext = cfg.RPCContext
 	ds.nodeDialer = cfg.NodeDialer
 	if ds.rpcRetryOptions.Closer == nil {
-		ds.rpcRetryOptions.Closer = ds.rpcContext.Stopper.ShouldQuiesce()
+		ds.rpcRetryOptions.Closer = cfg.Stopper.ShouldQuiesce()
 	}
-	ds.logicalClusterID = cfg.RPCContext.LogicalClusterID
 	ds.asyncSenderSem = quotapool.NewIntPool("DistSender async concurrency",
 		uint64(senderConcurrencyLimit.Get(&ds.st.SV)))
 	senderConcurrencyLimit.SetOnChange(&ds.st.SV, func(ctx context.Context) {
 		ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&ds.st.SV)))
 	})
-	ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper"))
+	cfg.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper"))
 
 	if ds.firstRangeProvider != nil {
 		ctx := ds.AnnotateCtx(context.Background())
@@ -722,8 +717,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {

 	if cfg.TestingKnobs.LatencyFunc != nil {
 		ds.latencyFunc = cfg.TestingKnobs.LatencyFunc
-	} else {
-		ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency
 	}
+	// Some tests don't set the latencyFunc.
+	if ds.latencyFunc == nil {
+		ds.latencyFunc = func(roachpb.NodeID) (time.Duration, bool) {
+			return time.Millisecond, true
+		}
+	}
 
 	if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil {
@@ -1233,9 +1232,9 @@ func (ds *DistSender) divideAndSendParallelCommit(
 	qiBatchIdx := batchIdx + 1
 	qiResponseCh := make(chan response, 1)
 
-	runTask := ds.rpcContext.Stopper.RunAsyncTask
+	runTask := ds.stopper.RunAsyncTask
 	if ds.disableParallelBatches {
-		runTask = ds.rpcContext.Stopper.RunTask
+		runTask = ds.stopper.RunTask
 	}
 	if err := runTask(ctx, "kv.DistSender: sending pre-commit query intents", func(ctx context.Context) {
 		// Map response index to the original un-swapped batch index.
@@ -1776,7 +1775,7 @@ func (ds *DistSender) sendPartialBatchAsync(
 	responseCh chan response,
 	positions []int,
 ) bool {
-	if err := ds.rpcContext.Stopper.RunAsyncTaskEx(
+	if err := ds.stopper.RunAsyncTaskEx(
 		ctx,
 		stop.TaskOpts{
 			TaskName: "kv.DistSender: sending partial batch",
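
For context on the config change above: DistSender now takes a *stop.Stopper (and, optionally, a LatencyFunc) directly through DistSenderConfig instead of reaching into an *rpc.Context. A minimal construction sketch, assuming a running test server s and a gossip-backed gs serving as NodeDescStore and FirstRangeProvider, mirroring the updated tests further down; the imports (kvcoord, cluster, nodedialer, gossip) and variable names are assumptions, not part of this diff:

	cfg := kvcoord.DistSenderConfig{
		AmbientCtx:         s.AmbientCtx(),
		Settings:           cluster.MakeTestingClusterSettings(),
		Clock:              s.Clock(),
		NodeDescs:          gs,
		Stopper:            s.Stopper(), // replaces the removed RPCContext field
		NodeDialer:         nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)),
		FirstRangeProvider: gs,
	}
	ds := kvcoord.NewDistSender(cfg)
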
6 changes: 1 addition & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -756,15 +756,11 @@ func (a *activeRangeFeed) acquireCatchupScanQuota(
 func newTransportForRange(
 	ctx context.Context, desc *roachpb.RangeDescriptor, ds *DistSender,
 ) (Transport, error) {
-	var latencyFn LatencyFunc
-	if ds.rpcContext != nil {
-		latencyFn = ds.rpcContext.RemoteClocks.Latency
-	}
 	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas)
 	if err != nil {
 		return nil, err
 	}
-	replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, latencyFn, ds.locality)
+	replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality)
 	opts := SendOptions{class: connectionClass(&ds.st.SV)}
 	return ds.transportFactory(opts, ds.nodeDialer, replicas)
 }
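
Note the behavior change here: newTransportForRange no longer consults ds.rpcContext.RemoteClocks.Latency. Replica ordering now goes through ds.latencyFunc, which NewDistSender resolves from cfg.TestingKnobs.LatencyFunc, then cfg.LatencyFunc, and otherwise falls back to the constant one-millisecond closure shown earlier. A caller that wants latency-aware ordering can inject its own function via the new config field; a hypothetical sketch (the node IDs and durations are made up for illustration):

	cfg.LatencyFunc = func(id roachpb.NodeID) (time.Duration, bool) {
		// Pretend node 1 is local and everything else is in a remote region.
		if id == 1 {
			return 500 * time.Microsecond, true
		}
		return 5 * time.Millisecond, true
	}
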
@@ -155,7 +155,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
 		Clock: clock,
 		NodeDescs: g,
 		RPCRetryOptions: &retry.Options{MaxRetries: 10},
-		RPCContext: rpcContext,
+		Stopper: stopper,
 		TestingKnobs: ClientTestingKnobs{
 			TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) {
 				return transport, nil
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -92,7 +92,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
 		Settings: cluster.MakeTestingClusterSettings(),
 		Clock: s.Clock(),
 		NodeDescs: gs,
-		RPCContext: s.RPCContext(),
+		Stopper: s.Stopper(),
 		NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)),
 		FirstRangeProvider: gs,
 	})
@@ -1128,7 +1128,7 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) {
 		Settings: s.ClusterSettings(),
 		Clock: clock,
 		NodeDescs: gs,
-		RPCContext: s.RPCContext(),
+		Stopper: s.Stopper(),
 		NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)),
 		FirstRangeProvider: gs,
 	})
@@ -1656,7 +1656,7 @@ func TestBatchPutWithConcurrentSplit(t *testing.T) {
 		AmbientCtx: s.AmbientCtx(),
 		Clock: s.Clock(),
 		NodeDescs: gs,
-		RPCContext: s.RPCContext(),
+		Stopper: s.Stopper(),
 		NodeDialer: nodedialer.New(s.RPCContext(), gossip.AddressResolver(gs)),
 		Settings: cluster.MakeTestingClusterSettings(),
 		FirstRangeProvider: gs,