-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathskl_impl.go
164 lines (142 loc) · 5.65 KB
/
skl_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package tscache
import (
"context"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)
const (
	// defaultSklPageSize is the page size, in bytes, used for the sklImpl's
	// read and write intervalSkl when the caller does not supply one.
	defaultSklPageSize = 32 * 1024 * 1024 // 32 MB

	// TestSklPageSize is the page size, in bytes, that tests pass to the
	// sklImpl to keep its memory footprint small. A smaller page can hurt
	// performance, but it lowers the risk of OOM failures when many tests
	// run concurrently.
	TestSklPageSize = 128 * 1024 // 128 KB
)
// sklImpl is a Cache implementation backed by two skiplists: one tracking
// reads and one tracking writes. Each skiplist holds keys or key ranges
// together with the timestamp at which they were most recently read or
// written. When the access was made by a transaction, that transaction's ID is
// stored alongside the timestamp so that successive requests from the same
// transaction do not advance timestamps.
type sklImpl struct {
	// rCache records read timestamps.
	rCache *intervalSkl
	// wCache records write timestamps.
	wCache *intervalSkl
	// clock is handed to every intervalSkl that clear creates.
	clock *hlc.Clock
	// pageSize is the page size handed to every intervalSkl that clear creates.
	pageSize uint32
	// metrics is returned by Metrics; its Skl.Read and Skl.Write members are
	// handed to the read and write intervalSkl respectively.
	metrics Metrics
}

