From fae5171c9c955b1f355452bec1e3fb12388ff0e1 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 1 Mar 2022 09:53:06 -0500 Subject: [PATCH] sql/contention/txnidcache: reuse blocks in list, account for space This change does two things to the txnidcache: 1) It accounts for the space used by the fifo eviction list. Previously we'd use more than double the intended space. We should probably also subtrace out the size of the buffers we're currently filling and the channel we use to communicate them, but I'll leave that for later. 2) It stops trying to compact the blocks. Compacting the blocks ends up being a good deal of overhead because we have to copy across every single message. Instead we can just append the block directly to the list. This does have the hazard of wasting a lot of space when the blocks are sparse. However, if the blocks are sparse, we know that the throughput is low, so it's fine. Resolves #76738 Release justification: bug fixes and low-risk updates to new functionality Release note: None --- pkg/sql/contention/txnidcache/BUILD.bazel | 3 + pkg/sql/contention/txnidcache/fifo_cache.go | 76 ++--- .../contention/txnidcache/fifo_cache_test.go | 261 ++++++------------ .../contention/txnidcache/testdata/fifo_cache | 81 ++++++ pkg/sql/contention/txnidcache/txn_id_cache.go | 9 +- 5 files changed, 196 insertions(+), 234 deletions(-) create mode 100644 pkg/sql/contention/txnidcache/testdata/fifo_cache diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel index 62327daab8f3..048eefb55c0d 100644 --- a/pkg/sql/contention/txnidcache/BUILD.bazel +++ b/pkg/sql/contention/txnidcache/BUILD.bazel @@ -34,6 +34,7 @@ go_test( "txn_id_cache_test.go", "writer_test.go", ], + data = glob(["testdata/**"]), embed = [":txnidcache"], deps = [ "//pkg/kv", @@ -55,7 +56,9 @@ go_test( "//pkg/util/log", "//pkg/util/stop", "//pkg/util/syncutil", + "//pkg/util/uint128", "//pkg/util/uuid", + "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], diff --git a/pkg/sql/contention/txnidcache/fifo_cache.go b/pkg/sql/contention/txnidcache/fifo_cache.go index 5217d608e5d1..aef77400207e 100644 --- a/pkg/sql/contention/txnidcache/fifo_cache.go +++ b/pkg/sql/contention/txnidcache/fifo_cache.go @@ -12,10 +12,10 @@ package txnidcache import ( "sync" + "unsafe" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils" - "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -39,21 +39,19 @@ type fifoCache struct { } type blockListNode struct { - block + *block next *blockListNode } // blockList is a singly-linked list of blocks. The list is used to // implement FIFO eviction. type blockList struct { - head *blockListNode - tail *blockListNode - - // tailIdx is an index pointing into the next empty slot in block - // stored in the tail pointer. - tailIdx int + numNodes int + head *blockListNode + tail *blockListNode } +// newFifoCache takes a function which returns a capacity in bytes. func newFIFOCache(capacity contentionutils.CapacityLimiter) *fifoCache { c := &fifoCache{ capacity: capacity, @@ -70,22 +68,14 @@ func (c *fifoCache) add(b *block) { c.mu.Lock() defer c.mu.Unlock() - blockSize := 0 for i := range b { if !b[i].Valid() { break } c.mu.data[b[i].TxnID] = b[i].TxnFingerprintID - blockSize++ } - - c.mu.eviction.append(b[:blockSize]) - - // Zeros out the block and put it back into the blockPool. - *b = block{} - blockPool.Put(b) - + c.mu.eviction.addNode(b) c.maybeEvictLocked() } @@ -97,24 +87,25 @@ func (c *fifoCache) get(txnID uuid.UUID) (roachpb.TransactionFingerprintID, bool return fingerprintID, found } -func (c *fifoCache) size() int { +func (c *fifoCache) size() int64 { c.mu.RLock() defer c.mu.RUnlock() - return len(c.mu.data) + return c.sizeLocked() +} + +func (c *fifoCache) sizeLocked() int64 { + return int64(c.mu.eviction.numNodes)* + ((entrySize*blockSize)+int64(unsafe.Sizeof(blockListNode{}))) + + int64(len(c.mu.data))*entrySize } func (c *fifoCache) maybeEvictLocked() { - for int64(len(c.mu.data)) > c.capacity() { + for c.sizeLocked() > c.capacity() { node := c.mu.eviction.removeFront() if node == nil { return } - c.evictNodeLocked(node) - - // Zero out the node and put it back into the pool. - *node = blockListNode{} - nodePool.Put(node) } } @@ -127,40 +118,23 @@ func (c *fifoCache) evictNodeLocked(node *blockListNode) { delete(c.mu.data, node.block[i].TxnID) } -} -func (e *blockList) append(block []contentionpb.ResolvedTxnID) { - block = e.appendToTail(block) - for len(block) > 0 { - e.addNode() - block = e.appendToTail(block) - } + *node.block = block{} + blockPool.Put(node.block) + *node = blockListNode{} + nodePool.Put(node) } -func (e *blockList) addNode() { +func (e *blockList) addNode(b *block) { newNode := nodePool.Get().(*blockListNode) + newNode.block = b if e.head == nil { e.head = newNode } else { e.tail.next = newNode } e.tail = newNode - e.tailIdx = 0 -} - -func (e *blockList) appendToTail( - block []contentionpb.ResolvedTxnID, -) (remaining []contentionpb.ResolvedTxnID) { - if e.head == nil { - return block - } - toCopy := blockSize - e.tailIdx - if toCopy > len(block) { - toCopy = len(block) - } - copy(e.tail.block[e.tailIdx:], block[:toCopy]) - e.tailIdx += toCopy - return block[toCopy:] + e.numNodes++ } func (e *blockList) removeFront() *blockListNode { @@ -168,7 +142,11 @@ func (e *blockList) removeFront() *blockListNode { return nil } + e.numNodes-- removedBlock := e.head e.head = e.head.next + if e.head == nil { + e.tail = nil + } return removedBlock } diff --git a/pkg/sql/contention/txnidcache/fifo_cache_test.go b/pkg/sql/contention/txnidcache/fifo_cache_test.go index e43fff26ce03..aa8851c10fc4 100644 --- a/pkg/sql/contention/txnidcache/fifo_cache_test.go +++ b/pkg/sql/contention/txnidcache/fifo_cache_test.go @@ -12,191 +12,89 @@ package txnidcache import ( "fmt" - "math/rand" + "strconv" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/uint128" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/datadriven" "github.com/stretchr/testify/require" ) -func TestFIFOCache(t *testing.T) { - cache := newFIFOCache(func() int64 { return 2 * blockSize } /* capacity */) - - // Fill the first eviction block in cache to 1/4 capacity. - input1, expected1 := generateInputBlock(blockSize * 1 / 4 /* size */) - cache.add(cloneBlock(input1)) - checkExists(t, cache, expected1) - checkEvictionListShape(t, cache, []int{blockSize * 1 / 4}) - checkEvictionListContent(t, cache, []*block{input1}) - - // Fill the first eviction block in cache to 3/4 capacity. - input2, expected2 := generateInputBlock(blockSize / 2 /* size */) - cache.add(cloneBlock(input2)) - checkExists(t, cache, expected1, expected2) - checkEvictionListShape(t, cache, []int{blockSize * 3 / 4}) - - checkEvictionListContent(t, cache, []*block{input1, input2}) - - // Overflow the first eviction block, the second block should be 1/2 filled. - input3, expected3 := generateInputBlock(blockSize * 3 / 4) - cache.add(cloneBlock(input3)) - checkExists(t, cache, expected1, expected2, expected3) - checkEvictionListShape(t, cache, []int{blockSize, blockSize / 2}) - checkEvictionListContent(t, cache, []*block{input1, input2, input3}) - - // Overflow the second block, cause the first block to be evicted. Second - // block becomes the first block, and it is completely filled. The new second - // block should be 1/4 filled. Part of the input3 will be evicted. - input4, expected4 := generateInputBlock(blockSize * 3 / 4) - cache.add(cloneBlock(input4)) - checkEvictionListShape(t, cache, []int{blockSize, blockSize / 4}) - - // First 1/4 of input3 should be evicted and the remaining [2/4:3/4] should - // still remain. - remainingInput3 := &block{} - copy(remainingInput3[:], input3[blockSize*1/4:blockSize*3/4]) - checkEvictionListContent(t, cache, []*block{remainingInput3, input4}) - - // Removes the portion that hasn't been evicted. - evictedExpected3 := removeEntryFromMap(expected3, input3[blockSize*1/4:blockSize*3/4]) - - // Removes the portion that has been evicted. - remainingExpected3 := removeEntryFromMap(expected3, input3[:blockSize*1/4]) - - checkNotExist(t, cache, expected1, expected2, evictedExpected3) - checkExists(t, cache, remainingExpected3, expected4) - - // Partially fill up the second block in the eviction list. - input5, expected5 := generateInputBlock(blockSize / 2) - cache.add(cloneBlock(input5)) - checkNotExist(t, cache, expected1, expected2, evictedExpected3) - checkExists(t, cache, remainingExpected3, expected4, expected5) - checkEvictionListShape(t, cache, []int{blockSize, blockSize * 3 / 4}) - checkEvictionListContent(t, cache, []*block{remainingInput3, input4, input5}) - - // Overflow the second block again to trigger another eviction, part of the - // input4 would be evicted. - input6, expected6 := generateInputBlock(blockSize * 3 / 4) - cache.add(cloneBlock(input6)) - checkEvictionListShape(t, cache, []int{blockSize, blockSize / 2}) - - remainingInput4 := &block{} - copy(remainingInput4[:], input4[blockSize/2:blockSize*3/4]) - checkEvictionListContent(t, cache, []*block{remainingInput4, input5, input6}) - - evictedExpected4 := removeEntryFromMap(expected4, input4[blockSize/2:blockSize*3/4]) - remainingExpected4 := removeEntryFromMap(expected4, input4[:blockSize/2]) - - checkNotExist(t, cache, expected1, expected2, expected3, evictedExpected4) - checkExists(t, cache, remainingExpected4, expected5, expected6) -} - -func removeEntryFromMap( - m map[uuid.UUID]roachpb.TransactionFingerprintID, filterList []contentionpb.ResolvedTxnID, -) map[uuid.UUID]roachpb.TransactionFingerprintID { - newMap := make(map[uuid.UUID]roachpb.TransactionFingerprintID) - for k, v := range m { - newMap[k] = v - } - - for _, val := range filterList { - delete(newMap, val.TxnID) - } - - return newMap -} - -func checkEvictionListContent(t *testing.T, cache *fifoCache, expectedBlocks []*block) { - t.Helper() - - cur := cache.mu.eviction.head - evictionListBlockIdx := 0 - evictionListSize := 0 - - for i := range expectedBlocks { - require.NotNilf(t, cur, "expect a valid eviction list node, but it (size=%d)"+ - "is nil", evictionListSize) - - for blockIdx := 0; blockIdx < blockSize; blockIdx++ { - if !expectedBlocks[i][blockIdx].Valid() { - break - } - - require.Equal(t, expectedBlocks[i][blockIdx], cur.block[evictionListBlockIdx], - "expected eviction block at index [%d][%d] to be "+ - "%s, but it is %s", i, blockIdx, expectedBlocks[i][blockIdx].TxnID.String(), - cur.block[evictionListBlockIdx].TxnID.String()) - - evictionListBlockIdx++ - - isEvictionListIdxStillValid := - evictionListBlockIdx < blockSize && cur.block[evictionListBlockIdx].Valid() - - if !isEvictionListIdxStillValid { - cur = cur.next - evictionListBlockIdx = 0 - evictionListSize++ - } - } - } - - require.Nilf(t, cur, "expect eviction list to be fully iterated, but it was not") -} - -func checkEvictionListShape(t *testing.T, cache *fifoCache, expectedBlockSizes []int) { - t.Helper() - cur := cache.mu.eviction.head - - for i := range expectedBlockSizes { - require.NotNilf(t, cur, "expect an eviction list of size %d, but it has "+ - "a size of %d", len(expectedBlockSizes), i-1) - - actualBlockSize := 0 - for blockIdx := 0; blockIdx < blockSize; blockIdx++ { - if !cur.block[blockIdx].Valid() { - break +func TestFIFOCacheDataDriven(t *testing.T) { + var cache *fifoCache + inputBlocks := make(map[string]*block) + expectedMaps := make(map[string]map[uuid.UUID]roachpb.TransactionFingerprintID) + blockToNameMap := make(map[*block]string) + + datadriven.Walk(t, testutils.TestDataPath(t), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + var capacity int + d.ScanArgs(t, "capacity", &capacity) + cache = newFIFOCache(func() int64 { return int64(capacity) } /* capacity */) + return fmt.Sprintf("cache_size: %d", cache.size()) + case "newInputBlock": + var name string + var percentageFilledStr string + + d.ScanArgs(t, "name", &name) + d.ScanArgs(t, "percentageFilled", &percentageFilledStr) + + percentageFilled, err := strconv.ParseFloat(percentageFilledStr, 64) + require.NoError(t, err) + + actualSize := int(blockSize * percentageFilled) + input, expected := generateInputBlock(actualSize) + inputBlocks[name] = input + expectedMaps[name] = expected + + return fmt.Sprintf("blockSize: %d", actualSize) + case "insertBlock": + var name string + d.ScanArgs(t, "name", &name) + input, ok := inputBlocks[name] + require.True(t, ok, "input %s is not found", name) + input = cloneBlock(input) + cache.add(input) + blockToNameMap[input] = name + case "show": + var result []string + + result = append(result, fmt.Sprintf("cacheSize: %d", cache.size())) + for cur := cache.mu.eviction.head; cur != nil; cur = cur.next { + blockName := blockToNameMap[cur.block] + expected := expectedMaps[blockName] + + actualBlockSize := 0 + for blockOffset := 0; blockOffset < blockSize; blockOffset++ { + if !cur.block[blockOffset].Valid() { + break + } + actualBlockSize++ + txnID := cur.block[blockOffset].TxnID + expectedTxnFingerprint, ok := expected[txnID] + require.True(t, ok, "expected to find txn fingerprint ID "+ + "for txnID %s, but it was not found", txnID) + require.Equal(t, expectedTxnFingerprint, cache.mu.data[txnID], + "expected the txnID %s to have txn fingerprint ID %d, but "+ + "got %d", txnID, expectedTxnFingerprint, cache.mu.data[txnID]) + } + + result = append(result, + fmt.Sprintf("blockName: %s, blockSize: %d", + blockName, actualBlockSize)) + } + + return strings.Join(result, "\n") } - actualBlockSize++ - } - - require.Equal(t, expectedBlockSizes[i], actualBlockSize, - "expected eviction list block at index [%d] to have a size "+ - "of %d, but instead it has a size of %d", - i, expectedBlockSizes[i], actualBlockSize) - - cur = cur.next - } -} - -func checkExists( - t *testing.T, cache *fifoCache, expectedMaps ...map[uuid.UUID]roachpb.TransactionFingerprintID, -) { - t.Helper() - for _, expected := range expectedMaps { - for expectedKey, expectedValue := range expected { - actualValue, found := cache.get(expectedKey) - require.True(t, found, "expected txnID %s to be present in "+ - "the cache, but it was not found", expectedKey) - require.Equal(t, expectedValue, actualValue, "expected to find "+ - "transaction fingerprint ID %d for txnID %s, but found %d instead", - expectedValue, expectedKey, actualValue) - } - } -} - -func checkNotExist( - t *testing.T, cache *fifoCache, expectedMaps ...map[uuid.UUID]roachpb.TransactionFingerprintID, -) { - t.Helper() - for _, expected := range expectedMaps { - for expectedKey := range expected { - _, found := cache.get(expectedKey) - require.False(t, found, "expected txnID %s to be not present in "+ - "the cache, but it was found", expectedKey) - } - } + return "" + }) + }) } func cloneBlock(b *block) *block { @@ -205,6 +103,8 @@ func cloneBlock(b *block) *block { return newBlock } +var deterministicIntSource = uint128.FromInts(1, 1) + func generateInputBlock( size int, ) (input *block, expected map[uuid.UUID]roachpb.TransactionFingerprintID) { @@ -216,10 +116,13 @@ func generateInputBlock( expected = make(map[uuid.UUID]roachpb.TransactionFingerprintID) for i := 0; i < size; i++ { - input[i].TxnID = uuid.FastMakeV4() - input[i].TxnFingerprintID = roachpb.TransactionFingerprintID(rand.Uint64()) + input[i].TxnID = uuid.FromUint128(deterministicIntSource) + input[i].TxnFingerprintID = + roachpb.TransactionFingerprintID(deterministicIntSource.Lo + deterministicIntSource.Hi) expected[input[i].TxnID] = input[i].TxnFingerprintID + + deterministicIntSource = deterministicIntSource.Add(1) } return input, expected diff --git a/pkg/sql/contention/txnidcache/testdata/fifo_cache b/pkg/sql/contention/txnidcache/testdata/fifo_cache new file mode 100644 index 000000000000..f86df9f5eee0 --- /dev/null +++ b/pkg/sql/contention/txnidcache/testdata/fifo_cache @@ -0,0 +1,81 @@ +# 24240 bytes is enough for a fifoCache with 3 fully filled eviction block. +init capacity=24240 +---- +cache_size: 0 + +newInputBlock name=block1 percentageFilled=0.25 +---- +blockSize: 42 + +insertBlock name=block1 +---- + +show +---- +cacheSize: 5056 +blockName: block1, blockSize: 42 + +newInputBlock name=block2 percentageFilled=0.50 +---- +blockSize: 84 + +insertBlock name=block2 +---- + +show +---- +cacheSize: 11120 +blockName: block1, blockSize: 42 +blockName: block2, blockSize: 84 + +newInputBlock name=block3 percentageFilled=1.0 +---- +blockSize: 168 + +insertBlock name=block3 +---- + +show +---- +cacheSize: 19200 +blockName: block1, blockSize: 42 +blockName: block2, blockSize: 84 +blockName: block3, blockSize: 168 + +# Any subsequent insertion will cause evictions + +newInputBlock name=block4 percentageFilled=1.0 +---- +blockSize: 168 + +insertBlock name=block4 +---- + +show +---- +cacheSize: 22224 +blockName: block2, blockSize: 84 +blockName: block3, blockSize: 168 +blockName: block4, blockSize: 168 + +verifyBlockExistence names=block2,block3,block4 +---- + +verifyBlockNonExistence names=block1 +---- + +# Inserting sparse blocks can cause premature eviction. + +newInputBlock name=block5 percentageFilled=0.01786 +---- +blockSize: 3 + +insertBlock name=block5 +---- + +show +---- +cacheSize: 20280 +blockName: block3, blockSize: 168 +blockName: block4, blockSize: 168 +blockName: block5, blockSize: 3 diff --git a/pkg/sql/contention/txnidcache/txn_id_cache.go b/pkg/sql/contention/txnidcache/txn_id_cache.go index 5baad3dab7e0..63c1a56727b4 100644 --- a/pkg/sql/contention/txnidcache/txn_id_cache.go +++ b/pkg/sql/contention/txnidcache/txn_id_cache.go @@ -132,10 +132,7 @@ func NewTxnIDCache(st *cluster.Settings, metrics *Metrics) *Cache { closeCh: make(chan struct{}), } - t.store = newFIFOCache(func() int64 { - return MaxSize.Get(&st.SV) / entrySize - } /* capacity */) - + t.store = newFIFOCache(func() int64 { return MaxSize.Get(&st.SV) }) t.writer = newWriter(st, t) return t } @@ -189,7 +186,7 @@ func (t *Cache) DrainWriteBuffer() { t.writer.DrainWriteBuffer() } -// Size return the current size of the Cache. +// Size return the current size of the Cache in bytes. func (t *Cache) Size() int64 { - return int64(t.store.size()) * entrySize + return t.store.size() }