// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Package lease provides functionality to create and manage sql schema leases.
package lease
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
kvstorage "github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)
var errRenewLease = errors.New("renew lease on id")
var errReadOlderVersion = errors.New("read older descriptor version from store")
// LeaseDuration controls the duration of sql descriptor leases.
var LeaseDuration = settings.RegisterDurationSetting(
"sql.catalog.descriptor_lease_duration",
"mean duration of sql descriptor leases; the actual duration is jittered",
base.DefaultDescriptorLeaseDuration)
func between0and1inclusive(f float64) error {
if f < 0 || f > 1 {
return errors.Errorf("value %f must be between 0 and 1", f)
}
return nil
}
// LeaseJitterFraction controls the percent jitter around sql lease durations.
var LeaseJitterFraction = settings.RegisterFloatSetting(
"sql.catalog.descriptor_lease_jitter_fraction",
"max fraction of the lease duration that the actual duration can be jittered by",
base.DefaultDescriptorLeaseJitterFraction,
between0and1inclusive)
// WaitForNoVersion returns once there are no unexpired leases left
// for any version of the descriptor.
func (m *Manager) WaitForNoVersion(
ctx context.Context, id descpb.ID, retryOpts retry.Options,
) error {
for lastCount, r := 0, retry.Start(retryOpts); r.Next(); {
// Check to see if there are any leases that still exist on the previous
// version of the descriptor.
now := m.storage.clock.Now()
stmt := fmt.Sprintf(`SELECT count(1) FROM system.public.lease AS OF SYSTEM TIME '%s' WHERE ("descID" = %d AND expiration > $1)`,
now.AsOfSystemTime(),
id)
values, err := m.storage.internalExecutor.QueryRowEx(
ctx, "count-leases", nil, /* txn */
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
stmt, now.GoTime(),
)
if err != nil {
return err
}
if values == nil {
return errors.New("failed to count leases")
}
count := int(tree.MustBeDInt(values[0]))
if count == 0 {
break
}
if count != lastCount {
lastCount = count
log.Infof(ctx, "waiting for %d leases to expire: desc=%d", count, id)
}
}
return nil
}
// WaitForOneVersion returns once there are no unexpired leases on the
// previous version of the descriptor. It returns the descriptor with the
// current version.
// After it returns, there can only be versions of the descriptor greater
// than or equal to the returned version. Lease acquisition (see acquire())
// maintains the invariant that no new leases for desc.Version-1 will be
// granted once desc.Version exists.
func (m *Manager) WaitForOneVersion(
ctx context.Context, id descpb.ID, retryOpts retry.Options,
) (desc catalog.Descriptor, _ error) {
for lastCount, r := 0, retry.Start(retryOpts); r.Next(); {
if err := m.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, m.Codec(), id)
return err
}); err != nil {
return nil, err
}
// Check to see if there are any leases that still exist on the previous
// version of the descriptor.
now := m.storage.clock.Now()
descs := []IDVersion{NewIDVersionPrev(desc.GetName(), desc.GetID(), desc.GetVersion())}
count, err := CountLeases(ctx, m.storage.internalExecutor, descs, now)
if err != nil {
return nil, err
}
if count == 0 {
break
}
if count != lastCount {
lastCount = count
log.Infof(ctx, "waiting for %d leases to expire: desc=%v", count, descs)
}
}
return desc, nil
}
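
// Illustrative usage sketch, not part of the original file: a schema change
// typically publishes a new descriptor version and then blocks on
// WaitForOneVersion until every lease on the previous version has expired.
// The helper name and the retry.Options values are assumptions chosen for
// the example.
func exampleWaitForOneVersion(ctx context.Context, m *Manager, id descpb.ID) error {
	retryOpts := retry.Options{
		InitialBackoff: 100 * time.Millisecond, // assumed backoff values
		MaxBackoff:     2 * time.Second,
		Multiplier:     2,
	}
	desc, err := m.WaitForOneVersion(ctx, id, retryOpts)
	if err != nil {
		return err
	}
	// From here on, only leases at desc.GetVersion() or newer can exist.
	log.Infof(ctx, "descriptor %d converged to version %d", id, desc.GetVersion())
	return nil
}
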
// IDVersion represents a descriptor ID, version pair that is meant to map
// to a single immutable descriptor.
type IDVersion struct {
// Name is only provided for pretty printing.
Name string
ID descpb.ID
Version descpb.DescriptorVersion
}
// NewIDVersionPrev returns an initialized IDVersion with the
// previous version of the descriptor.
func NewIDVersionPrev(name string, id descpb.ID, currVersion descpb.DescriptorVersion) IDVersion {
return IDVersion{Name: name, ID: id, Version: currVersion - 1}
}
// ensureVersion ensures that the latest version >= minVersion. It will
// check if the latest known version meets the criterion, or attempt to
// acquire a lease at the latest version with the hope that it meets
// the criterion.
func ensureVersion(
ctx context.Context, id descpb.ID, minVersion descpb.DescriptorVersion, m *Manager,
) error {
if s := m.findNewest(id); s != nil && minVersion <= s.GetVersion() {
return nil
}
if err := m.AcquireFreshestFromStore(ctx, id); err != nil {
return err
}
if s := m.findNewest(id); s != nil && s.GetVersion() < minVersion {
return errors.Errorf("version %d for descriptor %s does not exist yet", minVersion, s.GetName())
}
return nil
}
type historicalDescriptor struct {
desc catalog.Descriptor
expiration hlc.Timestamp // ModificationTime of the next descriptor
}
// getDescriptorsFromStoreForInterval retrieves historical descriptors of the
// given id between the lower and upper bound timestamps from the MVCC key
// range. Any descriptor versions that were modified after the lower bound
// timestamp and before the upper bound timestamp will be retrieved through an
// export request.
//
// Note that this does not necessarily retrieve a descriptor version that was
// alive at the lower bound timestamp.
func getDescriptorsFromStoreForInterval(
ctx context.Context,
db *kv.DB,
codec keys.SQLCodec,
id descpb.ID,
lowerBound, upperBound hlc.Timestamp,
) ([]historicalDescriptor, error) {
// Create an export request (1 KV call) for all versions of the given
// descriptor ID written during the interval [lowerBound, upperBound].
batchRequestHeader := roachpb.Header{Timestamp: upperBound}
descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id)
requestHeader := roachpb.RequestHeader{
Key: descriptorKey,
EndKey: descriptorKey.PrefixEnd(),
}
req := &roachpb.ExportRequest{
RequestHeader: requestHeader,
StartTime: lowerBound,
MVCCFilter: roachpb.MVCCFilter_All,
ReturnSST: true,
}
// Export request returns descriptors in decreasing modification time.
res, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), batchRequestHeader, req)
if pErr != nil {
return nil, errors.Wrapf(pErr.GoError(), "error in retrieving descs between %s, %s",
lowerBound, upperBound)
}
// Unmarshal key span retrieved from export request to construct historical descs.
var descriptorsRead []historicalDescriptor
subsequentModificationTime := upperBound
for _, file := range res.(*roachpb.ExportResponse).Files {
if err := func() error {
it, err := kvstorage.NewMemSSTIterator(file.SST, false /* verify */)
if err != nil {
return err
}
defer it.Close()
// Convert each MVCC key value pair corresponding to the specified
// descriptor ID.
for it.SeekGE(kvstorage.NilKey); ; it.Next() {
if ok, err := it.Valid(); err != nil {
return err
} else if !ok {
return nil
}
// Decode key and value of descriptor.
k := it.UnsafeKey()
descContent := it.UnsafeValue()
if descContent == nil {
return errors.Wrapf(errors.New("unsafe value error"), "error "+
"extracting raw bytes of descriptor with key %s modified between "+
"%s, %s", k.String(), k.Timestamp, subsequentModificationTime)
}
// Construct a plain descriptor.
value := roachpb.Value{RawBytes: descContent}
var desc descpb.Descriptor
if err := value.GetProto(&desc); err != nil {
return err
}
descBuilder := catalogkv.NewBuilderWithMVCCTimestamp(&desc, k.Timestamp)
// Construct a historical descriptor with expiration.
histDesc := historicalDescriptor{
desc: descBuilder.BuildImmutable(),
expiration: subsequentModificationTime,
}
descriptorsRead = append(descriptorsRead, histDesc)
// Update the expiration time for the next iteration.
subsequentModificationTime = k.Timestamp
}
}(); err != nil {
return nil, err
}
}
return descriptorsRead, nil
}
// readOlderVersionForTimestamp reads an older descriptor version for the
// particular timestamp from the store with at most 2 KV calls. We
// unfortunately need to read more than one descriptor version just so that
// we can set the expiration time on the descriptor properly.
//
// TODO(vivek): Future work:
// 1. Translate multiple simultaneous calls to this method into a single call
//    as is done for acquireNodeLease().
// 2. Figure out a sane policy on when these descriptors should be purged.
//    They are currently purged in PurgeOldVersions.
func (m *Manager) readOlderVersionForTimestamp(
ctx context.Context, id descpb.ID, timestamp hlc.Timestamp,
) ([]historicalDescriptor, error) {
// Retrieve the endTimestamp for our query, which will be the modification
// time of the first descriptor in the manager's active set.
t := m.findDescriptorState(id, false /*create*/)
endTimestamp := func() hlc.Timestamp {
t.mu.Lock()
defer t.mu.Unlock()
if len(t.mu.active.data) == 0 {
return hlc.Timestamp{}
}
return t.mu.active.data[0].GetModificationTime()
}()
// Make an export request for descriptors between the start and end
// timestamps, returning descriptors in decreasing modification time order.
//
// In the following scenario v4 is our oldest active lease
// [v1@t1 ][v2@t3 ][v3@t5 ][v4@t7
// [start end]
// getDescriptorsFromStoreForInterval(..., start, end) will get back:
// [v3, v2] (reverse order)
descs, err := getDescriptorsFromStoreForInterval(ctx, m.DB(), m.Codec(), id, timestamp, endTimestamp)
if err != nil {
return nil, err
}
// In the case where the descriptor we're looking for was modified before the
// earliest retrieved timestamp (or the export returned no versions at all),
// fetch the descriptor version preceding the earliest one we have by making
// another KV call. The length check also guards against indexing into an
// empty slice when the export returns nothing.
earliestModificationTime := endTimestamp
if len(descs) > 0 {
earliestModificationTime = descs[len(descs)-1].desc.GetModificationTime()
}
if timestamp.Less(earliestModificationTime) {
desc, err := m.storage.getForExpiration(ctx, earliestModificationTime, id)
if err != nil {
return nil, err
}
descs = append(descs, historicalDescriptor{
desc: desc,
expiration: earliestModificationTime,
})
}
return descs, nil
}
// insertDescriptorVersions inserts the provided descriptor versions into the
// descriptor state. The versions provided are not in any particular order.
func (m *Manager) insertDescriptorVersions(id descpb.ID, versions []historicalDescriptor) {
t := m.findDescriptorState(id, false /* create */)
t.mu.Lock()
defer t.mu.Unlock()
for i := range versions {
// Since we gave up the lock while reading the versions from
// the store we have to ensure that no one else inserted the
// same version.
existingVersion := t.mu.active.findVersion(versions[i].desc.GetVersion())
if existingVersion == nil {
t.mu.active.insert(
newDescriptorVersionState(t, versions[i].desc, versions[i].expiration, false))
}
}
}
// AcquireFreshestFromStore acquires a new lease from the store and
// inserts it into the active set. It guarantees that the lease returned is
// the one acquired after the call is made. Use this if the lease we want to
// get needs to see some descriptor updates that we know happened recently.
func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) error {
// Create descriptorState if needed.
_ = m.findDescriptorState(id, true /* create */)
// We need to acquire a lease on a "fresh" descriptor, meaning that joining
// a potential in-progress lease acquisition is generally not good enough.
// If we are to join an in-progress acquisition, it needs to be an acquisition
// initiated after this point.
// So, we handle two cases:
// 1. The first DoChan() call tells us that we didn't join an in-progress
// acquisition. Great, the lease that's being acquired is good.
// 2. The first DoChan() call tells us that we did join an in-progress acq.
// We have to wait this acquisition out; it's not good for us. But any
// future acquisition is good, so the next time around the loop it doesn't
// matter if we initiate a request or join an in-progress one.
// In both cases, we need to check if the lease we want is still valid because
// lease acquisition is done without holding the descriptorState lock, so anything
// can happen in between lease acquisition and us getting control again.
attemptsMade := 0
for {
// Acquire a fresh lease.
didAcquire, err := acquireNodeLease(ctx, m, id)
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {
m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(AcquireFreshestBlock, id)
}
if err != nil {
return err
}
if didAcquire {
// Case 1: we didn't join an in-progress call and the lease is still
// valid.
break
} else if attemptsMade > 1 {
// Case 2: more than one acquisition has happened and the lease is still
// valid.
break
}
attemptsMade++
}
return nil
}
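
// Illustrative usage sketch, not part of the original file: when a caller
// knows the descriptor just changed (e.g. it observed a committed schema
// change), it can force a fresh acquisition before leasing instead of
// trusting whatever version is already cached. The helper name is
// hypothetical; the calls are the exported API above.
func exampleAcquireFreshest(
	ctx context.Context, m *Manager, id descpb.ID,
) (LeasedDescriptor, error) {
	if err := m.AcquireFreshestFromStore(ctx, id); err != nil {
		return nil, err
	}
	// The freshly acquired lease is now in the active set; Acquire returns it.
	return m.Acquire(ctx, m.storage.clock.Now(), id)
}
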
// acquireNodeLease acquires a lease for the descriptor with the given id,
// deduplicating concurrent acquisitions through a singleflight group.
// If the lease cannot be obtained because the descriptor is in the process of
// being dropped or offline, the error will be of type inactiveTableError.
// The boolean returned is true if this call was actually responsible for the
// lease acquisition.
func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, error) {
var toRelease *storedLease
resultChan, didAcquire := m.storage.group.DoChan(fmt.Sprintf("acquire%d", id), func() (interface{}, error) {
// Note that we use a new `context` here to avoid a situation where a cancellation
// of the first context cancels other callers to the `acquireNodeLease()` method,
// because of its use of `singleflight.Group`. See issue #41780 for how this has
// happened.
newCtx, cancel := m.stopper.WithCancelOnQuiesce(logtags.WithTags(context.Background(), logtags.FromContext(ctx)))
defer cancel()
if m.isDraining() {
return nil, errors.New("cannot acquire lease when draining")
}
newest := m.findNewest(id)
var minExpiration hlc.Timestamp
if newest != nil {
minExpiration = newest.getExpiration()
}
desc, expiration, err := m.storage.acquire(newCtx, minExpiration, id)
if err != nil {
return nil, err
}
t := m.findDescriptorState(id, false /* create */)
t.mu.Lock()
t.mu.takenOffline = false
defer t.mu.Unlock()
var newDescVersionState *descriptorVersionState
newDescVersionState, toRelease, err = t.upsertLeaseLocked(newCtx, desc, expiration)
if err != nil {
return nil, err
}
if newDescVersionState != nil {
m.names.insert(newDescVersionState)
}
if toRelease != nil {
releaseLease(toRelease, m)
}
return true, nil
})
select {
case <-ctx.Done():
return false, ctx.Err()
case result := <-resultChan:
if result.Err != nil {
return false, result.Err
}
}
return didAcquire, nil
}
// releaseLease releases a lease from the store, synchronously when the
// Manager is draining and asynchronously otherwise.
func releaseLease(lease *storedLease, m *Manager) {
ctx := context.TODO()
if m.isDraining() {
// Release synchronously to guarantee release before exiting.
m.storage.release(ctx, m.stopper, lease)
return
}
// Release to the store asynchronously, without the descriptorState lock.
if err := m.stopper.RunAsyncTask(
ctx, "sql.descriptorState: releasing descriptor lease",
func(ctx context.Context) {
m.storage.release(ctx, m.stopper, lease)
}); err != nil {
log.Warningf(ctx, "error: %s, not releasing lease: %q", err, lease)
}
}
// purgeOldVersions removes old unused descriptor versions older than
// minVersion and releases any associated leases.
// If dropped is set, minVersion is ignored; no lease is acquired and all
// existing unused versions are removed. The descriptor is further marked
// dropped, which will cause existing in-use leases to be eagerly released
// once they're no longer in use.
// If t has no active leases, nothing is done.
func purgeOldVersions(
ctx context.Context,
db *kv.DB,
id descpb.ID,
dropped bool,
minVersion descpb.DescriptorVersion,
m *Manager,
) error {
t := m.findDescriptorState(id, false /*create*/)
if t == nil {
return nil
}
t.mu.Lock()
if t.mu.maxVersionSeen < minVersion {
t.mu.maxVersionSeen = minVersion
}
empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0
t.mu.Unlock()
if empty && !dropped {
// We don't currently have a version on this descriptor, so no need to refresh
// anything.
return nil
}
removeInactives := func(dropped bool) {
t.mu.Lock()
t.mu.takenOffline = dropped
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
}
}
if dropped {
removeInactives(true /* dropped */)
return nil
}
if err := ensureVersion(ctx, id, minVersion, m); err != nil {
return err
}
// Acquire a refcount on the latest version of the descriptor to maintain an
// active lease, so that it doesn't get released when removeInactives()
// is called below. Release this lease after calling removeInactives().
desc, _, err := t.findForTimestamp(ctx, m.storage.clock.Now())
if isInactive := catalog.HasInactiveDescriptorError(err); err == nil || isInactive {
removeInactives(isInactive)
if desc != nil {
t.release(ctx, desc)
return nil
}
return nil
}
return err
}
// AcquireBlockType is the type of blocking result event when
// calling LeaseAcquireResultBlockEvent.
type AcquireBlockType int
const (
// AcquireBlock denotes the LeaseAcquireResultBlockEvent is
// coming from descriptorState.acquire().
AcquireBlock AcquireBlockType = iota
// AcquireFreshestBlock denotes the LeaseAcquireResultBlockEvent is
// from descriptorState.acquireFreshestFromStore().
AcquireFreshestBlock
)
// Manager manages acquiring and releasing per-descriptor leases. It also
// handles resolving descriptor names to descriptor IDs. Leases are managed
// internally and exposed through the API as a descriptor paired with an
// expiration time. An acquired descriptor must later be released. A
// transaction can use a descriptor as long as its timestamp is within the
// validity window for the descriptor:
// descriptor.ModificationTime <= txn.ReadTimestamp < expirationTime
// (an illustrative check is sketched right after the struct definition).
//
// Exported only for testing.
//
// The locking order is:
// Manager.mu > descriptorState.mu > nameCache.mu > descriptorVersionState.mu
type Manager struct {
rangeFeedFactory *rangefeed.Factory
storage storage
mu struct {
syncutil.Mutex
descriptors map[descpb.ID]*descriptorState
// updatesResolvedTimestamp keeps track of a timestamp before which all
// descriptor updates have already been seen.
updatesResolvedTimestamp hlc.Timestamp
}
draining atomic.Value
// names is a cache for name -> id mappings. A mapping for the cache
// should only be used if we currently have an active lease on the respective
// id; otherwise, the mapping may well be stale.
// Not protected by mu.
names nameCache
testingKnobs ManagerTestingKnobs
ambientCtx log.AmbientContext
stopper *stop.Stopper
sem *quotapool.IntPool
}
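
// Illustrative sketch, not part of the original file: the validity-window
// check described in the Manager comment above, written out as a
// hypothetical helper. A transaction may only use a leased descriptor when
// desc.ModificationTime <= readTimestamp < expiration.
func exampleDescriptorUsableAt(ld LeasedDescriptor, readTimestamp hlc.Timestamp) bool {
	return ld.Underlying().GetModificationTime().LessEq(readTimestamp) &&
		readTimestamp.Less(ld.Expiration())
}
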
const leaseConcurrencyLimit = 5
// NewLeaseManager creates a new Manager.
//
// internalExecutor can be nil to help bootstrapping, but then it needs to be set via
// SetInternalExecutor before the Manager is used.
//
// stopper is used to run async tasks. Can be nil in tests.
func NewLeaseManager(
ambientCtx log.AmbientContext,
nodeIDContainer *base.SQLIDContainer,
db *kv.DB,
clock *hlc.Clock,
internalExecutor sqlutil.InternalExecutor,
settings *cluster.Settings,
codec keys.SQLCodec,
testingKnobs ManagerTestingKnobs,
stopper *stop.Stopper,
rangeFeedFactory *rangefeed.Factory,
) *Manager {
lm := &Manager{
storage: storage{
nodeIDContainer: nodeIDContainer,
db: db,
clock: clock,
internalExecutor: internalExecutor,
settings: settings,
codec: codec,
group: &singleflight.Group{},
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
outstandingLeases: metric.NewGauge(metric.Metadata{
Name: "sql.leases.active",
Help: "The number of outstanding SQL schema leases.",
Measurement: "Outstanding leases",
Unit: metric.Unit_COUNT,
}),
},
rangeFeedFactory: rangeFeedFactory,
testingKnobs: testingKnobs,
names: makeNameCache(),
ambientCtx: ambientCtx,
stopper: stopper,
sem: quotapool.NewIntPool("lease manager", leaseConcurrencyLimit),
}
lm.stopper.AddCloser(lm.sem.Closer("stopper"))
lm.mu.descriptors = make(map[descpb.ID]*descriptorState)
lm.mu.updatesResolvedTimestamp = db.Clock().Now()
lm.draining.Store(false)
return lm
}
// NameMatchesDescriptor returns true if the provided name and IDs match this
// descriptor.
func NameMatchesDescriptor(
desc catalog.Descriptor, parentID descpb.ID, parentSchemaID descpb.ID, name string,
) bool {
return desc.GetParentID() == parentID &&
desc.GetParentSchemaID() == parentSchemaID &&
desc.GetName() == name
}
// findNewest returns the newest descriptor version state for the ID.
func (m *Manager) findNewest(id descpb.ID) *descriptorVersionState {
t := m.findDescriptorState(id, false /* create */)
t.mu.Lock()
defer t.mu.Unlock()
return t.mu.active.findNewest()
}
// AcquireByName returns a version for the specified descriptor valid for
// the timestamp. It returns the descriptor and an expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor. Renewal of a lease may begin in the
// background. Renewal is done in order to prevent blocking on future
// acquisitions.
//
// Known limitation: AcquireByName() calls Acquire() and therefore suffers
// from the same limitation as Acquire (See Acquire). AcquireByName() is
// unable to function correctly on a timestamp less than the timestamp
// of a transaction with a DROP/TRUNCATE on the descriptor. The limitation in
// the face of a DROP follows directly from the limitation on Acquire().
// A TRUNCATE is implemented by changing the name -> id mapping
// and by dropping the descriptor with the old id. While AcquireByName
// can use the timestamp and get the correct name->id mapping at a
// timestamp, it uses Acquire() to get a descriptor with the corresponding
// id and fails because the id has been dropped by the TRUNCATE.
func (m *Manager) AcquireByName(
ctx context.Context,
timestamp hlc.Timestamp,
parentID descpb.ID,
parentSchemaID descpb.ID,
name string,
) (LeasedDescriptor, error) {
// When offline descriptor leases were not allowed to be cached,
// an attempt to acquire a lease on them would generate a descriptor
// offline error. Recent changes allow offline descriptor leases
// to be cached, but callers still need the offline error generated.
// This logic will release the lease (the lease manager will still
// cache it) and generate the offline descriptor error.
validateDescriptorForReturn := func(desc LeasedDescriptor) (LeasedDescriptor, error) {
if desc.Underlying().Offline() {
if err := catalog.FilterDescriptorState(
desc.Underlying(), tree.CommonLookupFlags{},
); err != nil {
desc.Release(ctx)
return nil, err
}
}
return desc, nil
}
// Check if we have cached an ID for this name.
descVersion := m.names.get(ctx, parentID, parentSchemaID, name, timestamp)
if descVersion != nil {
if descVersion.GetModificationTime().LessEq(timestamp) {
expiration := descVersion.getExpiration()
// If this lease is nearly expired, ensure a renewal is queued.
durationUntilExpiry := time.Duration(expiration.WallTime - timestamp.WallTime)
if durationUntilExpiry < m.storage.leaseRenewalTimeout() {
if t := m.findDescriptorState(descVersion.GetID(), false /* create */); t != nil {
if err := t.maybeQueueLeaseRenewal(
ctx, m, descVersion.GetID(), name); err != nil {
return nil, err
}
}
}
return validateDescriptorForReturn(descVersion)
}
// m.names.get() incremented the refcount; we decrement it to get a new
// version.
descVersion.Release(ctx)
// Return a valid descriptor for the timestamp.
leasedDesc, err := m.Acquire(ctx, timestamp, descVersion.GetID())
if err != nil {
return nil, err
}
return validateDescriptorForReturn(leasedDesc)
}
// We failed to find something in the cache, or what we found is not
// guaranteed to be valid by the time we use it because we don't have a
// lease with at least a bit of lifetime left in it. So, we do it the hard
// way: look in the database to resolve the name, then acquire a new lease.
var err error
id, err := m.resolveName(ctx, timestamp, parentID, parentSchemaID, name)
if err != nil {
return nil, err
}
desc, err := m.Acquire(ctx, timestamp, id)
if err != nil {
return nil, err
}
if !NameMatchesDescriptor(desc.Underlying(), parentID, parentSchemaID, name) {
// We resolved name `name`, but the lease has a different name in it.
// That can mean two things. Assume the descriptor is being renamed from A to B.
// a) `name` is A. The transaction doing the RENAME committed (so the
// descriptor has been updated to B), but its schema changer has not
// finished yet. B is the new name of the descriptor, queries should use that. If
// we already had a lease with name A, we would've been allowed to use it (but we
// don't, otherwise the cache lookup above would've given it to us). Since
// we don't, let's not allow A to be used, given that the lease now has name
// B in it. It'd be sketchy to allow A to be used with an inconsistent name
// in the descriptor.
//
// b) `name` is B. Like in a), the transaction doing the RENAME
// committed (so the descriptor has been updated to B), but its schema
// change has not finished yet. We still had a valid lease with name A in
// it. What to do, what to do? We could allow name B to be used, but who
// knows what consequences that would have, since it's not consistent with
// the descriptor. We could say "descriptor B not found", but that means that, until
// the next gossip update, this node would not service queries for this
// descriptor under the name B. That's no bueno, as B should be available to be
// used immediately after the RENAME transaction has committed.
// The problem is that we have a lease that we know is stale (the descriptor
// in the DB doesn't necessarily have a new version yet, but it definitely
// has a new name). So, let's force getting a fresh descriptor.
// This case (modulo the "committed" part) also applies when the txn doing a
// RENAME had a lease on the old name, and then tries to use the new name
// after the RENAME statement.
//
// How do we disambiguate between a) and b)? We get a fresh lease on
// the descriptor, as required by b), and then we'll know if we're trying to
// resolve the current or the old name.
//
// TODO(vivek): check if the entire above comment is indeed true. Review the
// use of NameMatchesDescriptor() throughout this function.
desc.Release(ctx)
if err := m.AcquireFreshestFromStore(ctx, id); err != nil {
return nil, err
}
desc, err = m.Acquire(ctx, timestamp, id)
if err != nil {
return nil, err
}
if !NameMatchesDescriptor(desc.Underlying(), parentID, parentSchemaID, name) {
// If the name we had doesn't match the newest descriptor in the DB, then
// we're trying to use an old name.
desc.Release(ctx)
return nil, catalog.ErrDescriptorNotFound
}
}
return validateDescriptorForReturn(desc)
}
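
// Illustrative usage sketch, not part of the original file: resolving a
// table by name at a transaction's read timestamp and releasing the lease
// when done. The helper name is hypothetical, and the parent IDs would
// normally come from already-resolved database and schema descriptors.
func exampleAcquireByName(
	ctx context.Context,
	m *Manager,
	readTimestamp hlc.Timestamp,
	dbID, schemaID descpb.ID,
	tableName string,
) error {
	ld, err := m.AcquireByName(ctx, readTimestamp, dbID, schemaID, tableName)
	if err != nil {
		return err
	}
	// The descriptor must be treated as read-only and released when no
	// longer in use.
	defer ld.Release(ctx)
	log.VEventf(ctx, 2, "leased %q (id %d) until %s",
		ld.Underlying().GetName(), ld.Underlying().GetID(), ld.Expiration())
	return nil
}
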
// resolveName resolves a descriptor name to a descriptor ID at a particular
// timestamp by looking in the database. If the mapping is not found,
// catalog.ErrDescriptorNotFound is returned.
func (m *Manager) resolveName(
ctx context.Context,
timestamp hlc.Timestamp,
parentID descpb.ID,
parentSchemaID descpb.ID,
name string,
) (descpb.ID, error) {
id := descpb.InvalidID
if err := m.storage.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Run the name lookup as high-priority, thereby pushing any intents out of
// its way. We don't want schema changes to prevent name resolution/lease
// acquisitions; we'd rather force them to refresh. Also this prevents
// deadlocks in cases where the name resolution is triggered by the
// transaction doing the schema change itself.
if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}
if err := txn.SetFixedTimestamp(ctx, timestamp); err != nil {
return err
}
var found bool
var err error
found, id, err = catalogkv.LookupObjectID(ctx, txn, m.storage.codec, parentID, parentSchemaID, name)
if err != nil {
return err
}
if !found {
// Leave id as descpb.InvalidID; the caller converts that into
// catalog.ErrDescriptorNotFound below.
id = descpb.InvalidID
}
return nil
}); err != nil {
return id, err
}
if id == descpb.InvalidID {
return id, catalog.ErrDescriptorNotFound
}
return id, nil
}
// LeasedDescriptor tracks and manages leasing-related
// information for a descriptor.
type LeasedDescriptor interface {
catalog.NameEntry
// Underlying returns the underlying descriptor which has been leased.
// The implementation of the methods on this object delegate to
// that object.
Underlying() catalog.Descriptor
// Expiration returns the current expiration. Subsequent calls may return a
// later timestamp but will never return an earlier one.
Expiration() hlc.Timestamp
// Release releases the reference to this leased descriptor. The descriptor
// should not be used after the lease has been released.
Release(ctx context.Context)
}
// Acquire acquires a read lease for the specified descriptor ID valid for
// the timestamp. It returns the descriptor and an expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
//
// Known limitation: Acquire() can return an error after the descriptor with
// the ID has been dropped. This is true even when using a timestamp
// less than the timestamp of the DROP command. This is because Acquire
// can only return an older version of a descriptor if the latest version
// can be leased; as it stands a dropped descriptor cannot be leased.
func (m *Manager) Acquire(
ctx context.Context, timestamp hlc.Timestamp, id descpb.ID,
) (LeasedDescriptor, error) {
for {
t := m.findDescriptorState(id, true /*create*/)
desc, latest, err := t.findForTimestamp(ctx, timestamp)
if err == nil {
// If the latest lease is nearly expired, ensure a renewal is queued.
if latest {
durationUntilExpiry := time.Duration(desc.getExpiration().WallTime - timestamp.WallTime)
if durationUntilExpiry < m.storage.leaseRenewalTimeout() {
if err := t.maybeQueueLeaseRenewal(ctx, m, id, desc.GetName()); err != nil {
return nil, err
}
}
}
return desc, nil
}
switch {
case errors.Is(err, errRenewLease):
if err := func() error {
t.markAcquisitionStart(ctx)
defer t.markAcquisitionDone(ctx)
// Renew lease and retry. This will block until the lease is acquired.
_, errLease := acquireNodeLease(ctx, m, id)
return errLease
}(); err != nil {
return nil, err
}
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {
m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(AcquireBlock, id)
}
case errors.Is(err, errReadOlderVersion):
// Read old versions from the store. This can block while reading.
versions, errRead := m.readOlderVersionForTimestamp(ctx, id, timestamp)
if errRead != nil {
return nil, errRead
}
m.insertDescriptorVersions(id, versions)
default:
return nil, err
}
}
}
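
// Illustrative usage sketch, not part of the original file: acquiring by ID
// at a historical timestamp. Acquire transparently falls back to reading
// older versions from the store (see errReadOlderVersion above), so the same
// call shape serves both current and historical reads. A transaction using
// the lease must commit before ld.Expiration(). The helper name is
// hypothetical.
func exampleAcquireAtTimestamp(
	ctx context.Context, m *Manager, id descpb.ID, readTimestamp hlc.Timestamp,
) error {
	ld, err := m.Acquire(ctx, readTimestamp, id)
	if err != nil {
		return err
	}
	defer ld.Release(ctx)
	// Within this window the descriptor is valid for readTimestamp:
	// ld.Underlying().GetModificationTime() <= readTimestamp < ld.Expiration().
	_ = ld.Underlying()
	return nil
}
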
// removeOnceDereferenced returns true if the Manager thinks
// a descriptorVersionState can be removed after its refcount goes to 0.
func (m *Manager) removeOnceDereferenced() bool {
return m.storage.testingKnobs.RemoveOnceDereferenced ||
// Release from the store if the Manager is draining.
m.isDraining()
}
func (m *Manager) isDraining() bool {
return m.draining.Load().(bool)
}
// SetDraining (when called with 'true') removes all inactive leases. Any leases
// that are active will be removed once the lease's reference count drops to 0.
//
// The reporter callback, if non-nil, is called on a best effort basis
// to report work that needed to be done and which may or may not have
// been done by the time this call returns. See the explanation in
// pkg/server/drain.go for details.
func (m *Manager) SetDraining(drain bool, reporter func(int, redact.SafeString)) {
m.draining.Store(drain)
if !drain {
return
}
m.mu.Lock()
defer m.mu.Unlock()
for _, t := range m.mu.descriptors {
t.mu.Lock()
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
}
if reporter != nil {
// Report progress through the Drain RPC.
reporter(len(leases), "descriptor leases")
}
}
}
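
// Illustrative usage sketch, not part of the original file: the server's
// drain path can pass a reporter to surface how many leases were released.
// The helper name and the log message are assumptions for the example.
func exampleSetDraining(ctx context.Context, m *Manager) {
	m.SetDraining(true /* drain */, func(released int, what redact.SafeString) {
		log.Infof(ctx, "drain released %d %s", released, what)
	})
}
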
// findDescriptorState returns the descriptorState for the given id. If
// create is set, the state is created when none exists; in that case, cache
// and stopper need to be set as well.
func (m *Manager) findDescriptorState(id descpb.ID, create bool) *descriptorState {
m.mu.Lock()
defer m.mu.Unlock()
t := m.mu.descriptors[id]
if t == nil && create {
t = &descriptorState{m: m, id: id, stopper: m.stopper}
m.mu.descriptors[id] = t
}
return t
}
// RefreshLeases starts a goroutine that refreshes the lease manager's
// leases for descriptors received in the latest system configuration,
// via gossip or rangefeeds.
func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB) {
descUpdateCh := make(chan *descpb.Descriptor)
m.watchForUpdates(ctx, descUpdateCh)
_ = s.RunAsyncTask(ctx, "refresh-leases", func(ctx context.Context) {
for {
select {
case desc := <-descUpdateCh:
// NB: We allow nil descriptors to be sent to synchronize the updating of
// descriptors.
if desc == nil {
continue
}
if evFunc := m.testingKnobs.TestingDescriptorUpdateEvent; evFunc != nil {
if err := evFunc(desc); err != nil {
log.Infof(ctx, "skipping update of %v due to knob: %v",
desc, err)
continue
}
}
id, version, name, state, _, err := descpb.GetDescriptorMetadata(desc)
if err != nil {
log.Fatalf(ctx, "invalid descriptor %v: %v", desc, err)
}
dropped := state == descpb.DescriptorState_DROP
// Try to refresh the lease to one >= this version.
log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)",
id, version, dropped)
if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil {
log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
id, name, err)
}