// Compile-time check that *sklImpl satisfies Cache.
var _ Cache = (*sklImpl)(nil)
// newSklImpl returns a new sklImpl that uses the supplied hybrid clock. A
// pageSize of zero selects defaultSklPageSize. The low-water mark of the
// returned cache is initialized to the clock's current time.
func newSklImpl(clock *hlc.Clock, pageSize uint32) *sklImpl {
	size := pageSize
	if size == 0 {
		size = defaultSklPageSize
	}
	cache := &sklImpl{
		clock:    clock,
		pageSize: size,
		metrics:  makeMetrics(),
	}
	cache.clear(clock.Now())
	return cache
}
// clear discards the current contents of the cache by replacing both the read
// and the write intervalSkl with freshly constructed ones, and sets the floor
// timestamp of each to lowWater.
func (tc *sklImpl) clear(lowWater hlc.Timestamp) {
	readSkl := newIntervalSkl(tc.clock, MinRetentionWindow, tc.pageSize, tc.metrics.Skl.Read)
	writeSkl := newIntervalSkl(tc.clock, MinRetentionWindow, tc.pageSize, tc.metrics.Skl.Write)
	readSkl.floorTS, writeSkl.floorTS = lowWater, lowWater
	tc.rCache, tc.wCache = readSkl, writeSkl
}
// getSkl selects one of the two skiplists: the read intervalSkl when readCache
// is true, the write intervalSkl otherwise.
func (tc *sklImpl) getSkl(readCache bool) *intervalSkl {
	if !readCache {
		return tc.wCache
	}
	return tc.rCache
}
// Add implements the Cache interface. It records ts and txnID in the read or
// write skiplist (chosen by readCache) after bounding the key lengths. An
// empty end key records a single point key at start; otherwise the span from
// start to end is recorded with the excludeTo option.
func (tc *sklImpl) Add(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID, readCache bool) {
	from, to := tc.boundKeyLengths(start, end)
	entry := cacheValue{ts: ts, txnID: txnID}
	target := tc.getSkl(readCache)

	if len(to) > 0 {
		target.AddRange(nonNil(from), to, excludeTo, entry)
		return
	}
	target.Add(nonNil(from), entry)
}
// SetLowWater implements the Cache interface. It adds ts over the given span,
// with no transaction ID attached, first to the write skiplist and then to the
// read skiplist.
func (tc *sklImpl) SetLowWater(start, end roachpb.Key, ts hlc.Timestamp) {
	// Write cache first, then read cache.
	for _, readCache := range [...]bool{false, true} {
		tc.Add(start, end, ts, noTxnID, readCache)
	}
}
// getLowWater implements the Cache interface. It reports the floor timestamp
// of the read skiplist when readCache is true, and of the write skiplist
// otherwise.
func (tc *sklImpl) getLowWater(readCache bool) hlc.Timestamp {
	skl := tc.getSkl(readCache)
	return skl.FloorTS()
}
// GetMaxRead implements the Cache interface. It returns the timestamp and
// transaction ID that getMax finds in the read skiplist for the given key or
// key span.
func (tc *sklImpl) GetMaxRead(start, end roachpb.Key) (hlc.Timestamp, uuid.UUID) {
	const readCache = true
	return tc.getMax(start, end, readCache)
}
// GetMaxWrite implements the Cache interface. It returns the timestamp and
// transaction ID that getMax finds in the write skiplist for the given key or
// key span.
func (tc *sklImpl) GetMaxWrite(start, end roachpb.Key) (hlc.Timestamp, uuid.UUID) {
	const readCache = false
	return tc.getMax(start, end, readCache)
}
// getMax looks up the cached value for a key or key span in the read or write
// skiplist (chosen by readCache) and returns its timestamp and transaction ID.
// An empty end key performs a point lookup at start; otherwise the span from
// start to end is looked up with the excludeTo option.
func (tc *sklImpl) getMax(start, end roachpb.Key, readCache bool) (hlc.Timestamp, uuid.UUID) {
	cache := tc.getSkl(readCache)

	var found cacheValue
	switch {
	case len(end) > 0:
		found = cache.LookupTimestampRange(nonNil(start), end, excludeTo)
	default:
		found = cache.LookupTimestamp(nonNil(start))
	}
	return found.ts, found.txnID
}
// boundKeyLengths makes sure that the key lengths provided are well below the
// size of each sklPage, otherwise we'll never be successful in adding it to
// an intervalSkl.
//
// It returns the (possibly truncated) start and end keys. Keys that are
// already short enough are returned unchanged. If a key is truncated, the
// returned span always covers the span that was passed in; in particular, a
// point key (empty end) whose start is too long is widened into a range over
// the truncated prefix, so that later lookups of the original key still
// observe the entry.
func (tc *sklImpl) boundKeyLengths(start, end roachpb.Key) (roachpb.Key, roachpb.Key) {
	// We bound keys to 1/32 of the page size. These could be slightly larger
	// and still not trigger the "key range too large" panic in intervalSkl,
	// but anything larger could require multiple page rotations before it's
	// able to fit in if other ranges are being added concurrently.
	maxKeySize := int(tc.pageSize / 32)

	// Capture the original lengths up front so that an end key synthesized
	// below is never mistaken for a caller-supplied one.
	startLen, endLen := len(start), len(end)

	// If either key is too long, truncate its length, making sure to always
	// grow the [start,end) range instead of shrinking it. This will reduce the
	// precision of the entry in the cache, which could allow independent
	// requests to interfere, but will never permit consistency anomalies.
	if startLen > maxKeySize {
		start = start[:maxKeySize]
		if endLen == 0 {
			// The caller passed a point key. Truncating it alone would move the
			// entry to a different (shorter) key that no longer covers the
			// original one, so turn it into a range over the truncated prefix.
			// PrefixEnd is relied on here, as for the end key below, to yield
			// an exclusive upper bound for keys sharing the prefix.
			// NOTE(review): confirm PrefixEnd's result for all-0xff prefixes.
			end = start.PrefixEnd()
		}
		log.Warningf(context.TODO(), "start key with length %d exceeds maximum key length of %d; "+
			"losing precision in timestamp cache", startLen, maxKeySize)
	}
	if endLen > maxKeySize {
		end = end[:maxKeySize].PrefixEnd() // PrefixEnd to grow range
		log.Warningf(context.TODO(), "end key with length %d exceeds maximum key length of %d; "+
			"losing precision in timestamp cache", endLen, maxKeySize)
	}
	return start, end
}
// Metrics implements the Cache interface. It returns the Metrics value held by
// the sklImpl.
func (tc *sklImpl) Metrics() Metrics {
	m := tc.metrics
	return m
}
// emptyStartKey is the non-nil, zero-length key substituted for nil keys.
// intervalSkl doesn't handle nil keys the same way as empty keys, whereas
// Cockroach's KV API layer doesn't make a distinction between the two.
var emptyStartKey = []byte("")

// nonNil returns b unchanged unless it is nil, in which case it returns the
// shared emptyStartKey slice instead.
func nonNil(b []byte) []byte {
	if b != nil {
		return b
	}
	return emptyStartKey
}