-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
cmd_scan_interleaved_intents.go
147 lines (128 loc) · 4.49 KB
/
cmd_scan_interleaved_intents.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package batcheval
import (
"context"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
)
// scanInterleavedIntentsPerStore is the number of concurrent
// ScanInterleavedIntents requests that will be run on a store. Used as part
// of pre-evaluation throttling.
const scanInterleavedIntentsPerStore = 1
var perStoreSems storeSemRegistry
func init() {
perStoreSems.init()
RegisterReadOnlyCommand(roachpb.ScanInterleavedIntents, declareKeysScanInterleavedIntents, ScanInterleavedIntents)
}
func declareKeysScanInterleavedIntents(
rs ImmutableRangeState, _ roachpb.Header, _ roachpb.Request, latchSpans, _ *spanset.SpanSet,
) {
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())})
}
type storeSemRegistry struct {
syncutil.Mutex
sems map[roachpb.StoreID]semaphore.Semaphore
}
func (s *storeSemRegistry) init() {
s.sems = make(map[roachpb.StoreID]semaphore.Semaphore)
}
func (s *storeSemRegistry) getOrCreate(store roachpb.StoreID) semaphore.Semaphore {
s.Lock()
defer s.Unlock()
sem, ok := s.sems[store]
if !ok {
sem = semaphore.New(scanInterleavedIntentsPerStore)
s.sems[store] = sem
}
return sem
}
func ThrottleScanInterleavedIntents(ctx context.Context, store roachpb.StoreID) {
sem := perStoreSems.getOrCreate(store)
_ = sem.Acquire(ctx, 1)
}
func ReleaseScanInterleavedIntentsThrottle(store roachpb.StoreID) {
sem := perStoreSems.getOrCreate(store)
sem.Release(1)
}
// ScanInterleavedIntents returns intents encountered in the provided span.
// These intents are then resolved in the separated intents migration, the
// usual caller for this request.
func ScanInterleavedIntents(
ctx context.Context, reader storage.Reader, cArgs CommandArgs, response roachpb.Response,
) (result.Result, error) {
req := cArgs.Args.(*roachpb.ScanInterleavedIntentsRequest)
resp := response.(*roachpb.ScanInterleavedIntentsResponse)
// Put a limit on memory usage by scanning for at least maxIntentCount
// intents or maxIntentBytes in intent values, whichever is reached first,
// then returning those.
const maxIntentCount = 1000
const maxIntentBytes = 1 << 20 // 1MB
iter := reader.NewEngineIterator(storage.IterOptions{
LowerBound: req.Key,
UpperBound: req.EndKey,
})
defer iter.Close()
valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: req.Key})
intentCount := 0
intentBytes := 0
for ; valid && err == nil; valid, err = iter.NextEngineKey() {
key, err := iter.EngineKey()
if err != nil {
return result.Result{}, err
}
if !key.IsMVCCKey() {
// This should never happen, as the only non-MVCC keys are lock table
// keys and those are in the local keyspace. Return an error.
return result.Result{}, errors.New("encountered non-MVCC key during lock table migration")
}
mvccKey, err := key.ToMVCCKey()
if err != nil {
return result.Result{}, err
}
if !mvccKey.Timestamp.IsEmpty() {
// Versioned value - not an intent.
//
// TODO(bilal): Explore seeking here in case there are keys with lots of
// versioned values.
continue
}
val := iter.Value()
meta := enginepb.MVCCMetadata{}
if err := protoutil.Unmarshal(val, &meta); err != nil {
return result.Result{}, err
}
if meta.IsInline() {
// Inlined value - not an intent.
continue
}
if intentCount >= maxIntentCount || intentBytes >= maxIntentBytes {
// Batch limit reached - cut short this batch here. This kv
// will be added to txnIntents on the next iteration of the outer loop.
resp.ResumeSpan = &roachpb.Span{
Key: mvccKey.Key,
EndKey: req.EndKey,
}
break
}
resp.Intents = append(resp.Intents, roachpb.MakeIntent(meta.Txn, mvccKey.Key))
intentCount++
intentBytes += len(val)
}
return result.Result{}, nil
}