-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
lock_table.go
3823 lines (3535 loc) · 142 KB
/
lock_table.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package concurrency
import (
"container/list"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
// defaultLockTableSize is the default upper bound on the number of locks held
// in a lockTable.
const defaultLockTableSize = 10_000
// waitKind identifies the kind of waiting that a request is subject to.
type waitKind int

// The named kinds start at 1; the zero value of waitKind does not correspond
// to any of them.
const (
	// waitFor means the request is waiting on another transaction to release
	// its locks or to complete its own request. A waitingState of this kind
	// carries information about who the request is waiting on, and the request
	// will likely want to eventually push the conflicting transaction.
	waitFor waitKind = iota + 1

	// waitForDistinguished is a sub-case of waitFor. It implies everything
	// waitFor does and additionally marks the request as the current
	// "distinguished waiter", which is responsible for taking extra actions,
	// e.g. immediately pushing the transaction it is waiting for. If multiple
	// requests are in the waitFor state waiting on the same transaction, at
	// least one of them will be a distinguished waiter.
	waitForDistinguished

	// waitElsewhere is used when the lockTable is under memory pressure and is
	// clearing its internal queue state. Like the waitFor* states, it tells the
	// request who it is waiting for so that deadlock detection works. However,
	// sequencing information inside the lockTable is mostly discarded.
	waitElsewhere

	// waitSelf means that a different request from the same transaction has
	// already claimed the lock. The request should sit tight and wait for a
	// new notification without pushing anyone.
	//
	// By definition, the lock cannot be held at this point -- if it were,
	// another request from the same transaction would not be in the lock's
	// wait-queues, obviating the need for this state.
	//
	// TODO(arul): this waitSelf state + claimantTxn stuff won't extend well to
	// multiple lock holders. See TODO in informActiveWaiters.
	waitSelf

	// waitQueueMaxLengthExceeded means the request tried to enter a lock
	// wait-queue as a writer and found that the queue's length was already
	// equal to or exceeding the request's configured maximum. As a result, the
	// request was rejected.
	waitQueueMaxLengthExceeded

	// doneWaiting means the request is done waiting on this pass through the
	// lockTable and should make another call to ScanAndEnqueue.
	doneWaiting
)
// waitingState describes the current waiting state of a request.
//
// See the detailed comment about "Waiting logic" on lockTableGuardImpl.
type waitingState struct {
// kind is the kind of waiting; it determines which of the fields below are
// meaningful.
kind waitKind
// Fields below are populated for waitFor* and waitElsewhere kinds.
// SafeFormat additionally prints key for waitSelf, and key and queuedWriters
// for waitQueueMaxLengthExceeded.
//
// txn represents who the request is waiting for. The conflicting
// transaction may be a lock holder of a conflicting lock or a
// conflicting request being sequenced through the same lockTable.
txn *enginepb.TxnMeta // always non-nil in waitFor{,Distinguished,Self} and waitElsewhere
key roachpb.Key // the key of the conflict
held bool // is the conflict a held lock?
queuedWriters int // how many writers are waiting?
queuedReaders int // how many readers are waiting?
// guardStrength represents the lock strength of the action that the request
// was trying to perform when it hit the conflict. E.g. was it trying to
// perform a (possibly locking) read or write an Intent? It is assigned by
// updateWaitingStateLocked from the guard's current strength.
guardStrength lock.Strength
}
// String implements the fmt.Stringer interface. The text is produced by
// rendering the state through redact.StringWithoutMarkers.
func (s waitingState) String() string {
	rendered := redact.StringWithoutMarkers(s)
	return rendered
}
// SafeFormat implements the redact.SafeFormatter interface.
//
// The output depends on s.kind:
//   - waitFor, waitForDistinguished: the transaction being waited on, whether
//     it is holding the lock or is a running request, the key, and the queue
//     lengths.
//   - waitSelf: the key only.
//   - waitElsewhere: a fixed message when the conflict is not a held lock,
//     otherwise the transaction and key.
//   - waitQueueMaxLengthExceeded: the key and the number of queued writers.
//   - doneWaiting: a fixed message.
//
// It panics if s.kind is none of the above.
func (s waitingState) SafeFormat(w redact.SafePrinter, _ rune) {
	switch s.kind {
	case waitFor, waitForDistinguished:
		distinguished := redact.SafeString("")
		if s.kind == waitForDistinguished {
			distinguished = " (distinguished)"
		}
		target := redact.SafeString("holding lock")
		if !s.held {
			target = "running request"
		}
		w.Printf("wait for%s txn %s %s @ key %s (queuedWriters: %d, queuedReaders: %d)",
			distinguished, s.txn.Short(), target, s.key, s.queuedWriters, s.queuedReaders)
	case waitSelf:
		w.Printf("wait self @ key %s", s.key)
	case waitElsewhere:
		if !s.held {
			w.SafeString("wait elsewhere by proceeding to evaluation")
			// Return here so that only one message is emitted; without it the
			// txn/key message below would be appended to the one above.
			return
		}
		w.Printf("wait elsewhere for txn %s @ key %s", s.txn.Short(), s.key)
	case waitQueueMaxLengthExceeded:
		w.Printf("wait-queue maximum length exceeded @ key %s with length %d",
			s.key, s.queuedWriters)
	case doneWaiting:
		w.SafeString("done waiting")
	default:
		panic("unhandled waitingState.kind")
	}
}
// Implementation
// TODO(sbhola):
// - metrics about lockTable state to export to observability debug pages:
// number of locks, number of waiting requests, wait time?, ...
// treeMu bundles the btree that holds lockState objects with the mutex that
// protects it and the bookkeeping used to bound its size.
type treeMu struct {
mu syncutil.RWMutex // Protects everything in this struct.
// For assigning sequence numbers to the lockState objects as required by
// the util/interval/generic type contract.
lockIDSeqNum uint64
// Container for lockState structs. Locks that are not held or reserved and
// have no waiting requests are garbage collected. Additionally, locks that
// are only held with Replicated durability and have no waiting requests may
// also be garbage collected since their state can be recovered from
// persistent storage.
btree
// For constraining memory consumption. We need better memory accounting
// than this.
// TODO(nvanbenschoten): use an atomic.Int64.
numLocks int64
// For dampening the frequency with which we enforce lockTableImpl.maxLocks.
// lockTableImpl.setMaxLocks sets this to maxLocks/20, with a minimum of 1.
lockAddMaxLocksCheckInterval uint64
}
// lockTableImpl is an implementation of lockTable.
//
// Concurrency: in addition to holding latches, we require for a particular
// request ScanAndEnqueue() and CurState() must be called by the same
// thread.
//
// Mutex ordering: lockTableImpl.enabledMu > treeMu.mu > lockState.mu > lockTableGuardImpl.mu
type lockTableImpl struct {
// The ID of the range to which this replica's lock table belongs.
// Used to populate results when querying the lock table.
rID roachpb.RangeID
// Is the lockTable enabled? When enabled, the lockTable tracks locks and
// allows requests to queue in wait-queues on these locks. When disabled,
// no locks or wait-queues are maintained.
//
// enabledMu is held in read-mode when determining whether the lockTable
// is enabled and when acting on that information (e.g. adding new locks).
// It is held in write-mode when enabling or disabling the lockTable.
//
// enabledSeq holds the lease sequence for which the lockTable is enabled
// under. Discovered locks from prior lease sequences are ignored, as they
// may no longer be accurate.
enabled bool
enabledMu syncutil.RWMutex
enabledSeq roachpb.LeaseSequence
// A sequence number is assigned to each request seen by the lockTable. This
// is to preserve fairness despite the design choice of allowing
// out-of-order evaluation of requests with overlapping spans where the
// latter request does not encounter contention. This out-of-order
// evaluation happens because requests do not reserve spans that are
// uncontended while they wait on contended locks after releasing their
// latches. Consider the following examples:
//
// Example 1:
// - req1 wants to write to A, B
// - req2 wants to write to B
// - lock at A is held by some other txn.
// - Even though req2 arrives later, req1 will wait only in the queue for A
// and allow req2 to proceed to evaluation.
//
// Example 2:
// - Same as example 1 but lock at A is held by txn3 and lock at B is held
// by txn4.
// - Lock at A is released so req1 claims the lock at A and starts waiting at
// B.
// - It is unfair for req1 to wait behind req2 at B. The sequence number
// assigned to req1 and req2 will restore the fairness by making req1
// wait before req2.
//
// Example 3: Deadlock in lock table if it did not use sequence numbers.
// - Lock at B is acquired by txn0.
// - req1 (from txn1) arrives at lockTable and wants to write to A and B.
// It queues at B.
// - req2 (from txn2) arrives at lockTable and only wants to write A.
// It proceeds to evaluation and acquires the lock at A for txn2 and then
// the request is done. The lock is still held.
// - req3 (from txn3) wants to write to A and B. It queues at A.
// - txn2 releases A. req3 is in the front of the queue at A so it claims the
// lock and starts waiting at B behind req1.
// - txn0 releases B. req1 gets to claim the lock at B and does another scan
// and adds itself to the queue at A, behind req3 which holds the claim for
// A.
// Now in the queues for A and B req1 is behind req3 and vice versa and
// this deadlock has been created entirely due to the lock table's behavior.
// TODO(nvanbenschoten): use an atomic.Uint64.
seqNum uint64
// locks contains the btree object (wrapped in the treeMu structure) that
// contains the actual lockState objects. These lockState objects represent
// the individual locks in the lock table. Locks on both Global and Local keys
// are stored in the same btree.
locks treeMu
// maxLocks is a soft maximum on number of locks. When it is exceeded, and
// subject to the dampening in lockAddMaxLocksCheckInterval, locks will be
// cleared.
maxLocks int64
// When maxLocks is exceeded, will attempt to clear down to minLocks,
// instead of clearing everything. setMaxLocks sets this to maxLocks / 2.
minLocks int64
// txnStatusCache is a small LRU cache that tracks the status of
// transactions that have been successfully pushed.
//
// NOTE: it probably makes sense to maintain a single txnStatusCache
// across all Ranges on a Store instead of an individual cache per
// Range. For now, we don't do this because we don't share any state
// between separate concurrency.Manager instances.
txnStatusCache txnStatusCache
// clock is used to track the lock hold and lock wait start times.
clock *hlc.Clock
// settings provides a handle to cluster settings.
settings *cluster.Settings
}
// Compile-time check that *lockTableImpl satisfies the lockTable interface.
var _ lockTable = (*lockTableImpl)(nil)
// newLockTable constructs a lockTableImpl for the given range, wiring in the
// supplied clock and cluster settings, and sizes it via setMaxLocks(maxLocks).
func newLockTable(
	maxLocks int64, rangeID roachpb.RangeID, clock *hlc.Clock, settings *cluster.Settings,
) *lockTableImpl {
	table := new(lockTableImpl)
	table.rID = rangeID
	table.clock = clock
	table.settings = settings
	table.setMaxLocks(maxLocks)
	return table
}
// setMaxLocks configures the soft maximum number of locks, the minimum that
// clearing aims for (half of the maximum), and the interval at which the
// maximum is checked.
func (t *lockTableImpl) setMaxLocks(maxLocks int64) {
	t.maxLocks = maxLocks
	t.minLocks = maxLocks / 2

	// The limit is checked at 5% intervals of the max count. An interval that
	// rounds down to zero is bumped up to one.
	checkInterval := maxLocks / 20
	if checkInterval == 0 {
		checkInterval = 1
	}
	t.locks.lockAddMaxLocksCheckInterval = uint64(checkInterval)
}
// lockTableGuardImpl is an implementation of lockTableGuard.
//
// The struct is a guard that is returned to the request the first time it calls
// lockTable.ScanAndEnqueue() and used in later calls to ScanAndEnqueue() and
// done(). After a call to ScanAndEnqueue() (which is made while holding
// latches), the caller must first call lockTableGuard.ShouldWait() and if it
// returns true release the latches and continue interacting with the
// lockTableGuard. If ShouldWait() returns false, the request can proceed to
// evaluation.
//
// Waiting logic: The interface hides the queues that the request is waiting on,
// and the request's position in the queue. One of the reasons for this hiding
// is that queues are not FIFO since a request that did not wait on a queue for
// key k in a preceding call to ScanAndEnqueue() (because k was not locked and
// there was no queue) may need to wait on the queue in a later call to
// ScanAndEnqueue(). So sequencing of requests arriving at the lockTable is
// partially decided by a sequence number assigned to a request when it first
// called ScanAndEnqueue() and queues are ordered by this sequence number.
// However the sequencing is not fully described by the sequence numbers -- a
// request R1 encountering contention over some keys in its span does not
// prevent a request R2 that has a higher sequence number and overlapping span
// to proceed if R2 does not encounter contention. This concurrency (that is not
// completely fair) is deemed desirable.
//
// The interface exposes an abstracted version of the waiting logic in a way
// that the request that starts waiting is considered waiting for at most one
// other request or transaction. This is exposed as a series of state
// transitions where the transitions are notified via newState() and the current
// state can be read using CurState().
//
// - The waitFor* states provide information on who the request is waiting for.
// The waitForDistinguished state is a sub-case -- a distinguished waiter is
// responsible for taking extra actions e.g. immediately pushing the transaction
// it is waiting for. The implementation ensures that if there are multiple
// requests in waitFor state waiting on the same transaction at least one will
// be a distinguished waiter.
//
// TODO(sbhola): investigate removing the waitForDistinguished state which
// will simplify the code here. All waitFor requests would wait (currently
// 50ms) before pushing the transaction (for deadlock detection) they are
// waiting on, say T. Typically T will be done before 50ms which is considered
// ok: the one exception we will need to make is if T has the min priority or
// the waiting transaction has max priority -- in both cases it will push
// immediately. The bad case is if T is ABORTED: the push will succeed after,
// and if T left N intents, each push would wait for 50ms, incurring a latency
// of 50*N ms. A cache of recently encountered ABORTED transactions on each
// Store should mitigate this latency increase. Whenever a transaction sees a
// waitFor state, it will consult this cache and if T is found, push
// immediately (if there isn't already a push in-flight) -- even if T is not
// initially in the cache, the first push will place it in the cache, so the
// maximum latency increase is 50ms.
//
// - The waitElsewhere state is a rare state that is used when the lockTable is
// under memory pressure and is clearing its internal queue state. Like the
// waitFor* states, it informs the request who it is waiting for so that
// deadlock detection works. However, sequencing information inside the
// lockTable is mostly discarded.
//
// - The waitSelf state is a rare state when a different request from the same
// transaction has claimed the lock. See the comment about the concept of
// claiming a lock on claimantTxn().
//
// - The waitQueueMaxLengthExceeded state is used to indicate that the request
// was rejected because it attempted to enter a lock wait-queue as a writer
// and found that the queue's length was already equal to or exceeding the
// request's configured maximum.
//
// - The doneWaiting state is used to indicate that the request should make
// another call to ScanAndEnqueue() (that next call is more likely to return a
// lockTableGuard that returns false from ShouldWait()).
type lockTableGuardImpl struct {
// seqNum is this request's sequence number. IsKeyLockedByConflictingTxn
// compares it against the sequence numbers of queued writers.
seqNum uint64
// lt is the lockTable this guard belongs to; its clock and settings are read
// through this pointer.
lt *lockTableImpl
// Information about this request.
txn *roachpb.Transaction
ts hlc.Timestamp
spans *lockspanset.LockSpanSet
waitPolicy lock.WaitPolicy
maxWaitQueueLength int
// Snapshot of the tree for which this request has some spans. Note that
// the lockStates in this snapshot may have been removed from
// lockTableImpl. Additionally, it is possible that there is a new lockState
// for the same key. This can result in various harmless anomalies:
// - the request may hold a claim on a lockState that is no longer
// in the tree. When it next does a scan, it will either find a new
// lockState where it will compete or none. Both lockStates can be in
// the mu.locks map, which is harmless.
// - the request may wait behind a transaction that has claimed a lock but is
// yet to acquire it. This could cause a delay in pushing the lock holder.
// This is not a correctness issue (the whole system is not deadlocked) and we
// expect will not be a real performance issue.
//
// TODO(sbhola): experimentally evaluate the lazy queueing of the current
// implementation, in comparison with eager queueing. If eager queueing
// is comparable in system throughput, one can eliminate the above anomalies.
//
// TODO(nvanbenschoten): should we be Reset-ing these btree snapshot when we
// Dequeue a lockTableGuardImpl? In releaseLockTableGuardImpl?
tableSnapshot btree
// notRemovableLock points to the lock for which this guard has incremented
// lockState.notRemovable. It will be set to nil when this guard has decremented
// lockState.notRemovable. Note that:
// - notRemovableLock may no longer be in the btree in lockTableImpl since it
// may have been removed due to the lock being released. This is harmless since
// the change in lock state for that lock's key (even if it has meanwhile been
// reacquired by a different request) means forward progress for this request,
// which guarantees liveness for this request.
// - Multiple guards can have marked the same lock as notRemovable, which is
// why lockState.notRemovable behaves like a reference count.
notRemovableLock *lockState
// A request whose startWait is set to true in ScanAndEnqueue is actively
// waiting at a particular key. This is the first key encountered when
// iterating through spans that it needs to wait at. A future event (lock
// release etc.) may cause the request to no longer need to wait at this
// key. It then needs to continue iterating through spans to find the next
// key to wait at (we don't want to wastefully start at the beginning since
// this request probably has a claim at the contended keys there): str, index,
// and key collectively track the current position to allow it to continue
// iterating.
// The key for the lockState.
key roachpb.Key
// The key for the lockState is contained in the Span specified by
// spans[str][index].
str lock.Strength // Iterates from strongest to weakest lock strength
index int
mu struct {
syncutil.Mutex
startWait bool
// curLockWaitStart represents the timestamp when the request started waiting
// on the current lock. Multiple consecutive waitingStates might refer to
// the same lock, in which case the curLockWaitStart is not updated in between
// them.
curLockWaitStart time.Time
state waitingState
// signal is the state change channel returned by NewStateChan and written
// to by notify.
signal chan struct{}
// locks for which this request is in the list of queued{Readers,Writers}.
// For writers, this includes both active and inactive waiters. For readers,
// there's no such thing as inactive readers, so by definition the request
// must be an active waiter.
//
// TODO(sbhola): investigate whether the logic to maintain this locks map
// can be simplified so it doesn't need to be adjusted by various lockState
// methods. It adds additional bookkeeping burden that means it is more
// prone to inconsistencies. There are two main uses: (a) removing from
// various lockStates when requestDone() is called, (b) tryActiveWait() uses
// it as an optimization to know that this request is not known to the
// lockState. (b) can be handled by other means -- the first scan the
// request won't be in the lockState and the second scan it likely will. (a)
// doesn't necessarily require this map to be consistent -- the request
// could track the places where it has enqueued as places where it could
// be present and then do the search.
locks map[*lockState]struct{}
// mustComputeWaitingState is set in context of the state change channel
// being signaled. It denotes whether the signaler has already computed the
// guard's next waiting state or not.
//
// If set to true, a call to CurState() must compute the state from scratch,
// by resuming its scan. In such cases, the signaler has deferred the
// computation work on to the callers, which is proportional to the number
// of waiters.
//
// If set to false, the signaler has already computed this request's next
// waiting state. As such, a call to CurState() can simply return the state
// without doing any extra work.
mustComputeWaitingState bool
}
// Locks to resolve before scanning again. Doesn't need to be protected by
// mu since should only be read after the caller has already synced with mu
// in realizing that it is doneWaiting.
//
// toResolve should only include replicated locks; for unreplicated locks,
// toResolveUnreplicated is used instead.
toResolve []roachpb.LockUpdate
// toResolveUnreplicated is a list of locks (only held with durability
// unreplicated) that are known to belong to finalized transactions. Such
// locks may be cleared from the lock table (and some requests queueing in the
// lock's wait queue may be able to proceed). If set, the request should
// perform these actions on behalf of the lock table, either before proceeding
// to evaluation, or before waiting on a conflicting lock.
//
// TODO(arul): We need to push the responsibility of doing so on to a request
// because TransactionIsFinalized does not take proactive action. If we
// addressed the TODO in TransactionIsFinalized, and taught it to take action
// on locks belonging to finalized transactions, we wouldn't need to bother
// scanning requests.
toResolveUnreplicated []roachpb.LockUpdate
}
// Compile-time check that *lockTableGuardImpl satisfies the lockTableGuard
// interface.
var _ lockTableGuard = (*lockTableGuardImpl)(nil)
// lockTableGuardImplPool recycles lockTableGuardImpl objects to avoid
// allocations. Freshly created guards come with a signal channel buffered to
// one element and an empty locks map.
var lockTableGuardImplPool = sync.Pool{
	New: func() interface{} {
		guard := &lockTableGuardImpl{}
		guard.mu.signal = make(chan struct{}, 1)
		guard.mu.locks = make(map[*lockState]struct{})
		return guard
	},
}
// newLockTableGuardImpl fetches a lockTableGuardImpl from the object pool. The
// returned struct carries pre-allocated mu.signal and mu.locks fields, so
// callers must not blindly overwrite the whole struct.
func newLockTableGuardImpl() *lockTableGuardImpl {
	pooled := lockTableGuardImplPool.Get()
	return pooled.(*lockTableGuardImpl)
}
// releaseLockTableGuardImpl resets the guard and hands it back to the object
// pool. It panics if the guard's mu.locks map is not empty.
func releaseLockTableGuardImpl(g *lockTableGuardImpl) {
	// Hold on to the signal channel and the locks map so that the pooled object
	// keeps its pre-allocated fields across the reset below.
	sigCh := g.mu.signal
	lockSet := g.mu.locks

	// Drain a pending signal, if any, without blocking.
	select {
	case <-sigCh:
	default:
	}

	// The map should have been cleared by lockState.requestDone.
	if len(lockSet) > 0 {
		panic("lockTableGuardImpl.mu.locks not empty after Dequeue")
	}

	*g = lockTableGuardImpl{}
	g.mu.signal, g.mu.locks = sigCh, lockSet
	lockTableGuardImplPool.Put(g)
}
// ShouldWait reports whether the request needs to drop latches and wait.
func (g *lockTableGuardImpl) ShouldWait() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	// Case 1: the lock table indicated that the request must wait (e.g. it ran
	// into a conflicting lock).
	if g.mu.startWait {
		return true
	}
	// Case 2: the scan succeeded, but it discovered replicated locks that need
	// to be resolved before the request can evaluate.
	return len(g.toResolve) != 0
}
// ResolveBeforeScanning returns the guard's toResolve list, i.e. the locks to
// resolve before scanning again. The list is read without acquiring g.mu.
func (g *lockTableGuardImpl) ResolveBeforeScanning() []roachpb.LockUpdate {
	pending := g.toResolve
	return pending
}
// NewStateChan returns the guard's state change channel, reading it while
// holding g.mu.
func (g *lockTableGuardImpl) NewStateChan() chan struct{} {
	g.mu.Lock()
	stateCh := g.mu.signal
	g.mu.Unlock()
	return stateCh
}
// CurState returns the guard's current waitingState. If the state has already
// been computed (mustComputeWaitingState is false) it is returned as is;
// otherwise the flag is cleared, the scan is resumed via resumeScan, and the
// state left behind by that scan is returned.
func (g *lockTableGuardImpl) CurState() waitingState {
g.mu.Lock()
defer g.mu.Unlock()
if !g.mu.mustComputeWaitingState {
return g.mu.state
}
// Not actively waiting anywhere so no one else can set
// mustComputeWaitingState to true while this method executes.
g.mu.mustComputeWaitingState = false
// g.mu is released for the duration of resumeScan and re-acquired afterwards
// so that the deferred Unlock above stays balanced.
// NOTE(review): presumably resumeScan acquires g.mu itself -- confirm.
g.mu.Unlock()
g.resumeScan(false /* notify */)
g.mu.Lock() // Unlock deferred
return g.mu.state
}
// updateStateToDoneWaitingLocked replaces the request's waiting state with a
// fresh state of kind doneWaiting; all other fields are reset to their zero
// values.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) updateStateToDoneWaitingLocked() {
	done := waitingState{kind: doneWaiting}
	g.mu.state = done
}
// startWaitingWithWaitingState marks the request as waiting: it records the
// conflicting key on the guard, sets startWait, stamps the wait start time
// from the lock table's clock, and then applies the supplied waiting state
// (signaling the state change channel if notify is set and the state changed
// meaningfully).
func (g *lockTableGuardImpl) startWaitingWithWaitingState(ws waitingState, notify bool) {
	// The key is recorded before g.mu is acquired.
	g.key = ws.key

	g.mu.Lock()
	defer g.mu.Unlock()

	g.mu.startWait = true
	waitStart := g.lt.clock.PhysicalTime()
	g.mu.curLockWaitStart = waitStart
	g.maybeUpdateWaitingStateLocked(ws, notify)
}
// maybeUpdateWaitingStateLocked installs newState as the request's waiting
// state, but only if it is meaningfully different[1] from the current one.
// When the state is updated and notify is true, the request's state change
// channel is signaled as well. Skipping no-op updates, and more importantly
// the notifications that go with them, avoids needlessly nudging a waiting
// request.
//
// [1] An update is skipped when the lock table waiter would not need to take
// any action because of it. In practice, this means updates that only touch
// observability related fields are elided. Use updateWaitingStateLocked
// directly if this behavior is undesirable.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) maybeUpdateWaitingStateLocked(newState waitingState, notify bool) {
	if !g.canElideWaitingStateUpdate(newState) {
		g.updateWaitingStateLocked(newState)
		if notify {
			g.notify()
		}
	}
}
// updateWaitingStateLocked sets the request's waiting state to the one
// supplied, after stamping it with the guard's current lock strength. The
// supplied state must imply that the request is still waiting; passing a
// doneWaiting state panics. Typically, this function is called for the first
// time when the request discovers a conflict while scanning the lock table.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) {
	switch newState.kind {
	case doneWaiting:
		panic(errors.AssertionFailedf("unexpected waiting state kind: %d", newState.kind))
	}
	// Record the strength which caused the conflict.
	updated := newState
	updated.guardStrength = g.curStrength()
	g.mu.state = updated
}
// canElideWaitingStateUpdate reports whether replacing the guard's waiting
// state with newState would leave the waiter's course of action unchanged
// (e.g. it would neither proceed with its scan nor push a different
// transaction). Updates that only affect observability related fields are
// considered fair game for elision.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) canElideWaitingStateUpdate(newState waitingState) bool {
	// guardStrength is not compared: it is assigned automatically whenever the
	// state is updated.
	cur := &g.mu.state
	if cur.kind != newState.kind {
		return false
	}
	if cur.txn != newState.txn {
		return false
	}
	if !cur.key.Equal(newState.key) {
		return false
	}
	return cur.held == newState.held
}
// CheckOptimisticNoConflicts reports whether none of the locks in the guard's
// table snapshot that overlap the supplied span set conflict with the
// request. Requests using the SkipLocked wait policy always get true, since
// their lock conflicts are handled during evaluation.
//
// The guard's span set is swapped for lockSpanSet for the duration of the
// call and restored on return. g.str and g.index are reset for the scan and
// are not restored.
func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(
	lockSpanSet *lockspanset.LockSpanSet,
) (ok bool) {
	if g.waitPolicy == lock.WaitPolicy_SkipLocked {
		return true
	}

	// Temporarily substitute the LockSpanSet in the guard.
	savedSpans := g.spans
	defer func() { g.spans = savedSpans }()
	g.spans = lockSpanSet
	g.str = lock.MaxStrength
	g.index = -1

	for span := stepToNextSpan(g); span != nil; span = stepToNextSpan(g) {
		iter := g.tableSnapshot.MakeIter()
		searchRange := &lockState{key: span.Key, endKey: span.EndKey}
		for iter.FirstOverlap(searchRange); iter.Valid(); iter.NextOverlap(searchRange) {
			if !iter.Cur().isNonConflictingLock(g) {
				return false
			}
		}
	}
	return true
}
// IsKeyLockedByConflictingTxn consults the guard's table snapshot and reports
// whether accessing key with the given strength would conflict with another
// transaction. When the conflict is with the lock's holder, the holder's
// TxnMeta is returned as well; when the conflict is with a queued request
// that arrived earlier, or when there is no conflict, the TxnMeta is nil.
func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
	key roachpb.Key, strength lock.Strength,
) (bool, *enginepb.TxnMeta) {
	iter := g.tableSnapshot.MakeIter()
	iter.SeekGE(&lockState{key: key})
	if found := iter.Valid() && iter.Cur().key.Equal(key); !found {
		// No lock on key.
		return false, nil
	}

	l := iter.Cur()
	l.mu.Lock()
	defer l.mu.Unlock()

	switch {
	case l.isEmptyLock():
		// The lock is empty but has not yet been deleted.
		return false, nil
	case l.alreadyHoldsLockAndIsAllowedToProceed(g, strength):
		// Another request from this transaction has already locked this key
		// with sufficient locking strength, so there's no conflict; we can
		// proceed.
		return false, nil
	}

	if l.isHeld() {
		holderTxn, _ := l.getLockHolder()
		if !g.isSameTxn(holderTxn) {
			if lock.Conflicts(l.getLockMode(), makeLockMode(strength, g.txn, g.ts), &g.lt.settings.SV) {
				// The key is locked by some other transaction; return the holder.
				return true, l.holder.txn
			}
		}
	}

	// There's no conflict with the lock holder itself. However, there may be
	// other locking requests that came before us that we may conflict with.
	// Checking them ensures fairness by preventing a stream of locking[1] SKIP
	// LOCKED requests from starving out regular locking requests.
	if strength == lock.None { // [1] only locking requests need this check
		// TODO(arul): Is there a hazard for a stream of non-locking requests to
		// starve out a writer, by perpetually bumping the timestamp cache from
		// underneath it?
		return false, nil
	}

	for elem := l.queuedWriters.Front(); elem != nil; elem = elem.Next() {
		waiter := elem.Value.(*queuedGuard)
		switch {
		case waiter.guard.seqNum > g.seqNum:
			// Only requests that came before us (read: have lower sequence
			// numbers than us) need to be checked, so the walk stops here.
			return false, nil
		case waiter.guard.txn.ID == g.txn.ID:
			panic(errors.AssertionFailedf(
				"SKIP LOCKED request should not find another waiting request from the same transaction",
			))
		case lock.Conflicts(waiter.mode, makeLockMode(strength, g.txn, g.ts), &g.lt.settings.SV):
			// The conflict isn't with a lock holder, so nil is returned.
			return true, nil
		}
	}
	return false, nil // no conflict
}
// notify signals the guard's g.mu.signal channel without ever blocking: if the
// send cannot proceed immediately, the signal is dropped.
func (g *lockTableGuardImpl) notify() {
select {
case g.mu.signal <- struct{}{}:
default:
// The send would block. Presumably a signal is already pending on the
// channel, in which case dropping this one loses nothing -- TODO confirm
// against the channel's buffering where it is created.
}
}
// doneActivelyWaitingAtLock is called when a request, that was previously
// actively waiting at a lock, is no longer doing so. It may have transitioned
// to become an inactive waiter or removed from the lock's wait queues entirely
// -- either way, it should find the next lock (if any) to wait at.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) doneActivelyWaitingAtLock() {
// Record that the waiting state has to be recomputed, then signal the
// guard's channel so that the request notices.
g.mu.mustComputeWaitingState = true
g.notify()
}
// txnMeta returns a pointer to the TxnMeta of the request's transaction, or
// nil when g.txn is nil.
func (g *lockTableGuardImpl) txnMeta() *enginepb.TxnMeta {
	if txn := g.txn; txn != nil {
		return &txn.TxnMeta
	}
	return nil
}
// hasUncertaintyInterval reports whether the request's transaction has a
// ReadTimestamp that is Less than its GlobalUncertaintyLimit. It is always
// false when g.txn is nil.
func (g *lockTableGuardImpl) hasUncertaintyInterval() bool {
	txn := g.txn
	if txn == nil {
		return false
	}
	return txn.ReadTimestamp.Less(txn.GlobalUncertaintyLimit)
}
// isSameTxn reports whether the supplied transaction has the same ID as the
// request's transaction. It is always false when g.txn is nil, in which case
// txn is not dereferenced.
func (g *lockTableGuardImpl) isSameTxn(txn *enginepb.TxnMeta) bool {
	if g.txn == nil {
		return false
	}
	return g.txn.ID == txn.ID
}
// curStrength returns the lock strength of the current lock span being scanned
// by the request. Lock spans declared by a request are iterated from strongest
// to weakest, so the value returned by this method changes as the request's
// scan progresses from lock to lock.
func (g *lockTableGuardImpl) curStrength() lock.Strength {
return g.str
}
// curLockMode returns the lock mode of the current lock being scanned by the
// request. It is built from the request's current strength, transaction and
// timestamp, so the value returned by this method changes as the request's
// scan of the lock table progresses from lock to lock.
func (g *lockTableGuardImpl) curLockMode() lock.Mode {
	str := g.curStrength()
	return makeLockMode(str, g.txn, g.ts)
}
// makeLockMode constructs and returns the lock mode for the supplied strength,
// transaction and timestamp:
//
//   - lock.None uses ts and the transaction's isolation level, falling back to
//     isolation.Serializable when txn is nil.
//   - lock.Shared asserts that txn is non-nil.
//   - lock.Exclusive asserts that txn is non-nil and uses ts together with the
//     transaction's isolation level.
//   - lock.Intent uses ts only.
//
// Any other strength causes a panic.
func makeLockMode(str lock.Strength, txn *roachpb.Transaction, ts hlc.Timestamp) lock.Mode {
	switch str {
	case lock.None:
		if txn == nil {
			return lock.MakeModeNone(ts, isolation.Serializable)
		}
		return lock.MakeModeNone(ts, txn.IsoLevel)

	case lock.Shared:
		assert(txn != nil, "only transactional requests can acquire shared locks")
		return lock.MakeModeShared()

	case lock.Exclusive:
		assert(txn != nil, "only transactional requests can acquire exclusive locks")
		return lock.MakeModeExclusive(ts, txn.IsoLevel)

	case lock.Intent:
		return lock.MakeModeIntent(ts)
	}
	panic(fmt.Sprintf("unhandled request strength: %s", str))
}
// takeToResolveUnreplicated hands over the unreplicated locks that the guard
// has accumulated for resolution and clears the guard's own reference to them.
// From then on the caller owns the returned slice and is responsible for
// resolving the locks in it.
func (g *lockTableGuardImpl) takeToResolveUnreplicated() []roachpb.LockUpdate {
	taken := g.toResolveUnreplicated
	g.toResolveUnreplicated = nil
	return taken
}
// resumeScan resumes the request's (receiver's) scan of the lock table. The
// scan continues until either all overlapping locks in the lock table have been
// considered and no conflict is found, or until the request encounters a lock
// that it conflicts with. Either way, the receiver's state is mutated such that
// a call to ShouldWait will reflect the termination condition. The same applies
// to the receiver's waitingState; however, if the waitingState does change,
// the state change channel will only be signaled if notify is supplied as true.
//
// Note that the details about scan mechanics are captured on the receiver --
// information such as what lock spans to scan, where to begin the scan from
// etc.
//
// On both exit paths (conflict found or scan completed), the unreplicated
// locks accumulated in g.toResolveUnreplicated are applied to the lock table
// by a deferred function.
//
// ACQUIRES: g.mu.
func (g *lockTableGuardImpl) resumeScan(notify bool) {
// Decide where to pick the scan up. If no span has been visited yet
// (g.index == -1) or the current span has an empty EndKey (presumably a
// point span, which holds nothing past the key already visited), move on to
// the next span. Otherwise resume inside the current span, starting at g.key.
spans := g.spans.GetSpans(g.curStrength())
var span *roachpb.Span
resumingInSameSpan := false
if g.index == -1 || len(spans[g.index].EndKey) == 0 {
span = stepToNextSpan(g)
} else {
span = &spans[g.index]
resumingInSameSpan = true
}
defer func() {
// Eagerly update any unreplicated locks that are known to belong to
// finalized transactions. We do so regardless of whether this request can
// proceed to evaluation or needs to wait at some conflicting lock.
//
// Note that replicated locks are handled differently, using the g.toResolve
// slice. Additionally, they're only resolved when a request is done
// waiting and can proceed to evaluation.
if toResolveUnreplicated := g.takeToResolveUnreplicated(); len(toResolveUnreplicated) > 0 {
for i := range toResolveUnreplicated {
g.lt.updateLockInternal(&toResolveUnreplicated[i])
}
}
}()
for span != nil {
startKey := span.Key
if resumingInSameSpan {
startKey = g.key
}
iter := g.tableSnapshot.MakeIter()
// From here on, the use of resumingInSameSpan is just a performance
// optimization to deal with the interface limitation of btree that
// prevents us from specifying an exclusive start key. We need to check
// that the lock is not the same as our exclusive start key and only need
// to do that check once -- for the first lock.
ltRange := &lockState{key: startKey, endKey: span.EndKey}
for iter.FirstOverlap(ltRange); iter.Valid(); iter.NextOverlap(ltRange) {
l := iter.Cur()
if resumingInSameSpan {
resumingInSameSpan = false
if l.key.Equal(startKey) {
// This lock is where it stopped waiting.
continue
}
// Else, past the lock where it stopped waiting. We may not
// encounter that lock since it may have been garbage collected.
}
conflicts := l.scanAndMaybeEnqueue(g, notify)
if conflicts {
// Stop scanning at the first conflicting lock; the deferred function
// above still runs.
return
}
}
resumingInSameSpan = false
span = stepToNextSpan(g)
}
if len(g.toResolve) > 0 {
j := 0
// Some of the locks in g.toResolve may already have been claimed by
// another concurrent request and removed, or intent resolution could have
// happened while this request was waiting (after releasing latches). So
// we iterate over all the elements of toResolve and only keep the ones
// where removing the lock via the call to updateLockInternal is not a
// noop.
//
// For pushed transactions that are not finalized, we disable this
// deduplication and allow all resolution attempts to adjust the lock's
// timestamp to go through. This is because updating the lock ahead of
// resolution risks rediscovery loops where the lock is continually
// rediscovered at a lower timestamp than that in the lock table.
for i := range g.toResolve {
var doResolve bool
if g.toResolve[i].Status.IsFinalized() {
doResolve = g.lt.updateLockInternal(&g.toResolve[i])
} else {
doResolve = true
}
if doResolve {
// Compact the retained entries towards the front of the slice, in place.
g.toResolve[j] = g.toResolve[i]
j++
}
}
g.toResolve = g.toResolve[:j]
}
// Every span was scanned without finding a conflict: mark the request as
// done waiting and, if asked to, signal the state change.
g.mu.Lock()
defer g.mu.Unlock()
g.updateStateToDoneWaitingLocked()
if notify {
g.notify()
}
}
// Waiting writers in a lockState are wrapped in a queuedGuard. A waiting
// writer is typically waiting in an active state, i.e., the
// lockTableGuardImpl.key refers to this lockState. However, there are
// multiple reasons that can cause a writer to be an inactive waiter:
// - The first transactional writer is able to claim a lock when it is
// released. Doing so entails the writer being marked inactive.
// - It is able to claim a lock that was previously claimed by a request with
// a higher sequence number. In such cases, the writer adds itself to the
// head of the queue as an inactive waiter and proceeds with its scan.
// - A discovered lock causes the discoverer to become an inactive waiter
// (until it scans again).
// - A lock held by a finalized txn causes the first waiter to be an inactive
// waiter.
//
// The first two cases above (claiming an unheld lock) only occur for
// transactional requests, but the other cases can happen for both transactional
// and non-transactional requests.
type queuedGuard struct {
// guard is the waiting request that this entry wraps.
guard *lockTableGuardImpl
// mode is the lock mode associated with the queued request; it is what
// other requests are checked against for conflicts with this waiter.
mode lock.Mode // protected by lockState.mu
// active records whether the writer is waiting in an active state, as
// opposed to being an inactive waiter (see above).
active bool // protected by lockState.mu
}
// Information about a lock holder for unreplicated locks.
type unreplicatedLockHolderInfo struct {
// strengths tracks whether the lock is held with a particular strength; if it
// is, the lowest sequence number (that hasn't been rolled back) that it was
// acquired with is stored. If the lock isn't held with a particular strength,
// a sentinel value (-1) is stored.
//
// The array has one slot per entry of unreplicatedHolderStrengths; the slot
// for a given strength is looked up through
// unreplicatedLockHolderStrengthToIndexMap.
//
// NB: Intents cannot be held/acquired in unreplicated fashion; thus the
// highest lock strength for unreplicated locks is Exclusive.
strengths [len(unreplicatedHolderStrengths)]enginepb.TxnSeq
// The timestamp at which the unreplicated lock is held. Must not regress.
ts hlc.Timestamp
}
// unreplicatedHolderStrengths is a fixed-length array of all supported lock
// strengths for unreplicated locks. It may be used to iterate the supported
// lock strengths in strength order (strongest to weakest). Its length also
// sizes the unreplicatedLockHolderInfo.strengths array.
var unreplicatedHolderStrengths = [...]lock.Strength{lock.Exclusive, lock.Shared}
// unreplicatedLockHolderStrengthToIndexMap maps a lock strength to the index of
// its slot in the unreplicatedLockHolderInfo.strengths array. It is computed
// once, when the package-level variable is initialized, from
// unreplicatedHolderStrengths.
//
// Strengths that aren't supported with unreplicated locks map to -1, so trying
// to use one of them to index into the unreplicatedLockHolderInfo.strengths
// array will cause a runtime error.
var unreplicatedLockHolderStrengthToIndexMap = func() [lock.MaxStrength + 1]int {
	var indexes [lock.MaxStrength + 1]int
	// Start with every strength marked as unsupported.
	for i := 0; i < len(indexes); i++ {
		indexes[i] = -1
	}
	// Then record the slot of each strength that is supported.
	for slot := 0; slot < len(unreplicatedHolderStrengths); slot++ {
		indexes[unreplicatedHolderStrengths[slot]] = slot
	}
	return indexes
}()
// init initializes an unreplicatedLockHolderInfo struct by resetting its
// per-strength tracking via resetStrengths. The ts field is left untouched.
func (ulh *unreplicatedLockHolderInfo) init() {
// NOTE(review): resetStrengths presumably stores the -1 sentinel in every
// slot of ulh.strengths -- confirm against its definition.
ulh.resetStrengths()
}
// clear removes previously tracked unreplicated lock holder information: the
// per-strength tracking is reset via resetStrengths and the timestamp is set
// back to its zero value.
func (ulh *unreplicatedLockHolderInfo) clear() {
ulh.resetStrengths()
ulh.ts = hlc.Timestamp{}
}
// epochBumped is called when a transaction is known to have its epoch bumped.