-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathtxn_coord_sender.go
1401 lines (1287 loc) · 51.5 KB
/
txn_coord_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package kv
import (
"context"
"sort"
"time"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)
const (
opTxnCoordSender = "txn coordinator"
opHeartbeatLoop = "heartbeat txn"
)
// maxTxnIntentsBytes is a threshold in bytes for intent spans stored
// on the coordinator during the lifetime of a transaction. Intents
// are included with a transaction on commit or abort, to be cleaned
// up asynchronously. If they exceed this threshold, they're condensed
// to avoid memory blowup both on the coordinator and (critically) on
// the EndTransaction command at the Raft group responsible for the
// transaction record.
var maxTxnIntentsBytes = settings.RegisterIntSetting(
"kv.transaction.max_intents_bytes",
"maximum number of bytes used to track write intents in transactions",
256*1000,
)
// maxTxnRefreshSpansBytes is a threshold in bytes for refresh spans stored
// on the coordinator during the lifetime of a transaction. Refresh spans
// are used for SERIALIZABLE transactions to avoid client restarts.
var maxTxnRefreshSpansBytes = settings.RegisterIntSetting(
"kv.transaction.max_refresh_spans_bytes",
"maximum number of bytes used to track refresh spans in serializable transactions",
256*1000,
)
// txnCoordState represents the state of the transaction coordinator.
// It is an intermediate state which indicates we've finished the
// transaction at the coordinator level and it's no longer legitimate
// for sending requests, even though we don't yet know for sure that
// the transaction record has been aborted / committed.
type txnCoordState int
const (
_ txnCoordState = iota
// done indicates the transaction has been completed via end
// transaction and can no longer be used.
done
// aborted indicates the transaction was aborted or abandoned (e.g.
// from timeout, heartbeat failure, context cancelation, txn abort
// or restart, etc.)
aborted
)
// A TxnCoordSender is an implementation of client.Sender which wraps
// a lower-level Sender (either a storage.Stores or a DistSender) to
// which it sends commands. It acts as a man-in-the-middle,
// coordinating transaction state for clients. Unlike other senders,
// the TxnCoordSender is stateful and holds information about an
// ongoing transaction. Among other things, it records the intent
// spans of keys mutated by the transaction for later
// resolution.
//
// After a transaction has begun writing, the TxnCoordSender may start
// sending periodic heartbeat messages to that transaction's txn
// record, to keep it live. Note that heartbeating is done only from
// the root transaction coordinator, in the event that multiple
// coordinators are active (i.e. in a distributed SQL flow).
type TxnCoordSender struct {
mu struct {
syncutil.Mutex
// meta contains all coordinator state which may be passed between
// distributed TxnCoordSenders via MetaRelease() and MetaAugment().
meta roachpb.TxnCoordMeta
// intentsSizeBytes is the size in bytes of the intent spans in the
// meta, maintained to efficiently check the threshold.
intentsSizeBytes int64
// refreshSpansBytes is the total size in bytes of the spans
// encountered during this transaction that need to be refreshed to
// avoid serializable restart.
refreshSpansBytes int64
// lastUpdateNanos is the latest wall time in nanos the client sent
// transaction operations to this coordinator. Accessed and updated
// atomically.
lastUpdateNanos int64
// Analogous to lastUpdateNanos, this is the wall time at which the
// transaction was instantiated.
firstUpdateNanos int64
// txnEnd is closed when the transaction is aborted or committed,
// terminating the associated heartbeat instance.
txnEnd chan struct{}
// state indicates the state of the transaction coordinator, which
// may briefly diverge from the state of the transaction record if
// the coordinator is aborted after a failed heartbeat, but before
// we've gotten a response with the updated transaction state.
state txnCoordState
// onFinishFn is a closure invoked when state changes to done or aborted.
onFinishFn func(error)
}
// A pointer member to the creating factory provides access to
// immutable factory settings.
*TxnCoordSenderFactory
// typ specifies whether this transaction is the top level,
// or one of potentially many distributed transactions.
typ client.TxnType
}
var _ client.TxnSender = &TxnCoordSender{}
// TxnMetrics holds all metrics relating to KV transactions.
type TxnMetrics struct {
Aborts *metric.CounterWithRates
Commits *metric.CounterWithRates
Commits1PC *metric.CounterWithRates // Commits which finished in a single phase
AutoRetries *metric.CounterWithRates // Auto retries which avoid client-side restarts
Abandons *metric.CounterWithRates
Durations *metric.Histogram
// Restarts is the number of times we had to restart the transaction.
Restarts *metric.Histogram
// Counts of restart types.
RestartsWriteTooOld *metric.Counter
RestartsDeleteRange *metric.Counter
RestartsSerializable *metric.Counter
RestartsPossibleReplay *metric.Counter
}
var (
metaAbortsRates = metric.Metadata{
Name: "txn.aborts",
Help: "Number of aborted KV transactions"}
metaCommitsRates = metric.Metadata{
Name: "txn.commits",
Help: "Number of committed KV transactions (including 1PC)"}
metaCommits1PCRates = metric.Metadata{
Name: "txn.commits1PC",
Help: "Number of committed one-phase KV transactions"}
metaAutoRetriesRates = metric.Metadata{
Name: "txn.autoretries",
Help: "Number of automatic retries to avoid serializable restarts"}
metaAbandonsRates = metric.Metadata{
Name: "txn.abandons",
Help: "Number of abandoned KV transactions"}
metaDurationsHistograms = metric.Metadata{
Name: "txn.durations",
Help: "KV transaction durations in nanoseconds"}
metaRestartsHistogram = metric.Metadata{
Name: "txn.restarts",
Help: "Number of restarted KV transactions"}
metaRestartsWriteTooOld = metric.Metadata{
Name: "txn.restarts.writetooold",
Help: "Number of restarts due to a concurrent writer committing first"}
metaRestartsDeleteRange = metric.Metadata{
Name: "txn.restarts.deleterange",
Help: "Number of restarts due to a forwarded commit timestamp and a DeleteRange command"}
metaRestartsSerializable = metric.Metadata{
Name: "txn.restarts.serializable",
Help: "Number of restarts due to a forwarded commit timestamp and isolation=SERIALIZABLE"}
metaRestartsPossibleReplay = metric.Metadata{
Name: "txn.restarts.possiblereplay",
Help: "Number of restarts due to possible replays of command batches at the storage layer"}
)
// MakeTxnMetrics returns a TxnMetrics struct that contains metrics whose
// windowed portions retain data for approximately histogramWindow.
func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
return TxnMetrics{
Aborts: metric.NewCounterWithRates(metaAbortsRates),
Commits: metric.NewCounterWithRates(metaCommitsRates),
Commits1PC: metric.NewCounterWithRates(metaCommits1PCRates),
AutoRetries: metric.NewCounterWithRates(metaAutoRetriesRates),
Abandons: metric.NewCounterWithRates(metaAbandonsRates),
Durations: metric.NewLatency(metaDurationsHistograms, histogramWindow),
Restarts: metric.NewHistogram(metaRestartsHistogram, histogramWindow, 100, 3),
RestartsWriteTooOld: metric.NewCounter(metaRestartsWriteTooOld),
RestartsDeleteRange: metric.NewCounter(metaRestartsDeleteRange),
RestartsSerializable: metric.NewCounter(metaRestartsSerializable),
RestartsPossibleReplay: metric.NewCounter(metaRestartsPossibleReplay),
}
}
// TxnCoordSenderFactory implements client.TxnSenderFactory.
type TxnCoordSenderFactory struct {
log.AmbientContext
st *cluster.Settings
wrapped client.Sender
clock *hlc.Clock
heartbeatInterval time.Duration
clientTimeout time.Duration
linearizable bool // enables linearizable behavior
stopper *stop.Stopper
metrics TxnMetrics
}
var _ client.TxnSenderFactory = &TxnCoordSenderFactory{}
const defaultClientTimeout = 10 * time.Second
// NewTxnCoordSenderFactory creates a new TxnCoordSenderFactory. The
// factory creates new instances of TxnCoordSenders.
//
// TODO(spencer): move these settings into a configuration object and
// supply that to each sender.
func NewTxnCoordSenderFactory(
ambient log.AmbientContext,
st *cluster.Settings,
wrapped client.Sender,
clock *hlc.Clock,
linearizable bool,
stopper *stop.Stopper,
txnMetrics TxnMetrics,
) *TxnCoordSenderFactory {
return &TxnCoordSenderFactory{
AmbientContext: ambient,
st: st,
wrapped: wrapped,
clock: clock,
heartbeatInterval: base.DefaultHeartbeatInterval,
clientTimeout: defaultClientTimeout,
linearizable: linearizable,
stopper: stopper,
metrics: txnMetrics,
}
}
// New is part of the TxnCoordSenderFactory interface.
func (tcf *TxnCoordSenderFactory) New(typ client.TxnType) client.TxnSender {
tcs := &TxnCoordSender{
typ: typ,
TxnCoordSenderFactory: tcf,
}
tcs.mu.meta.RefreshValid = true
return tcs
}
// WrappedSender is part of the TxnCoordSenderFactory interface.
func (tcf *TxnCoordSenderFactory) WrappedSender() client.Sender {
return tcf.wrapped
}
// Metrics returns the factory's metrics struct.
func (tcf *TxnCoordSenderFactory) Metrics() TxnMetrics {
return tcf.metrics
}
// GetMeta is part of the client.TxnSender interface.
func (tc *TxnCoordSender) GetMeta() roachpb.TxnCoordMeta {
tc.mu.Lock()
defer tc.mu.Unlock()
// Copy mutable state so access is safe for the caller.
meta := tc.mu.meta
meta.Txn = tc.mu.meta.Txn.Clone()
meta.Intents = append([]roachpb.Span(nil), tc.mu.meta.Intents...)
if tc.mu.meta.RefreshValid {
meta.RefreshReads = append([]roachpb.Span(nil), tc.mu.meta.RefreshReads...)
meta.RefreshWrites = append([]roachpb.Span(nil), tc.mu.meta.RefreshWrites...)
}
return meta
}
// AugmentMeta is part of the client.TxnSender interface.
func (tc *TxnCoordSender) AugmentMeta(meta roachpb.TxnCoordMeta) {
tc.mu.Lock()
defer tc.mu.Unlock()
// Sanity check: don't combine if the meta is for a different txn ID.
if tc.mu.meta.Txn.ID != (uuid.UUID{}) && tc.mu.meta.Txn.ID != meta.Txn.ID {
return
}
tc.mu.meta.Txn.Update(&meta.Txn)
// Do not modify existing span slices when copying.
tc.mu.meta.Intents, _ = roachpb.MergeSpans(
append(append([]roachpb.Span(nil), tc.mu.meta.Intents...), meta.Intents...),
)
if !meta.RefreshValid {
tc.mu.meta.RefreshValid = false
tc.mu.meta.RefreshReads = nil
tc.mu.meta.RefreshWrites = nil
} else if tc.mu.meta.RefreshValid {
tc.mu.meta.RefreshReads, _ = roachpb.MergeSpans(
append(append([]roachpb.Span(nil), tc.mu.meta.RefreshReads...), meta.RefreshReads...),
)
tc.mu.meta.RefreshWrites, _ = roachpb.MergeSpans(
append(append([]roachpb.Span(nil), tc.mu.meta.RefreshWrites...), meta.RefreshWrites...),
)
}
tc.mu.meta.CommandCount += meta.CommandCount
// Recompute the size of the intents.
tc.mu.intentsSizeBytes = 0
for _, i := range tc.mu.meta.Intents {
tc.mu.intentsSizeBytes += int64(len(i.Key) + len(i.EndKey))
}
// Recompute the size of the refreshes.
tc.mu.refreshSpansBytes = 0
for _, u := range tc.mu.meta.RefreshReads {
tc.mu.refreshSpansBytes += int64(len(u.Key) + len(u.EndKey))
}
for _, u := range tc.mu.meta.RefreshWrites {
tc.mu.refreshSpansBytes += int64(len(u.Key) + len(u.EndKey))
}
}
// OnFinish is part of the client.TxnSender interface.
func (tc *TxnCoordSender) OnFinish(onFinishFn func(error)) {
tc.mu.Lock()
defer tc.mu.Unlock()
tc.mu.onFinishFn = onFinishFn
}
// Send implements the batch.Sender interface. If the request is part of a
// transaction, the TxnCoordSender adds the transaction to a map of active
// transactions and begins heartbeating it. Every subsequent request for the
// same transaction updates the lastUpdate timestamp to prevent live
// transactions from being considered abandoned and garbage collected.
// Read/write mutating requests have their key or key range added to the
// transaction's interval tree of key ranges for eventual cleanup via resolved
// write intents; they're tagged to an outgoing EndTransaction request, with
// the receiving replica in charge of resolving them.
func (tc *TxnCoordSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
ctx = tc.AnnotateCtx(ctx)
// Start new or pick up active trace. From here on, there's always an active
// Trace, though its overhead is small unless it's sampled.
sp := opentracing.SpanFromContext(ctx)
if sp == nil {
sp = tc.AmbientContext.Tracer.StartSpan(opTxnCoordSender)
defer sp.Finish()
ctx = opentracing.ContextWithSpan(ctx, sp)
}
startNS := tc.clock.PhysicalNow()
if ba.Txn != nil {
ctx = log.WithLogTag(ctx, "txn", uuid.ShortStringer(ba.Txn.ID))
if log.V(2) {
ctx = log.WithLogTag(ctx, "ts", ba.Txn.Timestamp)
}
// If this request is part of a transaction...
if err := tc.validateTxnForBatch(ctx, &ba); err != nil {
return nil, roachpb.NewError(err)
}
txnID := ba.Txn.ID
// Associate the txnID with the trace.
txnIDStr := txnID.String()
sp.SetBaggageItem("txnID", txnIDStr)
var et *roachpb.EndTransactionRequest
var hasET bool
{
var rArgs roachpb.Request
rArgs, hasET = ba.GetArg(roachpb.EndTransaction)
if hasET {
et = rArgs.(*roachpb.EndTransactionRequest)
if len(et.Key) != 0 {
return nil, roachpb.NewErrorf("EndTransaction must not have a Key set")
}
et.Key = ba.Txn.Key
if len(et.IntentSpans) > 0 {
// TODO(tschottdorf): it may be useful to allow this later.
// That would be part of a possible plan to allow txns which
// write on multiple coordinators.
return nil, roachpb.NewErrorf("client must not pass intents to EndTransaction")
}
}
}
if pErr := func() *roachpb.Error {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.mu.meta.Txn.ID == (uuid.UUID{}) {
// Ensure that the txn is bound.
tc.mu.meta.Txn = ba.Txn.Clone()
}
if ba.Txn.Writing {
if pErr := tc.maybeRejectClientLocked(ctx, ba.Txn.ID); pErr != nil {
return pErr
}
}
tc.mu.meta.CommandCount += int32(len(ba.Requests))
if !hasET {
return nil
}
// Everything below is carried out only when trying to finish a txn.
if tc.typ == client.LeafTxn {
return roachpb.NewErrorf("cannot commit on a leaf transaction coordinator")
}
// Populate et.IntentSpans, taking into account both any existing
// and new writes, and taking care to perform proper deduplication.
et.IntentSpans = append([]roachpb.Span(nil), tc.mu.meta.Intents...)
intentsSizeBytes := tc.mu.intentsSizeBytes
// Defensively set distinctSpans to false if we had any previous
// writes in this transaction. This effectively limits the distinct
// spans optimization to 1pc transactions.
distinctSpans := len(tc.mu.meta.Intents) == 0
// We can't pass in a batch response here to better limit the key
// spans as we don't know what is going to be affected. This will
// affect queries such as `DELETE FROM my.table LIMIT 10` when
// executed as a 1PC transaction. e.g.: a (BeginTransaction,
// DeleteRange, EndTransaction) batch.
ba.IntentSpanIterate(nil, func(span roachpb.Span) {
et.IntentSpans = append(et.IntentSpans, span)
intentsSizeBytes += int64(len(span.Key) + len(span.EndKey))
})
var err error
if et.IntentSpans, intentsSizeBytes, err = tc.maybeCondenseIntentSpans(
ctx, et.IntentSpans, intentsSizeBytes,
); err != nil {
return roachpb.NewError(err)
}
// TODO(peter): Populate DistinctSpans on all batches, not just batches
// which contain an EndTransactionRequest.
var distinct bool
et.IntentSpans, distinct = roachpb.MergeSpans(et.IntentSpans)
ba.Header.DistinctSpans = distinct && distinctSpans
if len(et.IntentSpans) == 0 {
// If there aren't any intents, then there's factually no
// transaction to end. Read-only txns have all of their state
// in the client.
return roachpb.NewErrorf("cannot commit a read-only transaction")
}
tc.mu.meta.Intents = et.IntentSpans
tc.mu.intentsSizeBytes = intentsSizeBytes
if tc.mu.meta.Txn.IsSerializable() && tc.mu.meta.RefreshValid &&
len(tc.mu.meta.RefreshReads) == 0 && len(tc.mu.meta.RefreshWrites) == 0 {
et.NoRefreshSpans = true
}
return nil
}(); pErr != nil {
return nil, pErr
}
if hasET && log.V(3) {
for _, intent := range et.IntentSpans {
log.Infof(ctx, "intent: [%s,%s)", intent.Key, intent.EndKey)
}
}
}
// Send the command through wrapped sender, handling retry
// opportunities in case of error.
var br *roachpb.BatchResponse
{
var pErr *roachpb.Error
if br, pErr = tc.wrapped.Send(ctx, ba); pErr != nil {
br, pErr = tc.maybeRetrySend(ctx, &ba, br, pErr)
}
if pErr = tc.updateState(ctx, startNS, ba, br, pErr); pErr != nil {
log.VEventf(ctx, 2, "error: %s", pErr)
return nil, pErr
}
}
if br.Txn == nil {
return br, nil
}
if _, ok := ba.GetArg(roachpb.EndTransaction); !ok {
return br, nil
}
// If the linearizable flag is set, we want to make sure that all the
// clocks in the system are past the commit timestamp of the transaction.
// This is guaranteed if either - the commit timestamp is MaxOffset behind
// startNS - MaxOffset ns were spent in this function when returning to the
// client. Below we choose the option that involves less waiting, which is
// likely the first one unless a transaction commits with an odd timestamp.
//
// Can't use linearizable mode with clockless reads since in that case we
// don't know how long to sleep - could be forever!
if tsNS := br.Txn.Timestamp.WallTime; startNS > tsNS {
startNS = tsNS
}
maxOffset := tc.clock.MaxOffset()
sleepNS := maxOffset -
time.Duration(tc.clock.PhysicalNow()-startNS)
if maxOffset != timeutil.ClocklessMaxOffset && tc.linearizable && sleepNS > 0 {
defer func() {
if log.V(1) {
log.Infof(ctx, "%v: waiting %s on EndTransaction for linearizability", br.Txn.Short(), duration.Truncate(sleepNS, time.Millisecond))
}
time.Sleep(sleepNS)
}()
}
if br.Txn.Status != roachpb.PENDING {
tc.mu.Lock()
tc.mu.meta.Txn = br.Txn.Clone()
tc.cleanupTxnLocked(ctx, done)
tc.mu.Unlock()
}
return br, nil
}
// maybeRetrySend handles two retry cases at the txn coord sender level.
//
// 1) If the batch requires a transaction, it's wrapped in a new
// transaction and resent.
// 2) If the error is a retry condition which might be retried directly
// if the spans collected during the transaction can be refreshed,
// proving that the transaction can be committed at a higher timestamp.
func (tc *TxnCoordSender) maybeRetrySend(
ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error,
) (*roachpb.BatchResponse, *roachpb.Error) {
if _, ok := pErr.GetDetail().(*roachpb.OpRequiresTxnError); ok {
return tc.resendWithTxn(ctx, *ba)
}
// With mixed success, we can't attempt a retry without potentially
// succeeding at the same conditional put or increment request
// twice; return the wrapped error instead. Because the dist sender
// splits up batches to send to multiple ranges in parallel, and
// then combines the results, partial success makes it very
// difficult to determine what can be retried.
if aPSErr, ok := pErr.GetDetail().(*roachpb.MixedSuccessError); ok {
log.VEventf(ctx, 2, "got partial success; cannot retry %s (pErr=%s)", ba, aPSErr.Wrapped)
return nil, aPSErr.Wrapped
}
// Check for an error which can be retried after updating spans.
//
// Note that we can only restart on root transactions because those are the
// only places where we have all of the refresh spans. If this is a leaf, as
// in a distributed sql flow, we need to propagate the error to the root for
// an epoch restart.
canRetry, retryTxn := roachpb.CanTransactionRetryAtRefreshedTimestamp(ctx, pErr)
if !canRetry || tc.typ == client.LeafTxn ||
!tc.st.Version.IsMinSupported(cluster.VersionTxnSpanRefresh) {
return nil, pErr
}
// If a prefix of the batch was executed, collect refresh spans for
// that executed portion, and retry the remainder. The canonical
// case is a batch split between everything up to but not including
// the EndTransaction. Requests up to the EndTransaction succeed,
// but the EndTransaction fails with a retryable error. We want to
// retry only the EndTransaction.
ba.UpdateTxn(retryTxn)
retryBa := *ba
if br != nil {
doneBa := *ba
doneBa.Requests = ba.Requests[:len(br.Responses)]
log.VEventf(ctx, 2, "collecting refresh spans after partial batch execution of %s", doneBa)
tc.mu.Lock()
if !tc.appendRefreshSpansLocked(ctx, doneBa, br) {
tc.mu.Unlock()
return nil, pErr
}
tc.mu.meta.Txn.RefreshedTimestamp.Forward(retryTxn.RefreshedTimestamp)
tc.mu.Unlock()
retryBa.Requests = ba.Requests[len(br.Responses):]
}
log.VEventf(ctx, 2, "retrying %s at refreshed timestamp %s because of %s",
retryBa, retryTxn.RefreshedTimestamp, pErr)
// Try updating the txn spans so we can retry.
if ok := tc.tryUpdatingTxnSpans(ctx, retryTxn); !ok {
return nil, pErr
}
// We've refreshed all of the read spans successfully and set
// newBa.Txn.RefreshedTimestamp to the current timestamp. Submit the
// batch again.
retryBr, retryErr := tc.wrapped.Send(ctx, retryBa)
if retryErr != nil {
log.VEventf(ctx, 2, "retry failed with %s", retryErr)
return nil, retryErr
}
log.VEventf(ctx, 2, "retry successful @%s", retryBa.Txn.Timestamp)
// On success, combine responses if applicable and set error to nil.
if br != nil {
br.Responses = append(br.Responses, retryBr.Responses...)
retryBr.CollectedSpans = append(br.CollectedSpans, retryBr.CollectedSpans...)
br.BatchResponse_Header = retryBr.BatchResponse_Header
} else {
br = retryBr
}
tc.metrics.AutoRetries.Inc(1)
return br, nil
}
// appendRefreshSpansLocked appends refresh spans from the supplied batch
// request, qualified by the batch response where appropriate. Returns
// whether the batch transaction's refreshed timestamp is greater or equal
// to the max refreshed timestamp used so far with this sender.
//
// The batch refreshed timestamp and the max refreshed timestamp for
// the sender can get out of step because the txn coord sender can be
// used concurrently (i.e. when using the "RETURNING NOTHING"
// syntax). What we don't want is to append refreshes which are
// already too old compared to the max refreshed timestamp that's already
// in use with this sender. In that case the caller should return an
// error for client-side retry.
func (tc *TxnCoordSender) appendRefreshSpansLocked(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) bool {
origTS := ba.Txn.OrigTimestamp
origTS.Forward(ba.Txn.RefreshedTimestamp)
if origTS.Less(tc.mu.meta.Txn.RefreshedTimestamp) {
log.VEventf(ctx, 2, "txn orig timestamp %s < sender refreshed timestamp %s",
origTS, tc.mu.meta.Txn.RefreshedTimestamp)
return false
}
ba.RefreshSpanIterate(br, func(span roachpb.Span, write bool) {
if log.V(3) {
log.Infof(ctx, "refresh: %s write=%t", span, write)
}
if write {
tc.mu.meta.RefreshWrites = append(tc.mu.meta.RefreshWrites, span)
} else {
tc.mu.meta.RefreshReads = append(tc.mu.meta.RefreshReads, span)
}
tc.mu.refreshSpansBytes += int64(len(span.Key) + len(span.EndKey))
})
return true
}
// tryUpdatingTxnSpans sends Refresh and RefreshRange commands to all
// spans read during the transaction to ensure that no writes were
// written more recently than the original transaction timestamp. All
// implicated timestamp caches are updated with the final transaction
// timestamp. On success, returns true and an updated BatchRequest
// containing a transaction whose original timestamp and timestamp
// have been set to the same value.
func (tc *TxnCoordSender) tryUpdatingTxnSpans(
ctx context.Context, refreshTxn *roachpb.Transaction,
) bool {
tc.mu.Lock()
refreshReads := tc.mu.meta.RefreshReads
refreshWrites := tc.mu.meta.RefreshWrites
refreshValid := tc.mu.meta.RefreshValid
tc.mu.Unlock()
if !refreshValid {
log.VEvent(ctx, 2, "can't refresh txn spans; not valid")
return false
} else if len(refreshReads) == 0 && len(refreshWrites) == 0 {
log.VEvent(ctx, 2, "there are no txn spans to refresh")
return true
}
// Refresh all spans (merge first).
refreshSpanBa := roachpb.BatchRequest{}
refreshSpanBa.Txn = refreshTxn
addRefreshes := func(refreshes []roachpb.Span, write bool) {
for _, u := range refreshes {
var req roachpb.Request
if len(u.EndKey) == 0 {
req = &roachpb.RefreshRequest{
Span: u,
Write: write,
}
} else {
req = &roachpb.RefreshRangeRequest{
Span: u,
Write: write,
}
}
refreshSpanBa.Add(req)
log.VEventf(ctx, 2, "updating span %s @%s - @%s to avoid serializable restart",
req.Header(), refreshTxn.OrigTimestamp, refreshTxn.Timestamp)
}
}
addRefreshes(refreshReads, false)
addRefreshes(refreshWrites, true)
if _, batchErr := tc.wrapped.Send(ctx, refreshSpanBa); batchErr != nil {
log.VEventf(ctx, 2, "failed to refresh txn spans (%s); propagating original retry error", batchErr)
return false
}
return true
}
func (tc *TxnCoordSender) appendAndCondenseIntentsLocked(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) {
ba.IntentSpanIterate(br, func(span roachpb.Span) {
tc.mu.meta.Intents = append(tc.mu.meta.Intents, span)
tc.mu.intentsSizeBytes += int64(len(span.Key) + len(span.EndKey))
})
if condensedIntents, condensedIntentsSize, err :=
tc.maybeCondenseIntentSpans(ctx, tc.mu.meta.Intents, tc.mu.intentsSizeBytes); err != nil {
log.VEventf(ctx, 2, "failed to condense intent spans (%s); skipping", err)
} else {
tc.mu.meta.Intents, tc.mu.intentsSizeBytes = condensedIntents, condensedIntentsSize
}
}
type spanBucket struct {
rangeID roachpb.RangeID
size int64
spans []roachpb.Span
}
// maybeCondenseIntentSpans avoids sending massive EndTransaction
// requests which can consume excessive memory at evaluation time and
// in the txn coordinator sender itself. Spans are condensed based on
// current range boundaries. Returns the condensed set of spans and
// the new total spans size. Note that errors can be returned if the
// range iterator fails.
func (tc *TxnCoordSender) maybeCondenseIntentSpans(
ctx context.Context, spans []roachpb.Span, spansSize int64,
) ([]roachpb.Span, int64, error) {
if spansSize < maxTxnIntentsBytes.Get(&tc.st.SV) {
return spans, spansSize, nil
}
// Only condense if the wrapped sender is a distributed sender.
ds, ok := tc.wrapped.(*DistSender)
if !ok {
return spans, spansSize, nil
}
// Sort the spans by start key.
sort.Slice(spans, func(i, j int) bool { return spans[i].Key.Compare(spans[j].Key) < 0 })
// Divide them by range boundaries and condense. Iterate over spans
// using a range iterator and add each to a bucket keyed by range
// ID. Local keys are kept in a new slice and not added to buckets.
buckets := []*spanBucket{}
localSpans := []roachpb.Span{}
ri := NewRangeIterator(ds)
for _, s := range spans {
if keys.IsLocal(s.Key) {
localSpans = append(localSpans, s)
continue
}
ri.Seek(ctx, roachpb.RKey(s.Key), Ascending)
if !ri.Valid() {
return nil, 0, ri.Error().GoError()
}
rangeID := ri.Desc().RangeID
if l := len(buckets); l > 0 && buckets[l-1].rangeID == rangeID {
buckets[l-1].spans = append(buckets[l-1].spans, s)
} else {
buckets = append(buckets, &spanBucket{rangeID: rangeID, spans: []roachpb.Span{s}})
}
buckets[len(buckets)-1].size += int64(len(s.Key) + len(s.EndKey))
}
// Sort the buckets by size and collapse from largest to smallest
// until total size of uncondensed spans no longer exceeds threshold.
sort.Slice(buckets, func(i, j int) bool { return buckets[i].size > buckets[j].size })
spans = localSpans // reset to hold just the local spans; will add newly condensed and remainder
for _, bucket := range buckets {
// Condense until we get to half the threshold.
if spansSize <= maxTxnIntentsBytes.Get(&tc.st.SV)/2 {
// Collect remaining spans from each bucket into uncondensed slice.
spans = append(spans, bucket.spans...)
continue
}
spansSize -= bucket.size
// TODO(spencer): consider further optimizations here to create
// more than one span out of a bucket to avoid overly broad span
// combinations.
cs := bucket.spans[0]
for _, s := range bucket.spans[1:] {
cs = cs.Combine(s)
if !cs.Valid() {
return nil, 0, errors.Errorf("combining span %s yielded invalid result", s)
}
}
spansSize += int64(len(cs.Key) + len(cs.EndKey))
spans = append(spans, cs)
}
return spans, spansSize, nil
}
// maybeRejectClientLocked checks whether the (transactional) request is in a
// state that prevents it from continuing, such as the coordinator having
// considered the client abandoned, or a heartbeat having reported an error.
func (tc *TxnCoordSender) maybeRejectClientLocked(
ctx context.Context, txnID uuid.UUID,
) *roachpb.Error {
// Check whether the transaction is still tracked and has a chance of
// completing. It's possible that the coordinator learns about the
// transaction having terminated from a heartbeat, and GC queue correctness
// (along with common sense) mandates that we don't let the client
// continue.
switch {
case tc.mu.state == aborted:
fallthrough
case tc.mu.meta.Txn.Status == roachpb.ABORTED:
abortedErr := roachpb.NewErrorWithTxn(roachpb.NewTransactionAbortedError(), &tc.mu.meta.Txn)
// TODO(andrei): figure out a UserPriority to use here.
newTxn := roachpb.PrepareTransactionForRetry(
ctx, abortedErr,
// priority is not used for aborted errors
roachpb.NormalUserPriority,
tc.clock)
return roachpb.NewError(roachpb.NewHandledRetryableTxnError(
abortedErr.Message, txnID, newTxn))
case tc.mu.meta.Txn.Status == roachpb.COMMITTED:
return roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError(
"transaction is already committed"), &tc.mu.meta.Txn)
default:
return nil
}
}
// validateTxn validates properties of a txn specified on a request.
// The transaction is expected to be initialized by the time it reaches
// the TxnCoordSender. Furthermore, no transactional writes are allowed
// unless preceded by a begin transaction request within the same batch.
// The exception is if the transaction is already in state txn.Writing=true.
func (tc *TxnCoordSender) validateTxnForBatch(ctx context.Context, ba *roachpb.BatchRequest) error {
if len(ba.Requests) == 0 {
return errors.Errorf("empty batch with txn")
}
ba.Txn.AssertInitialized(ctx)
// Check for a begin transaction to set txn key based on the key of
// the first transactional write. Also enforce that no transactional
// writes occur before a begin transaction.
var haveBeginTxn bool
for _, req := range ba.Requests {
args := req.GetInner()
if _, ok := args.(*roachpb.BeginTransactionRequest); ok {
if haveBeginTxn || ba.Txn.Writing {
return errors.Errorf("begin transaction requested twice in the same txn: %s", ba.Txn)
}
if ba.Txn.Key == nil {
return errors.Errorf("transaction with BeginTxnRequest missing anchor key: %v", ba)
}
haveBeginTxn = true
}
}
return nil
}
// cleanupTxnLocked is called when a transaction ends. The heartbeat
// goroutine is signaled to clean up the transaction gracefully.
func (tc *TxnCoordSender) cleanupTxnLocked(ctx context.Context, state txnCoordState) {
tc.mu.state = state
if tc.mu.onFinishFn != nil {
// rejectErr is guaranteed to be non-nil because state is done or
// aborted on cleanup.
rejectErr := tc.maybeRejectClientLocked(ctx, tc.mu.meta.Txn.ID).GetDetail()
if rejectErr == nil {
log.Fatalf(ctx, "expected non-nil rejectErr on txn coord state %v", state)
}
tc.mu.onFinishFn(rejectErr)
}
tc.mu.meta.Intents = nil
tc.mu.intentsSizeBytes = 0
// The heartbeat might've already removed the record. Or we may have already
// closed txnEnd but we are racing with the heartbeat cleanup.
if tc.mu.txnEnd == nil {
return
}
// Trigger heartbeat shutdown.
log.VEvent(ctx, 2, "coordinator stops")
close(tc.mu.txnEnd)
tc.mu.txnEnd = nil
}
// finalTxnStatsLocked collects a transaction's final statistics. Returns
// the duration, restarts, and finalized txn status.
func (tc *TxnCoordSender) finalTxnStatsLocked() (duration, restarts int64, status roachpb.TransactionStatus) {
duration = tc.clock.PhysicalNow() - tc.mu.firstUpdateNanos
restarts = int64(tc.mu.meta.Txn.Epoch)
status = tc.mu.meta.Txn.Status
return duration, restarts, status
}
// heartbeatLoop periodically sends a HeartbeatTxn RPC to an extant transaction,
// stopping in the event the transaction is aborted or committed after
// attempting to resolve the intents. When the heartbeat stops, the transaction
// stats are updated based on its final disposition.
//
// TODO(dan): The Context we use for this is currently the one from the first
// request in a Txn, but the semantics of this aren't good. Each context has its
// own associated lifetime and we're ignoring all but the first. It happens now
// that we pass the same one in every request, but it's brittle to rely on this
// forever.
// TODO(wiz): Update (*DBServer).Batch to not use context.TODO().
func (tc *TxnCoordSender) heartbeatLoop(ctx context.Context) {
var tickChan <-chan time.Time
{
ticker := time.NewTicker(tc.heartbeatInterval)
tickChan = ticker.C
defer ticker.Stop()
}
// TODO(tschottdorf): this should join to the trace of the request
// which starts this goroutine.
sp := tc.AmbientContext.Tracer.StartSpan(opHeartbeatLoop)
defer sp.Finish()
ctx = opentracing.ContextWithSpan(ctx, sp)
defer func() {
tc.mu.Lock()
if tc.mu.txnEnd != nil {
close(tc.mu.txnEnd)
tc.mu.txnEnd = nil
}
duration, restarts, status := tc.finalTxnStatsLocked()
tc.mu.Unlock()
tc.updateStats(duration, restarts, status, false)
}()
var closer <-chan struct{}
{
tc.mu.Lock()
closer = tc.mu.txnEnd
tc.mu.Unlock()
if closer == nil {
return
}
}
// Loop with ticker for periodic heartbeats.
for {
select {
case <-tickChan:
if !tc.heartbeat(ctx) {
return
}
case <-closer:
// Transaction finished normally.
return
case <-ctx.Done():
// Note that if ctx is not cancelable, then ctx.Done() returns a nil
// channel, which blocks forever. In this case, the heartbeat loop is
// responsible for timing out transactions. If ctx.Done() is not nil, then
// then heartbeat loop ignores the timeout check and this case is
// responsible for client timeouts.
log.VEventf(ctx, 2, "transaction heartbeat stopped: %s", ctx.Err())
tc.tryAsyncAbort(ctx)
return
case <-tc.stopper.ShouldQuiesce():
return
}
}
}
// tryAsyncAbort (synchronously) grabs a copy of the txn proto and the
// intents (which it then clears from meta), and asynchronously tries
// to abort the transaction.
func (tc *TxnCoordSender) tryAsyncAbort(ctx context.Context) {
tc.mu.Lock()
defer tc.mu.Unlock()
// Clone the intents and the txn to avoid data races.
intentSpans, _ := roachpb.MergeSpans(append([]roachpb.Span(nil), tc.mu.meta.Intents...))
tc.cleanupTxnLocked(ctx, aborted)
txn := tc.mu.meta.Txn.Clone()
// Since we don't hold the lock continuously, it's possible that two aborts
// raced here. That's fine (and probably better than the alternative, which