-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
txn_id_cache_shard.go
102 lines (88 loc) · 2.65 KB
/
txn_id_cache_shard.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package txnidcache
import (
"sync/atomic"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)
// cacheShardWriteBufferSize is the capacity of each shard's write buffer.
// Record flushes the buffer into the shard's store once the buffer holds
// this many entries.
const cacheShardWriteBufferSize = 24
// cacheShard is a shard of the txnidcache.Cache. The idea behind sharding
// the cache is to reduce the mutex contention.
type cacheShard struct {
mu struct {
syncutil.RWMutex
// store holds the flushed entries: the key is the transaction ID and
// the value is the transaction fingerprint ID. Flush writes to it with
// the write lock held and Lookup reads from it with the read lock held.
store *cache.UnorderedCache
}
atomic struct {
// allocatedMem is the memory currently accounted to this shard. It is
// only accessed through sync/atomic: it grows by entrySize for every
// flushed entry, shrinks by entrySize for every evicted entry, and is
// compared against the per-shard limit to decide on eviction.
allocatedMem int64
}
// writeBuffer accumulates entries passed to Record until it is full, at
// which point they are flushed into mu.store.
// NOTE(review): Record appends to writeBuffer without holding mu;
// presumably each shard has a single writer — confirm against callers.
writeBuffer []ResolvedTxnID
// txnIDCache points back to the Cache that owns this shard; it is used
// to update the cache's metrics when entries are flushed.
txnIDCache *Cache
}
// Compile-time check that *cacheShard satisfies the storage interface.
var _ storage = (*cacheShard)(nil)
// ResolvedTxnID pairs a transaction ID with the fingerprint ID of that
// transaction. It is the unit accepted by Record; once flushed, TxnID is
// used as the cache key and TxnFingerprintID as the cached value.
type ResolvedTxnID struct {
// TxnID is the ID of the transaction.
TxnID uuid.UUID
// TxnFingerprintID is the fingerprint ID associated with TxnID.
TxnFingerprintID roachpb.TransactionFingerprintID
}
// newTxnIDCacheShard builds an empty shard owned by t. The shard starts
// with an empty write buffer of capacity cacheShardWriteBufferSize and a
// FIFO-evicting store whose callbacks keep the shard's memory accounting
// and the owning cache's metrics up to date.
func newTxnIDCacheShard(t *Cache) *cacheShard {
	shard := &cacheShard{txnIDCache: t}
	shard.writeBuffer = make([]ResolvedTxnID, 0, cacheShardWriteBufferSize)

	// overBudget reports whether the shard currently accounts for more
	// memory than its share (1/shardCount) of the MaxSize setting. The
	// setting is re-read on every call.
	overBudget := func(_ int, _, _ interface{}) bool {
		perShardLimit := MaxSize.Get(&t.st.SV) / shardCount
		return atomic.LoadInt64(&shard.atomic.allocatedMem) > perShardLimit
	}

	// releaseEntry gives back the memory accounted to one evicted entry
	// and records the eviction in the cache's metrics.
	releaseEntry := func(_ *cache.Entry) {
		atomic.AddInt64(&shard.atomic.allocatedMem, -entrySize)
		t.metrics.TxnIDCacheSize.Dec(entrySize)
		t.metrics.EvictedTxnIDCacheCount.Inc(1)
	}

	shard.mu.store = cache.NewUnorderedCache(cache.Config{
		Policy:         cache.CacheFIFO,
		ShouldEvict:    overBudget,
		OnEvictedEntry: releaseEntry,
	})
	return shard
}
// flushLocked moves every buffered entry into the shard's store and then
// empties the write buffer while keeping its backing array. The memory for
// the whole batch is accounted (on the shard and in the cache's size
// metric) before any entry is added to the store. As the name indicates,
// callers are expected to hold s.mu for writing; Flush does so.
func (s *cacheShard) flushLocked() {
	pending := s.writeBuffer

	addedMem := int64(len(pending)) * entrySize
	atomic.AddInt64(&s.atomic.allocatedMem, addedMem)
	s.txnIDCache.metrics.TxnIDCacheSize.Inc(addedMem)

	for i := range pending {
		s.mu.store.Add(pending[i].TxnID, pending[i].TxnFingerprintID)
	}

	// Truncate rather than reallocate so the buffer's capacity is reused.
	s.writeBuffer = pending[:0]
}
// Record implements the writer interface. The entry is appended to the
// shard's write buffer; once the buffer has reached its capacity, the
// buffered entries are flushed into the store.
func (s *cacheShard) Record(msg ResolvedTxnID) {
	s.writeBuffer = append(s.writeBuffer, msg)
	if len(s.writeBuffer) != cap(s.writeBuffer) {
		// Still room in the buffer; nothing to flush yet.
		return
	}
	s.Flush()
}
// Flush implements the writer interface. It acquires the shard's write
// lock and moves all buffered entries into the store.
func (s *cacheShard) Flush() {
	s.mu.Lock()
	// Deferred so the lock is released even if flushLocked panics (it runs
	// the store's eviction callbacks); otherwise the shard would stay
	// locked forever and block every later Flush and Lookup.
	defer s.mu.Unlock()
	s.flushLocked()
}
// Lookup implements the reader interface. It returns the transaction
// fingerprint ID stored for txnID and true when the shard's store contains
// an entry for it; otherwise it returns the zero fingerprint ID and false.
// Entries that are still sitting in the write buffer are not consulted.
func (s *cacheShard) Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool) {
	// Hold the read lock only for the store access itself.
	s.mu.RLock()
	cached, ok := s.mu.store.Get(txnID)
	s.mu.RUnlock()

	if ok {
		return cached.(roachpb.TransactionFingerprintID), true
	}
	return result, false
}