-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
dist_sender.go
2667 lines (2477 loc) · 104 KB
/
dist_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2014 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvcoord
import (
"context"
"fmt"
"runtime"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
// Metadata for the metrics exported by DistSender. Each entry below is turned
// into a live counter or gauge by makeDistSenderMetrics or
// makeDistSenderRangeFeedMetrics.
var (
metaDistSenderBatchCount = metric.Metadata{
Name: "distsender.batches",
Help: "Number of batches processed",
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}
metaDistSenderPartialBatchCount = metric.Metadata{
Name: "distsender.batches.partial",
Help: "Number of partial batches processed after being divided on range boundaries",
Measurement: "Partial Batches",
Unit: metric.Unit_COUNT,
}
metaDistSenderAsyncSentCount = metric.Metadata{
Name: "distsender.batches.async.sent",
Help: "Number of partial batches sent asynchronously",
Measurement: "Partial Batches",
Unit: metric.Unit_COUNT,
}
metaDistSenderAsyncThrottledCount = metric.Metadata{
Name: "distsender.batches.async.throttled",
Help: "Number of partial batches not sent asynchronously due to throttling",
Measurement: "Partial Batches",
Unit: metric.Unit_COUNT,
}
metaTransportSentCount = metric.Metadata{
Name: "distsender.rpc.sent",
Help: "Number of replica-addressed RPCs sent",
Measurement: "RPCs",
Unit: metric.Unit_COUNT,
}
metaTransportLocalSentCount = metric.Metadata{
Name: "distsender.rpc.sent.local",
Help: "Number of replica-addressed RPCs sent through the local-server optimization",
Measurement: "RPCs",
Unit: metric.Unit_COUNT,
}
metaTransportSenderNextReplicaErrCount = metric.Metadata{
Name: "distsender.rpc.sent.nextreplicaerror",
Help: "Number of replica-addressed RPCs sent due to per-replica errors",
Measurement: "RPCs",
Unit: metric.Unit_COUNT,
}
metaDistSenderNotLeaseHolderErrCount = metric.Metadata{
Name: "distsender.errors.notleaseholder",
Help: "Number of NotLeaseHolderErrors encountered from replica-addressed RPCs",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
metaDistSenderInLeaseTransferBackoffsCount = metric.Metadata{
Name: "distsender.errors.inleasetransferbackoffs",
Help: "Number of times backed off due to NotLeaseHolderErrors during lease transfer",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangeLookups = metric.Metadata{
Name: "distsender.rangelookups",
Help: "Number of range lookups",
Measurement: "Range Lookups",
Unit: metric.Unit_COUNT,
}
// Unlike most entries in this group, this one backs a gauge rather than a
// counter (see makeDistSenderMetrics).
metaDistSenderSlowRPCs = metric.Metadata{
Name: "requests.slow.distsender",
Help: `Number of replica-bound RPCs currently stuck or retrying for a long time.
Note that this is not a good signal for KV health. The remote side of the
RPCs tracked here may experience contention, so an end user can easily
cause values for this metric to be emitted by leaving a transaction open
for a long time and contending with it using a second transaction.`,
Measurement: "Requests",
Unit: metric.Unit_COUNT,
}
// metaDistSenderMethodCountTmpl is a template, not a metric in its own
// right: makeDistSenderMetrics copies it once per request method and fills
// the %s verb in Name (lower-cased method name) and Help (method name).
metaDistSenderMethodCountTmpl = metric.Metadata{
Name: "distsender.rpc.%s.sent",
Help: `Number of %s requests processed.
This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.`,
Measurement: "RPCs",
Unit: metric.Unit_COUNT,
}
// metaDistSenderErrCountTmpl is a template as well: makeDistSenderMetrics
// copies it once per error detail type and fills the %s verb in Name
// (lower-cased type name) and Help (type name).
metaDistSenderErrCountTmpl = metric.Metadata{
Name: "distsender.rpc.err.%s",
Help: `Number of %s errors received replica-bound RPCs
This counts how often error of the specified type was received back from replicas
as part of executing possibly range-spanning requests. Failures to reach the target
replica will be accounted for as 'roachpb.CommunicationErrType' and unclassified
errors as 'roachpb.InternalErrType'.
`,
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
// The remaining entries describe the rangefeed-specific metrics created by
// makeDistSenderRangeFeedMetrics.
metaDistSenderRangefeedTotalRanges = metric.Metadata{
Name: "distsender.rangefeed.total_ranges",
Help: `Number of ranges executing rangefeed
This counts the number of ranges with an active rangefeed.
`,
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedCatchupRanges = metric.Metadata{
Name: "distsender.rangefeed.catchup_ranges",
Help: `Number of ranges in catchup mode
This counts the number of ranges with an active rangefeed that are performing catchup scan.
`,
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedErrorCatchupRanges = metric.Metadata{
Name: "distsender.rangefeed.error_catchup_ranges",
Help: `Number of ranges in catchup mode which experienced an error`,
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedRestartRanges = metric.Metadata{
Name: "distsender.rangefeed.restart_ranges",
Help: `Number of ranges that were restarted due to transient errors`,
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedRestartStuck = metric.Metadata{
Name: "distsender.rangefeed.restart_stuck",
Help: `Number of times a rangefeed was restarted due to not receiving ` +
`timely updates (kv.rangefeed.range_stuck_threshold cluster setting)`,
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
)
// CanSendToFollower is used by the DistSender to determine if it needs to look
// up the current lease holder for a request. It is used by the
// followerreadsccl code to inject logic to check if follower reads are enabled.
//
// It is a package-level variable so that it can be replaced. The default
// implementation installed here (i.e. without CCL code) ignores all of its
// arguments and always returns false.
var CanSendToFollower = func(
_ uuid.UUID,
_ *cluster.Settings,
_ *hlc.Clock,
_ roachpb.RangeClosedTimestampPolicy,
_ *kvpb.BatchRequest,
) bool {
return false
}
const (
// defaultSenderConcurrency is the baseline limit for asynchronous senders.
// The default of the kv.dist_sender.concurrency_limit setting is the larger
// of this value and 64*GOMAXPROCS (see senderConcurrencyLimit).
defaultSenderConcurrency = 1024
// RangeLookupPrefetchCount is the maximum number of range descriptors to prefetch
// during range lookups. It is the prefetch count DistSender.RangeLookup
// passes to kv.RangeLookup.
RangeLookupPrefetchCount = 8
// sameReplicaRetryLimit is the maximum number of times a replica is retried
// when it repeatedly returns stale lease info.
sameReplicaRetryLimit = 10
)
// rangeDescriptorCacheSize is the kv.range_descriptor_cache.size setting: the
// maximum number of entries in the range descriptor cache (default 1e6).
// NewDistSender hands the range cache a closure that reads this setting.
var rangeDescriptorCacheSize = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.range_descriptor_cache.size",
"maximum number of entries in the range descriptor cache",
1e6,
)
// senderConcurrencyLimit controls the maximum number of asynchronous send
// requests. Its default is the larger of defaultSenderConcurrency and
// 64*GOMAXPROCS, evaluated when the setting is registered. NewDistSender sizes
// the async sender quota pool from this setting and resizes the pool whenever
// the setting changes.
var senderConcurrencyLimit = settings.RegisterIntSetting(
settings.TenantWritable,
"kv.dist_sender.concurrency_limit",
"maximum number of asynchronous send requests",
max(defaultSenderConcurrency, int64(64*runtime.GOMAXPROCS(0))),
settings.NonNegativeInt,
)
// FollowerReadsUnhealthy controls whether we will send follower reads to nodes
// that are not considered healthy. By default, we will sort these nodes behind
// healthy nodes. The setting (kv.dist_sender.follower_reads_unhealthy.enabled)
// defaults to true.
var FollowerReadsUnhealthy = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.dist_sender.follower_reads_unhealthy.enabled",
"send follower reads to unhealthy nodes",
true,
)
// max returns the larger of the two int64 arguments. When both are equal that
// common value is returned.
func max(a, b int64) int64 {
	if b >= a {
		return b
	}
	return a
}
// DistSenderMetrics is the set of metrics for a given distributed sender. It
// is populated by makeDistSenderMetrics; the metric names and help texts come
// from the metaDistSender*/metaTransport* metadata declared in this file.
type DistSenderMetrics struct {
// Batch-level counters (distsender.batches*).
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
// Replica-addressed RPC counters (distsender.rpc.sent*).
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
// Lease-related error counters (distsender.errors.*).
NotLeaseHolderErrCount *metric.Counter
InLeaseTransferBackoffs *metric.Counter
// RangeLookups is incremented once per DistSender.RangeLookup call.
RangeLookups *metric.Counter
// SlowRPCs is a gauge (requests.slow.distsender), not a counter.
SlowRPCs *metric.Gauge
// MethodCounts holds one counter per kvpb.Method value, indexed by the
// method's integer value.
MethodCounts [kvpb.NumMethods]*metric.Counter
// ErrCounts holds one counter per kvpb.ErrorDetailType value, indexed by
// the type's integer value.
ErrCounts [kvpb.NumErrors]*metric.Counter
// Rangefeed-specific metrics are embedded.
DistSenderRangeFeedMetrics
}
// DistSenderRangeFeedMetrics is a set of rangefeed specific metrics. It is
// populated by makeDistSenderRangeFeedMetrics and embedded in
// DistSenderMetrics.
type DistSenderRangeFeedMetrics struct {
// Gauges: distsender.rangefeed.total_ranges and
// distsender.rangefeed.catchup_ranges.
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
// Counters: distsender.rangefeed.error_catchup_ranges,
// distsender.rangefeed.restart_ranges and distsender.rangefeed.restart_stuck.
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
}
// makeDistSenderMetrics builds a fully populated DistSenderMetrics: the fixed
// counters and gauge, the embedded rangefeed metrics, and one counter per
// request method and per error detail type derived from the two template
// metadata values.
func makeDistSenderMetrics() DistSenderMetrics {
	var m DistSenderMetrics
	m.BatchCount = metric.NewCounter(metaDistSenderBatchCount)
	m.PartialBatchCount = metric.NewCounter(metaDistSenderPartialBatchCount)
	m.AsyncSentCount = metric.NewCounter(metaDistSenderAsyncSentCount)
	m.AsyncThrottledCount = metric.NewCounter(metaDistSenderAsyncThrottledCount)
	m.SentCount = metric.NewCounter(metaTransportSentCount)
	m.LocalSentCount = metric.NewCounter(metaTransportLocalSentCount)
	m.NextReplicaErrCount = metric.NewCounter(metaTransportSenderNextReplicaErrCount)
	m.NotLeaseHolderErrCount = metric.NewCounter(metaDistSenderNotLeaseHolderErrCount)
	m.InLeaseTransferBackoffs = metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount)
	m.RangeLookups = metric.NewCounter(metaDistSenderRangeLookups)
	m.SlowRPCs = metric.NewGauge(metaDistSenderSlowRPCs)
	m.DistSenderRangeFeedMetrics = makeDistSenderRangeFeedMetrics()

	// fill instantiates a template: tmpl is received by value, so the
	// package-level template itself is never modified. The metric name gets the
	// lower-cased label, the help text the label as-is.
	fill := func(tmpl metric.Metadata, label string) metric.Metadata {
		tmpl.Name = fmt.Sprintf(tmpl.Name, strings.ToLower(label))
		tmpl.Help = fmt.Sprintf(tmpl.Help, label)
		return tmpl
	}

	for idx := range m.MethodCounts {
		label := kvpb.Method(idx).String()
		m.MethodCounts[idx] = metric.NewCounter(fill(metaDistSenderMethodCountTmpl, label))
	}
	for idx := range m.ErrCounts {
		label := kvpb.ErrorDetailType(idx).String()
		m.ErrCounts[idx] = metric.NewCounter(fill(metaDistSenderErrCountTmpl, label))
	}
	return m
}
// makeDistSenderRangeFeedMetrics builds the rangefeed-specific metrics: two
// gauges (total and catch-up ranges) and three counters (catch-up errors,
// range restarts and stuck restarts).
func makeDistSenderRangeFeedMetrics() DistSenderRangeFeedMetrics {
	var m DistSenderRangeFeedMetrics
	m.RangefeedRanges = metric.NewGauge(metaDistSenderRangefeedTotalRanges)
	m.RangefeedCatchupRanges = metric.NewGauge(metaDistSenderRangefeedCatchupRanges)
	m.RangefeedErrorCatchup = metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges)
	m.RangefeedRestartRanges = metric.NewCounter(metaDistSenderRangefeedRestartRanges)
	m.RangefeedRestartStuck = metric.NewCounter(metaDistSenderRangefeedRestartStuck)
	return m
}
// MetricStruct is an intentionally empty marker method. Per the original note
// it exists to implement a "Struct" interface of the metric package
// (presumably metric.Struct; the interface is not visible here — confirm).
func (m DistSenderRangeFeedMetrics) MetricStruct() {}
// FirstRangeProvider is capable of providing DistSender with the descriptor of
// the first range in the cluster and notifying the DistSender when the first
// range in the cluster has changed.
//
// DistSender uses it in two places: FirstRange delegates to
// GetFirstRangeDescriptor, and NewDistSender registers a callback through
// OnFirstRangeChanged that evicts the cached descriptor for RKeyMin.
type FirstRangeProvider interface {
// GetFirstRangeDescriptor returns the RangeDescriptor for the first range
// in the cluster.
GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error)
// OnFirstRangeChanged calls the provided callback when the RangeDescriptor
// for the first range has changed.
OnFirstRangeChanged(func(*roachpb.RangeDescriptor))
}
// A DistSender provides methods to access Cockroach's monolithic,
// distributed key value store. Each method invocation triggers a
// lookup or lookups to find replica metadata for implicated key
// ranges. RPCs are sent to one or more of the replicas to satisfy
// the method invocation.
//
// Instances are created with NewDistSender.
type DistSender struct {
log.AmbientContext
// st holds the cluster settings. NewDistSender substitutes testing settings
// when none are supplied.
st *cluster.Settings
// nodeDescriptor, if set, holds the descriptor of the node the
// DistSender lives on. It should be accessed via getNodeDescriptor(),
// which tries to obtain the value from the Gossip network if the
// descriptor is unknown. It is written with atomic.StorePointer and holds a
// *roachpb.NodeDescriptor.
nodeDescriptor unsafe.Pointer
// clock is used to set time for some calls. E.g. read-only ops
// which span ranges and don't require read consistency.
clock *hlc.Clock
// nodeDescs provides information on the KV nodes that DistSender may
// consider routing requests to.
nodeDescs NodeDescStore
// metrics stores DistSender-related metrics.
metrics DistSenderMetrics
// rangeCache caches replica metadata for key ranges.
rangeCache *rangecache.RangeCache
// firstRangeProvider provides the range descriptor for range one.
// This is not required if a RangeDescriptorDB is supplied.
firstRangeProvider FirstRangeProvider
// transportFactory is the testing-knob factory when one is supplied,
// GRPCTransportFactory otherwise.
transportFactory TransportFactory
rpcContext *rpc.Context
// nodeDialer allows RPC calls from the SQL layer to the KV layer.
nodeDialer *nodedialer.Dialer
// rpcRetryOptions defaults to base.DefaultRetryOptions() unless overridden
// through DistSenderConfig.RPCRetryOptions.
rpcRetryOptions retry.Options
// asyncSenderSem is a quota pool sized by the
// kv.dist_sender.concurrency_limit setting.
asyncSenderSem *quotapool.IntPool
// logicalClusterID is the logical cluster ID used to verify access to
// enterprise features. It is copied out of the rpcContext at construction
// time and used in testing.
logicalClusterID *base.ClusterIDContainer
// kvInterceptor is set for tenants; when set, information about all
// BatchRequests and BatchResponses are passed through this interceptor, which
// can potentially throttle requests.
kvInterceptor multitenant.TenantSideKVInterceptor
// disableFirstRangeUpdates disables updates of the first range via
// gossip. Used by tests which want finer control of the contents of the
// range cache. Accessed atomically; 1 means disabled.
disableFirstRangeUpdates int32
// disableParallelBatches instructs DistSender to never parallelize
// the transmission of partial batch requests across ranges.
disableParallelBatches bool
// latencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc
// healthFunc holds the function that returns true if the node is alive and
// not draining. NewDistSender installs a placeholder that always returns
// true until SetHealthFunc is called.
healthFunc atomic.Pointer[HealthFunc]
// onRangeSpanningNonTxnalBatch is copied from the testing knob of the same
// name when that knob is set.
onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error
// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
locality roachpb.Locality
// If set, the DistSender will try the replicas in the order they appear in
// the descriptor, instead of trying to reorder them by latency. The knob
// only applies to requests sent with the LEASEHOLDER routing policy.
dontReorderReplicas bool
// dontConsiderConnHealth, if set, makes the GRPCTransport not take into
// consideration the connection health when deciding the ordering for
// replicas. When not set, replicas on nodes with unhealthy connections are
// deprioritized.
dontConsiderConnHealth bool
// Currently executing range feeds.
activeRangeFeeds sync.Map // map[*rangeFeedRegistry]nil
}
// Compile-time assertion that *DistSender satisfies kv.Sender.
var _ kv.Sender = &DistSender{}
// DistSenderConfig holds configuration and auxiliary objects that can be passed
// to NewDistSender.
type DistSenderConfig struct {
// AmbientCtx must carry a non-nil Tracer; NewDistSender panics otherwise.
AmbientCtx log.AmbientContext
// Settings may be nil, in which case NewDistSender falls back to
// cluster.MakeTestingClusterSettings().
Settings *cluster.Settings
Clock *hlc.Clock
NodeDescs NodeDescStore
// nodeDescriptor, if provided, is used to describe which node the
// DistSender lives on, for instance when deciding where to send RPCs.
// Usually it is filled in from the Gossip network on demand.
nodeDescriptor *roachpb.NodeDescriptor
// RPCRetryOptions, if non-nil, replaces base.DefaultRetryOptions().
RPCRetryOptions *retry.Options
// RPCContext is required: NewDistSender dereferences it (stopper, logical
// cluster ID, remote clocks) and cannot proceed without it.
RPCContext *rpc.Context
// NodeDialer is the dialer from the SQL layer to the KV layer.
NodeDialer *nodedialer.Dialer
// At least one of the following two must be provided; NewDistSender panics
// if both are nil.
//
// If only FirstRangeProvider is supplied, DistSender will use itself as a
// RangeDescriptorDB and scan the meta ranges directly to satisfy range
// lookups, using the FirstRangeProvider to bootstrap the location of the
// meta1 range. Additionally, it will proactively update its range
// descriptor cache with any meta1 updates from the provider.
//
// If only RangeDescriptorDB is provided, all range lookups will be
// delegated to it.
//
// If both are provided (not required, but allowed for tests) range lookups
// will be delegated to the RangeDescriptorDB but FirstRangeProvider will
// still be used to listen for updates to the first range's descriptor.
FirstRangeProvider FirstRangeProvider
RangeDescriptorDB rangecache.RangeDescriptorDB
// Locality is the description of the topography of the server on which the
// DistSender is running.
Locality roachpb.Locality
// KVInterceptor is set for tenants; when set, information about all
// BatchRequests and BatchResponses are passed through this interceptor, which
// can potentially throttle requests.
KVInterceptor multitenant.TenantSideKVInterceptor
// TestingKnobs supplies optional test overrides. NewDistSender reads
// TransportFactory, DontReorderReplicas, DontConsiderConnHealth,
// LatencyFunc and OnRangeSpanningNonTxnalBatch from it.
TestingKnobs ClientTestingKnobs
}
// NewDistSender returns a DistSender (which implements kv.Sender) configured
// from cfg. Optional fields that are left unset get defaults: testing cluster
// settings when cfg.Settings is nil, base.DefaultRetryOptions() when
// cfg.RPCRetryOptions is nil, GRPCTransportFactory when no transport factory
// knob is set, and the RPC context's remote-clock latency when no latency knob
// is set.
//
// NewDistSender panics if cfg.AmbientCtx has no Tracer, if neither
// cfg.FirstRangeProvider nor cfg.RangeDescriptorDB is supplied, or if
// cfg.RPCContext is nil.
func NewDistSender(cfg DistSenderConfig) *DistSender {
	ds := &DistSender{
		st:            cfg.Settings,
		clock:         cfg.Clock,
		nodeDescs:     cfg.NodeDescs,
		metrics:       makeDistSenderMetrics(),
		kvInterceptor: cfg.KVInterceptor,
		locality:      cfg.Locality,
	}
	if ds.st == nil {
		ds.st = cluster.MakeTestingClusterSettings()
	}

	ds.AmbientContext = cfg.AmbientCtx
	if ds.AmbientContext.Tracer == nil {
		panic("no tracer set in AmbientCtx")
	}

	if cfg.nodeDescriptor != nil {
		atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(cfg.nodeDescriptor))
	}

	// Pick the RangeDescriptorDB: an explicitly supplied one wins; otherwise
	// the DistSender serves range lookups itself, bootstrapped by the
	// FirstRangeProvider.
	var rdb rangecache.RangeDescriptorDB
	if cfg.FirstRangeProvider != nil {
		ds.firstRangeProvider = cfg.FirstRangeProvider
		rdb = ds
	}
	if cfg.RangeDescriptorDB != nil {
		rdb = cfg.RangeDescriptorDB
	}
	if rdb == nil {
		panic("DistSenderConfig must contain either FirstRangeProvider or RangeDescriptorDB")
	}

	// The RPCContext must be validated before its first use. The range cache
	// construction just below reads cfg.RPCContext.Stopper, so checking any
	// later would turn a missing RPCContext into an opaque nil-pointer
	// dereference instead of this message.
	if cfg.RPCContext == nil {
		panic("no RPCContext set in DistSenderConfig")
	}
	ds.rpcContext = cfg.RPCContext
	ds.nodeDialer = cfg.NodeDialer

	getRangeDescCacheSize := func() int64 {
		return rangeDescriptorCacheSize.Get(&ds.st.SV)
	}
	ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.RPCContext.Stopper)
	if tf := cfg.TestingKnobs.TransportFactory; tf != nil {
		ds.transportFactory = tf
	} else {
		ds.transportFactory = GRPCTransportFactory
	}
	ds.dontReorderReplicas = cfg.TestingKnobs.DontReorderReplicas
	ds.dontConsiderConnHealth = cfg.TestingKnobs.DontConsiderConnHealth
	ds.rpcRetryOptions = base.DefaultRetryOptions()
	// TODO(arul): The rpcRetryOptions passed in here from server/tenant don't
	// set a max retries limit. Should they?
	if cfg.RPCRetryOptions != nil {
		ds.rpcRetryOptions = *cfg.RPCRetryOptions
	}
	// Make retry loops give up once the stopper starts quiescing, unless the
	// caller supplied its own closer.
	if ds.rpcRetryOptions.Closer == nil {
		ds.rpcRetryOptions.Closer = ds.rpcContext.Stopper.ShouldQuiesce()
	}
	ds.logicalClusterID = cfg.RPCContext.LogicalClusterID

	// Size the async-sender quota pool from the cluster setting and keep it in
	// sync with later changes to that setting.
	ds.asyncSenderSem = quotapool.NewIntPool("DistSender async concurrency",
		uint64(senderConcurrencyLimit.Get(&ds.st.SV)))
	senderConcurrencyLimit.SetOnChange(&ds.st.SV, func(ctx context.Context) {
		ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&ds.st.SV)))
	})
	ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper"))

	if ds.firstRangeProvider != nil {
		ctx := ds.AnnotateCtx(context.Background())
		ds.firstRangeProvider.OnFirstRangeChanged(func(desc *roachpb.RangeDescriptor) {
			if atomic.LoadInt32(&ds.disableFirstRangeUpdates) == 1 {
				return
			}
			log.VEventf(ctx, 1, "gossiped first range descriptor: %+v", desc.Replicas())
			// Drop the cached entry covering the minimum key so the next
			// lookup fetches the updated first-range descriptor.
			ds.rangeCache.EvictByKey(ctx, roachpb.RKeyMin)
		})
	}

	if cfg.TestingKnobs.LatencyFunc != nil {
		ds.latencyFunc = cfg.TestingKnobs.LatencyFunc
	} else {
		ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency
	}

	if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil {
		ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
	}

	// Placeholder function until we inject the real health function in using
	// SetHealthFunc.
	// TODO(baptist): Restructure the code to allow injecting the correct
	// HealthFunc at construction time.
	healthFunc := HealthFunc(func(id roachpb.NodeID) bool {
		return true
	})
	ds.healthFunc.Store(&healthFunc)

	return ds
}
// SetHealthFunc is called after construction due to the circular dependency
// between DistSender and NodeLiveness. It atomically replaces the health
// function (initially the always-true placeholder installed by NewDistSender)
// with healthFn.
func (ds *DistSender) SetHealthFunc(healthFn HealthFunc) {
ds.healthFunc.Store(&healthFn)
}
// LatencyFunc returns the LatencyFunc of the DistSender, i.e. the function
// chosen in NewDistSender: the testing knob if one was supplied, otherwise the
// RPC context's remote-clock latency.
func (ds *DistSender) LatencyFunc() LatencyFunc {
return ds.latencyFunc
}
// HealthFunc returns the health function currently installed on the
// DistSender. The value is loaded atomically, so it reflects the latest call
// to SetHealthFunc, or the placeholder set by NewDistSender if there was none.
func (ds *DistSender) HealthFunc() HealthFunc {
	current := ds.healthFunc.Load()
	return *current
}
// DisableFirstRangeUpdates disables updates of the first range via
// gossip. Used by tests which want finer control of the contents of the range
// cache.
//
// It atomically sets the flag that the first-range-changed callback registered
// in NewDistSender checks before evicting anything from the range cache.
func (ds *DistSender) DisableFirstRangeUpdates() {
atomic.StoreInt32(&ds.disableFirstRangeUpdates, 1)
}
// DisableParallelBatches instructs DistSender to never parallelize the
// transmission of partial batch requests across ranges.
//
// NOTE(review): the flag is a plain bool written without synchronization;
// presumably this is meant to be called before the DistSender is in use —
// confirm against callers.
func (ds *DistSender) DisableParallelBatches() {
ds.disableParallelBatches = true
}
// Metrics returns a struct which contains metrics related to the distributed
// sender's activity. The struct is returned by value; its fields are pointers
// to (and arrays of pointers to) the same counters and gauges the DistSender
// itself updates.
func (ds *DistSender) Metrics() DistSenderMetrics {
return ds.metrics
}
// RangeDescriptorCache gives access to the DistSender's range cache, i.e. the
// cache created in NewDistSender.
func (ds *DistSender) RangeDescriptorCache() *rangecache.RangeCache {
return ds.rangeCache
}
// RangeLookup implements the RangeDescriptorDB interface.
//
// The lookup is performed by kv.RangeLookup with the DistSender itself acting
// as the sender. The scan therefore recurses into DistSender, which in turn
// consults the RangeDescriptorCache again to find the RangeDescriptor needed
// to carry out the scan.
//
// The caller has some control over the consistency of the lookup: rc must be
// either INCONSISTENT or READ_UNCOMMITTED. INCONSISTENT is used for an
// optimistic lookup pass. If that does not yield a new enough descriptor, a
// leaseholder read at READ_UNCOMMITTED is done so that intents are read along
// with committed values. The reason is that it is not clear whether the intent
// or the previous value points to the correct location of the Range. Matters
// get more complicated still when there are split-related intents or a txn
// record co-located with a replica involved in the split. Since the correct
// answer cannot be known, both the pre- and post-transaction values are
// looked up.
//
// Any other consistency level (for example CONSISTENT) results in an
// assertion failed error. See the commentary on kv.RangeLookup for more
// details.
//
// Every call increments the RangeLookups metric, including calls rejected for
// an invalid consistency level.
func (ds *DistSender) RangeLookup(
	ctx context.Context, key roachpb.RKey, rc rangecache.RangeLookupConsistency, useReverseScan bool,
) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error) {
	ds.metrics.RangeLookups.Inc(1)
	if rc != kvpb.INCONSISTENT && rc != kvpb.READ_UNCOMMITTED {
		return nil, nil, errors.AssertionFailedf("invalid consistency level %v", rc)
	}

	// By using DistSender as the sender, we guarantee that even if the desired
	// RangeDescriptor is not on the first range we send the lookup to, we'll
	// still find it when we scan to the next range. This addresses the issue
	// described in #18032 and #16266, allowing us to support meta2 splits.
	rawKey := key.AsRawKey()
	return kv.RangeLookup(ctx, ds, rawKey, rc, RangeLookupPrefetchCount, useReverseScan)
}
// FirstRange implements the RangeDescriptorDB interface.
//
// The descriptor of the cluster's first range is obtained from the
// FirstRangeProvider, which is typically implemented using the gossip protocol
// instead of the datastore. Calling this on a DistSender that has no
// FirstRangeProvider is a programming error and panics.
func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error) {
	if provider := ds.firstRangeProvider; provider != nil {
		return provider.GetFirstRangeDescriptor()
	}
	panic("with `nil` firstRangeProvider, DistSender must not use itself as RangeDescriptorDB")
}
// getNodeID attempts to return the local node ID. It returns 0 when the
// DistSender's NodeDescStore is not a *gossip.Gossip, i.e. when the DistSender
// has no access to the Gossip network.
func (ds *DistSender) getNodeID() roachpb.NodeID {
	// Today, secondary tenants don't run in process with KV instances, so they
	// don't have access to the Gossip network. The DistSender uses the node ID to
	// preferentially route requests to a local replica (if one exists). Not
	// knowing the node ID, and thus not being able to take advantage of this
	// optimization is okay, given tenants not running in-process with KV
	// instances have no such optimization to take advantage of to begin with.
	if g, isGossip := ds.nodeDescs.(*gossip.Gossip); isGossip {
		return g.NodeID.Get()
	}
	return 0
}
// CountRanges returns the number of ranges that encompass the given key span.
// It walks a range iterator in ascending order starting at rs.Key and stops as
// soon as the iterator reports that no further range is needed for rs. Any
// error recorded by the iterator is returned alongside the count reached so
// far.
func (ds *DistSender) CountRanges(ctx context.Context, rs roachpb.RSpan) (int64, error) {
	var n int64
	ri := MakeRangeIterator(ds)
	ri.Seek(ctx, rs.Key, Ascending)
	for ri.Valid() {
		n++
		if !ri.NeedAnother(rs) {
			break
		}
		ri.Next(ctx)
	}
	return n, ri.Error()
}
// getRoutingInfo looks up the range information (descriptor, lease) to use for
// a query of the key descKey with the given options. The lookup takes into
// account the last range descriptor the caller used for this key span, if any,
// and whether that descriptor has since been evicted for being stale; all of
// this is managed through the EvictionToken. Callers should pass the
// EvictionToken obtained from a previous call to this function, or an empty
// EvictionToken if there was none.
//
// If useReverseScan is set and descKey is the boundary between two ranges, the
// left range is returned (even though descKey is actually contained in the
// right range). This is useful for ReverseScans, which call this method with
// their exclusive EndKey.
//
// The returned EvictionToken reflects the close integration between the
// DistSender and the RangeDescriptorCache: the DistSender is not only a
// consumer of cached information (the descriptor and lease info come from the
// cache) but is also responsible for updating the cache.
//
// On lookup failure an empty token and the error are returned. A descriptor
// that does not contain descKey is treated as a programming error and is
// fatal.
func (ds *DistSender) getRoutingInfo(
	ctx context.Context,
	descKey roachpb.RKey,
	evictToken rangecache.EvictionToken,
	useReverseScan bool,
) (rangecache.EvictionToken, error) {
	tok, err := ds.rangeCache.LookupWithEvictionToken(ctx, descKey, evictToken, useReverseScan)
	if err != nil {
		return rangecache.EvictionToken{}, err
	}

	// Sanity check: the descriptor we're about to return must include the key
	// we're interested in. Reverse scans use the inverted containment test,
	// which treats the end key as the inclusive bound.
	desc := tok.Desc()
	var covered bool
	if useReverseScan {
		covered = desc.ContainsKeyInverted(descKey)
	} else {
		covered = desc.ContainsKey(descKey)
	}
	if !covered {
		log.Fatalf(ctx, "programming error: range resolution returning non-matching descriptor: "+
			"desc: %s, key: %s, reverse: %t", desc, descKey, redact.Safe(useReverseScan))
	}
	return tok, nil
}
// initAndVerifyBatch initializes timestamp-related information and
// verifies batch constraints before splitting.
//
// It mutates ba in place: the gateway node ID is filled in when unset, Now is
// set from the local clock, the timestamp is defaulted for batches that are
// not CONSISTENT and carry no timestamp, and any pprof labels on ctx are
// appended to ba.ProfileLabels. Note that the header fields are written before
// validation, so they are set even when an error is returned.
//
// An error is returned for an empty batch, for a batch with a key or byte
// limit that contains an unsupported request type or mixes forward and reverse
// range requests, for a SkipLocked batch containing an unsupported request
// type, and for an unknown wait policy.
func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
	// Attach the local node ID to each request.
	if ba.GatewayNodeID == 0 {
		ba.GatewayNodeID = ds.getNodeID()
	}

	// Attach a clock reading from the local node to help stabilize HLCs across
	// the cluster. This is NOT required for correctness.
	ba.Now = ds.clock.NowAsClockTimestamp()

	// In the event that timestamp isn't set and read consistency isn't
	// required, set the timestamp using the local clock.
	if ba.ReadConsistency != kvpb.CONSISTENT && ba.Timestamp.IsEmpty() {
		ba.Timestamp = ba.Now.ToTimestamp()
	}

	if len(ba.Requests) == 0 {
		return kvpb.NewErrorf("empty batch")
	}

	if hasLimit := ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0; hasLimit {
		// Verify that the batch contains only specific range requests or the
		// EndTxnRequest. Verify that a batch with a ReverseScan only contains
		// ReverseScan range requests.
		var sawForward, sawReverse bool
		for i := range ba.Requests {
			inner := ba.Requests[i].GetInner()
			switch inner.(type) {
			case *kvpb.ScanRequest, *kvpb.ResolveIntentRangeRequest,
				*kvpb.DeleteRangeRequest, *kvpb.RevertRangeRequest,
				*kvpb.ExportRequest, *kvpb.QueryLocksRequest, *kvpb.IsSpanEmptyRequest:
				// Accepted forward range requests.
				sawForward = true

			case *kvpb.ReverseScanRequest:
				// Accepted reverse range requests.
				sawReverse = true

			case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
				*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest:
				// Accepted point requests that can be in batches with limit.

			default:
				return kvpb.NewErrorf("batch with limit contains %s request", inner.Method())
			}
		}
		if sawForward && sawReverse {
			return kvpb.NewErrorf("batch with limit contains both forward and reverse scans")
		}
	}

	switch ba.WaitPolicy {
	case lock.WaitPolicy_Block, lock.WaitPolicy_Error:
		// Default. All request types supported.
	case lock.WaitPolicy_SkipLocked:
		for i := range ba.Requests {
			inner := ba.Requests[i].GetInner()
			if kvpb.CanSkipLocked(inner) {
				continue
			}
			switch inner.(type) {
			case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest:
				// Not directly supported, but can be part of the same batch.
			default:
				return kvpb.NewErrorf("batch with SkipLocked wait policy contains %s request", inner.Method())
			}
		}
	default:
		return kvpb.NewErrorf("unknown wait policy %s", ba.WaitPolicy)
	}

	// If the context has any pprof labels, attach them to the BatchRequest.
	// These labels will be applied to the root context processing the request
	// server-side, if the node processing the request is collecting a CPU
	// profile with labels.
	pprof.ForLabels(ctx, func(key, value string) bool {
		ba.ProfileLabels = append(ba.ProfileLabels, key, value)
		return true
	})

	return nil
}
// errNo1PCTxn indicates that a batch cannot be sent as a 1 phase
// commit because it spans multiple ranges and must be split into at
// least two parts, with the final part containing the EndTxn
// request.
//
// It is a sentinel: DistSender.Send detects it by comparing the returned
// *kvpb.Error against this variable by pointer identity, so the same
// instance must be returned unmodified for the check to match.
var errNo1PCTxn = kvpb.NewErrorf("cannot send 1PC txn to multiple ranges")
// splitBatchAndCheckForRefreshSpans splits ba via ba.Split(canSplitET) and
// returns the resulting groups of requests. When the split yields more than
// one part, the batch header's CanForwardReadTimestamp flag is re-evaluated
// and cleared if any request in the batch would need to be refreshed.
func splitBatchAndCheckForRefreshSpans(
	ba *kvpb.BatchRequest, canSplitET bool,
) [][]kvpb.RequestUnion {
	parts := ba.Split(canSplitET)
	if len(parts) <= 1 {
		// Not split; the header flag can stay as it is.
		return parts
	}
	// The batch is split, so we must check whether any request would need to
	// be refreshed in the event that one of the partial batches was to
	// forward its read timestamp during a server-side refresh. If any such
	// request exists then the CanForwardReadTimestamp flag is unset.
	unsetCanForwardReadTimestampFlag(ba)
	return parts
}
// unsetCanForwardReadTimestampFlag clears ba.CanForwardReadTimestamp when at
// least one request in the batch needs to refresh on read timestamp bumps.
// It is meant for batches that are about to be split: it would be incorrect
// to allow part of a batch to perform a server-side refresh if another part
// of the batch that was sent to a different range would also need to
// refresh. Such behavior could cause a transaction to observe an
// inconsistent snapshot and violate serializability.
func unsetCanForwardReadTimestampFlag(ba *kvpb.BatchRequest) {
	if !ba.CanForwardReadTimestamp {
		// Nothing to clear.
		return
	}
	for i := range ba.Requests {
		if !kvpb.NeedsRefresh(ba.Requests[i].GetInner()) {
			continue
		}
		// One refreshing request is enough to disable the flag.
		ba.CanForwardReadTimestamp = false
		return
	}
}
// Send implements the batch.Sender interface. It subdivides the Batch
// into batches admissible for sending (preventing certain illegal
// mixtures of requests), executes each individual part (which may
// span multiple ranges), and recombines the response.
//
// When the request spans ranges, it is split by range and a partial
// subset of the batch request is sent to affected ranges in parallel.
//
// On failure the returned response is nil and the error's request index,
// if set, is shifted so that it refers to a position in the original batch
// rather than in the part that failed.
func (ds *DistSender) Send(
	ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, *kvpb.Error) {
	startup.AssertStartupRetry(ctx)

	ds.incrementBatchCounters(ba)
	if pErr := ds.initAndVerifyBatch(ctx, ba); pErr != nil {
		return nil, pErr
	}

	ctx = ds.AnnotateCtx(ctx)
	ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender send")
	defer sp.Finish()

	// A trailing EndTxn may demand a one-phase commit.
	require1PC := false
	if et, ok := ba.Requests[len(ba.Requests)-1].GetInner().(*kvpb.EndTxnRequest); ok {
		require1PC = et.Require1PC
	}
	// To ensure that we lay down intents to prevent starvation, always
	// split the end transaction request into its own batch on retries.
	// Txns requiring 1PC are an exception and should never be split.
	splitET := ba.Txn != nil && ba.Txn.Epoch > 0 && !require1PC

	parts := splitBatchAndCheckForRefreshSpans(ba, splitET)
	hasLimit := ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0
	if hasLimit && len(parts) > 1 {
		// initAndVerifyBatch has already restricted limited batches to scan
		// requests of a single direction; such a batch should never need
		// splitting.
		log.Fatalf(ctx, "batch with MaxSpanRequestKeys=%d, TargetBytes=%d needs splitting",
			redact.Safe(ba.MaxSpanRequestKeys), redact.Safe(ba.TargetBytes))
	}

	// Back the slice of per-part replies with a one-element array so that the
	// single-part case does not need to grow it.
	var chunkBuf [1]*kvpb.BatchResponse
	chunks := chunkBuf[:0:1]
	multiPart := len(parts) != 1
	idxOffset := 0
	for len(parts) > 0 {
		if multiPart {
			// Work on a copy of the header that carries only this part's
			// requests.
			ba = ba.ShallowCopy()
			ba.Requests = parts[0]
		}

		// The minimal key range encompassing all requests contained within.
		// Local addressing has already been resolved.
		// TODO(tschottdorf): consider rudimentary validation of the batch here
		// (for example, non-range requests with EndKey, or empty key ranges).
		rs, err := keys.Range(ba.Requests)
		if err != nil {
			return nil, kvpb.NewError(err)
		}
		isReverse := ba.IsReverse()

		// Find out whether this part carries an EndTxn, and if so whether it
		// commits and whether it is a parallel commit.
		var withCommit, withParallelCommit bool
		if arg, ok := ba.GetArg(kvpb.EndTxn); ok {
			et := arg.(*kvpb.EndTxnRequest)
			withCommit, withParallelCommit = et.Commit, et.IsParallelCommit()
		}

		var (
			rpl  *kvpb.BatchResponse
			pErr *kvpb.Error
		)
		if withParallelCommit {
			rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, isReverse, 0 /* batchIdx */)
		} else {
			rpl, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, 0 /* batchIdx */)
		}

		switch {
		case pErr == errNo1PCTxn:
			// We tried to send a single round-trip EndTxn but it looks like
			// it's going to hit multiple ranges: split it here and try again.
			if len(parts) != 1 {
				panic("EndTxn not in last chunk of batch")
			}
			if require1PC {
				log.Fatalf(ctx, "required 1PC transaction cannot be split: %s", ba)
			}
			parts = splitBatchAndCheckForRefreshSpans(ba, true /* split ET */)
			multiPart = true
			// Restart the last chunk as multiple parts with EndTxn in the
			// last part.
			continue
		case pErr != nil:
			// Translate the index from part-relative to batch-relative.
			if pErr.Index != nil && pErr.Index.Index != -1 {
				pErr.Index.Index += int32(idxOffset)
			}
			return nil, pErr
		}

		idxOffset += len(ba.Requests)
		chunks = append(chunks, rpl)
		parts = parts[1:]
		// Propagate transaction from last reply to next request. The final
		// update is taken and put into the response's main header.
		if len(parts) > 0 {
			ba.UpdateTxn(rpl.Txn)
		}
	}

	if len(chunks) == 0 {
		return nil, nil
	}

	// Fold every later chunk into the first one, then adopt the header of the
	// last chunk while keeping the accumulated collected spans.
	reply := chunks[0]
	for _, next := range chunks[1:] {
		reply.Responses = append(reply.Responses, next.Responses...)
		reply.CollectedSpans = append(reply.CollectedSpans, next.CollectedSpans...)
	}
	lastHeader := chunks[len(chunks)-1].BatchResponse_Header
	lastHeader.CollectedSpans = reply.CollectedSpans
	reply.BatchResponse_Header = lastHeader
	return reply, nil
}
// incrementBatchCounters bumps the batch counter once and then bumps the
// per-method counter once for every request contained in the batch.
func (ds *DistSender) incrementBatchCounters(ba *kvpb.BatchRequest) {
	ds.metrics.BatchCount.Inc(1)
	for i := range ba.Requests {
		method := ba.Requests[i].GetInner().Method()
		ds.metrics.MethodCounts[method].Inc(1)
	}
}
// response bundles a batch response with an error and a list of integer
// positions.
// NOTE(review): the code that produces and consumes this type is outside the
// visible section; the field descriptions below are inferred from names and
// types only and should be confirmed against the callers.
type response struct {
	// reply is the batch response; presumably nil when pErr is set.
	reply *kvpb.BatchResponse
	// positions presumably maps each entry of reply back to its index in the
	// original batch — TODO confirm.
	positions []int
	// pErr is the error associated with this response, if any.
	pErr *kvpb.Error
}
// divideAndSendParallelCommit divides a parallel-committing batch into
// sub-batches that can be evaluated in parallel but should not be evaluated