From 00fd06af294737029614bd05d9e14bf038a6150a Mon Sep 17 00:00:00 2001 From: Azhng Date: Tue, 8 Feb 2022 17:47:04 +0000 Subject: [PATCH 1/4] sql: introduce BenchmarkWriter benchmark This commit introduces the new BenchmarkWriter benchmark to measure the write performance for TxnID Cache. Informs #76013 Release note: None --- pkg/sql/contention/txnidcache/BUILD.bazel | 5 + pkg/sql/contention/txnidcache/writer_test.go | 160 +++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 pkg/sql/contention/txnidcache/writer_test.go diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel index 30df9f311370..22572a648ee6 100644 --- a/pkg/sql/contention/txnidcache/BUILD.bazel +++ b/pkg/sql/contention/txnidcache/BUILD.bazel @@ -30,22 +30,27 @@ go_test( srcs = [ "main_test.go", "txn_id_cache_test.go", + "writer_test.go", ], + embed = [":txnidcache"], deps = [ "//pkg/kv", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/sessiondata", "//pkg/sql/tests", "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/contention/txnidcache/writer_test.go b/pkg/sql/contention/txnidcache/writer_test.go new file mode 100644 index 000000000000..1b125dfeb484 --- /dev/null +++ b/pkg/sql/contention/txnidcache/writer_test.go @@ -0,0 +1,160 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "context" + "encoding/binary" + "fmt" + "math" + "sync" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +type blackHoleSink struct { + pool *sync.Pool + + // Simulate a real sink. + ch chan *messageBlock +} + +var _ messageSink = &blackHoleSink{} + +func newBlackHoleSink(pool *sync.Pool, chanSize int) *blackHoleSink { + return &blackHoleSink{ + pool: pool, + ch: make(chan *messageBlock, chanSize), + } +} + +func (b *blackHoleSink) start() { + go func() { + for block := range b.ch { + *block = messageBlock{} + b.pool.Put(block) + } + }() +} + +func (b *blackHoleSink) stop() { + close(b.ch) +} + +// push implements messageSink interface. +func (b *blackHoleSink) push(block *messageBlock) { + b.ch <- block +} + +// generateUUID uses a provided integer to populate uuid.UUID. This is +// to avoid UUID generation slowing down the benchmark. +func generateUUID(i uint64) uuid.UUID { + id := uuid.UUID{} + binary.LittleEndian.PutUint64(id[:8], i) + binary.BigEndian.PutUint64(id[8:], i) + return id +} + +func BenchmarkWriter(b *testing.B) { + skip.UnderShort(b) + defer log.Scope(b).Close(b) + + ctx := context.Background() + + run := func(b *testing.B, sink messageSink, blockPool *sync.Pool, numOfConcurrentWriter int) { + starter := make(chan struct{}) + + w := newWriter(sink, blockPool) + + b.ResetTimer() + b.SetBytes(messageBlockSize * entrySize) + var wg sync.WaitGroup + for writerIdx := 0; writerIdx < numOfConcurrentWriter; writerIdx++ { + wg.Add(1) + + go func(writerIdx int) { + defer wg.Done() + + <-starter + + numOfOps := b.N / numOfConcurrentWriter + randomValueBase := numOfOps * writerIdx + for i := 0; i < numOfOps; i++ { + randomValue := randomValueBase + i + w.Record(ResolvedTxnID{ + TxnID: generateUUID(uint64(randomValue)), + TxnFingerprintID: roachpb.TransactionFingerprintID(math.MaxInt - randomValue), + }) + } + }(writerIdx) + } + + close(starter) + wg.Wait() + } + + type testSinkType struct { + name string + new func() (_ messageSink, _ *sync.Pool, cleanup func()) + } + + sinkTypes := []testSinkType{ + { + name: "blackHole", + new: func() (_ messageSink, _ *sync.Pool, cleanup func()) { + blockPool := &sync.Pool{ + New: func() interface{} { + return &messageBlock{} + }, + } + + blackHole := newBlackHoleSink(blockPool, channelSize) + blackHole.start() + + return blackHole, blockPool, blackHole.stop + }, + }, + { + name: "real", + new: func() (_ messageSink, _ *sync.Pool, cleanup func()) { + st := cluster.MakeTestingClusterSettings() + metrics := NewMetrics() + realSink := NewTxnIDCache(st, &metrics) + + stopper := stop.NewStopper() + realSink.Start(ctx, stopper) + + return realSink, realSink.messageBlockPool, func() { + stopper.Stop(ctx) + } + }, + }, + } + + for _, sinkType := range sinkTypes { + b.Run(fmt.Sprintf("sinkType=%s", sinkType.name), func(b *testing.B) { + for _, numOfConcurrentWriter := range []int{1, 24, 48, 64, 92, 128} { + b.Run(fmt.Sprintf("concurrentWriter=%d", numOfConcurrentWriter), func(b *testing.B) { + sink, blockPool, cleanup := sinkType.new() + defer cleanup() + + run(b, sink, blockPool, numOfConcurrentWriter) + }) + } + }) + } +} From 8c35183c8ff2c31ebf6785e81eaf29eebb77e3da Mon Sep 17 00:00:00 2001 From: Azhng Date: Thu, 10 Feb 2022 00:16:02 +0000 Subject: [PATCH 2/4] sql: introduce new FIFO cache for txnID cache Previously, txnID cache relied on cache.UnorderedCache for storage and FIFO eviction behavior. However, due to unintended allocation behavior within cache.UnorderedCache, this yielded about 20% throughput drop in kv95 benchmark. This commit introduces a new FIFO cache, built to remove all allocation during write operation. This commit removes all performance hit caused by the previous use of cache.UnorderedCache. Resolves #76013 Release note: None --- pkg/sql/contention/txnidcache/BUILD.bazel | 3 +- .../txnidcache/concurrent_write_buffer.go | 34 +-- pkg/sql/contention/txnidcache/fifo_cache.go | 173 +++++++++++++ .../contention/txnidcache/fifo_cache_test.go | 231 ++++++++++++++++++ pkg/sql/contention/txnidcache/txn_id_cache.go | 89 +++---- pkg/sql/contention/txnidcache/writer.go | 12 +- pkg/sql/contention/txnidcache/writer_test.go | 30 +-- 7 files changed, 473 insertions(+), 99 deletions(-) create mode 100644 pkg/sql/contention/txnidcache/fifo_cache.go create mode 100644 pkg/sql/contention/txnidcache/fifo_cache_test.go diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel index 22572a648ee6..3551b9e641b0 100644 --- a/pkg/sql/contention/txnidcache/BUILD.bazel +++ b/pkg/sql/contention/txnidcache/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "cluster_settings.go", "concurrent_write_buffer.go", + "fifo_cache.go", "metrics.go", "txn_id_cache.go", "writer.go", @@ -16,7 +17,6 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/contention/contentionutils", - "//pkg/util/cache", "//pkg/util/encoding", "//pkg/util/metric", "//pkg/util/stop", @@ -28,6 +28,7 @@ go_library( go_test( name = "txnidcache_test", srcs = [ + "fifo_cache_test.go", "main_test.go", "txn_id_cache_test.go", "writer_test.go", diff --git a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go index 5455c958e424..9157bc1623d0 100644 --- a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go +++ b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go @@ -16,9 +16,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils" ) -const messageBlockSize = 1024 +// blockSize is chosen as 168 since each ResolvedTxnID is 24 byte. +// 168 * 24 = 4032 bytes < 4KiB page size. +const blockSize = 168 -type messageBlock [messageBlockSize]ResolvedTxnID +type block [blockSize]ResolvedTxnID // concurrentWriteBuffer is a data structure that optimizes for concurrent // writes and also implements the Writer interface. @@ -26,37 +28,37 @@ type concurrentWriteBuffer struct { guard struct { *contentionutils.ConcurrentBufferGuard - // msgBlock is the temporary buffer that concurrentWriteBuffer uses to batch + // block is the temporary buffer that concurrentWriteBuffer uses to batch // write requests before sending them into the channel. - msgBlock *messageBlock + block *block } - msgBlockPool *sync.Pool + blockPool *sync.Pool // sink is the flush target that ConcurrentWriteBuffer flushes to once - // msgBlock is full. - sink messageSink + // block is full. + sink blockSink } var _ Writer = &concurrentWriteBuffer{} // newConcurrentWriteBuffer returns a new instance of concurrentWriteBuffer. -func newConcurrentWriteBuffer(sink messageSink, msgBlockPool *sync.Pool) *concurrentWriteBuffer { +func newConcurrentWriteBuffer(sink blockSink, blockPool *sync.Pool) *concurrentWriteBuffer { writeBuffer := &concurrentWriteBuffer{ - sink: sink, - msgBlockPool: msgBlockPool, + sink: sink, + blockPool: blockPool, } - writeBuffer.guard.msgBlock = msgBlockPool.Get().(*messageBlock) + writeBuffer.guard.block = blockPool.Get().(*block) writeBuffer.guard.ConcurrentBufferGuard = contentionutils.NewConcurrentBufferGuard( func() int64 { - return messageBlockSize + return blockSize }, /* limiter */ func(_ int64) { - writeBuffer.sink.push(writeBuffer.guard.msgBlock) + writeBuffer.sink.push(writeBuffer.guard.block) - // Resets the msgBlock. - writeBuffer.guard.msgBlock = writeBuffer.msgBlockPool.Get().(*messageBlock) + // Resets the block. + writeBuffer.guard.block = writeBuffer.blockPool.Get().(*block) } /* onBufferFull */) return writeBuffer @@ -66,7 +68,7 @@ func newConcurrentWriteBuffer(sink messageSink, msgBlockPool *sync.Pool) *concur // fingerprint ID. Record is safe to be used concurrently. func (c *concurrentWriteBuffer) Record(resolvedTxnID ResolvedTxnID) { c.guard.AtomicWrite(func(writerIdx int64) { - c.guard.msgBlock[writerIdx] = resolvedTxnID + c.guard.block[writerIdx] = resolvedTxnID }) } diff --git a/pkg/sql/contention/txnidcache/fifo_cache.go b/pkg/sql/contention/txnidcache/fifo_cache.go new file mode 100644 index 000000000000..88224c74d0c4 --- /dev/null +++ b/pkg/sql/contention/txnidcache/fifo_cache.go @@ -0,0 +1,173 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "sync" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +var nodePool = &sync.Pool{ + New: func() interface{} { + return &blockListNode{} + }, +} + +type fifoCache struct { + blockPool *sync.Pool + capacity contentionutils.CapacityLimiter + + mu struct { + syncutil.RWMutex + + data map[uuid.UUID]roachpb.TransactionFingerprintID + + eviction blockList + } +} + +type blockListNode struct { + block + next *blockListNode +} + +// blockList is a singly-linked list of blocks. The list is used to +// implement FIFO eviction. +type blockList struct { + head *blockListNode + tail *blockListNode + + // tailIdx is an index pointing into the next empty slot in block + // stored in the tail pointer. + tailIdx int +} + +func newFIFOCache(pool *sync.Pool, capacity contentionutils.CapacityLimiter) *fifoCache { + c := &fifoCache{ + blockPool: pool, + capacity: capacity, + } + + c.mu.data = make(map[uuid.UUID]roachpb.TransactionFingerprintID) + c.mu.eviction = blockList{} + return c +} + +// add insert a block into the fifoCache. +// N.B. After Add() returns, the input block should not be reused. +func (c *fifoCache) add(b *block) { + c.mu.Lock() + defer c.mu.Unlock() + + blockSize := 0 + for i := range b { + if !b[i].valid() { + break + } + + c.mu.data[b[i].TxnID] = b[i].TxnFingerprintID + blockSize++ + } + + c.mu.eviction.append(b[:blockSize]) + + // Zeros out the block and put it back into the blockPool. + *b = block{} + c.blockPool.Put(b) + + c.maybeEvictLocked() +} + +func (c *fifoCache) get(txnID uuid.UUID) (roachpb.TransactionFingerprintID, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + fingerprintID, found := c.mu.data[txnID] + return fingerprintID, found +} + +func (c *fifoCache) size() int { + c.mu.RLock() + defer c.mu.RUnlock() + return len(c.mu.data) +} + +func (c *fifoCache) maybeEvictLocked() { + for int64(len(c.mu.data)) > c.capacity() { + node := c.mu.eviction.removeFront() + if node == nil { + return + } + + c.evictNodeLocked(node) + + // Zero out the node and put it back into the pool. + *node = blockListNode{} + nodePool.Put(node) + } +} + +// evictNodeLocked deletes all entries in the block from the internal map. +func (c *fifoCache) evictNodeLocked(node *blockListNode) { + for i := 0; i < blockSize; i++ { + if !node.block[i].valid() { + break + } + + delete(c.mu.data, node.block[i].TxnID) + } +} + +func (e *blockList) append(block []ResolvedTxnID) { + block = e.appendToTail(block) + for len(block) > 0 { + e.addNode() + block = e.appendToTail(block) + } +} + +func (e *blockList) addNode() { + newNode := nodePool.Get().(*blockListNode) + if e.head == nil { + e.head = newNode + } else { + e.tail.next = newNode + } + e.tail = newNode + e.tailIdx = 0 +} + +func (e *blockList) appendToTail(block []ResolvedTxnID) (remaining []ResolvedTxnID) { + if e.head == nil { + return block + } + toCopy := blockSize - e.tailIdx + if toCopy > len(block) { + toCopy = len(block) + } + copy(e.tail.block[e.tailIdx:], block[:toCopy]) + e.tailIdx += toCopy + return block[toCopy:] +} + +func (e *blockList) removeFront() *blockListNode { + if e.head == nil { + return nil + } + + removedBlock := e.head + e.head = e.head.next + return removedBlock +} diff --git a/pkg/sql/contention/txnidcache/fifo_cache_test.go b/pkg/sql/contention/txnidcache/fifo_cache_test.go new file mode 100644 index 000000000000..14ab17d39ccd --- /dev/null +++ b/pkg/sql/contention/txnidcache/fifo_cache_test.go @@ -0,0 +1,231 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "fmt" + "math/rand" + "sync" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +func TestFIFOCache(t *testing.T) { + pool := &sync.Pool{ + New: func() interface{} { + return &block{} + }, + } + cache := newFIFOCache(pool, func() int64 { return 2 * blockSize } /* capacity */) + + // Fill the first eviction block in cache to 1/4 capacity. + input1, expected1 := generateInputBlock(blockSize * 1 / 4 /* size */) + cache.add(cloneBlock(input1)) + checkExists(t, cache, expected1) + checkEvictionListShape(t, cache, []int{blockSize * 1 / 4}) + checkEvictionListContent(t, cache, []*block{input1}) + + // Fill the first eviction block in cache to 3/4 capacity. + input2, expected2 := generateInputBlock(blockSize / 2 /* size */) + cache.add(cloneBlock(input2)) + checkExists(t, cache, expected1, expected2) + checkEvictionListShape(t, cache, []int{blockSize * 3 / 4}) + + checkEvictionListContent(t, cache, []*block{input1, input2}) + + // Overflow the first eviction block, the second block should be 1/2 filled. + input3, expected3 := generateInputBlock(blockSize * 3 / 4) + cache.add(cloneBlock(input3)) + checkExists(t, cache, expected1, expected2, expected3) + checkEvictionListShape(t, cache, []int{blockSize, blockSize / 2}) + checkEvictionListContent(t, cache, []*block{input1, input2, input3}) + + // Overflow the second block, cause the first block to be evicted. Second + // block becomes the first block, and it is completely filled. The new second + // block should be 1/4 filled. Part of the input3 will be evicted. + input4, expected4 := generateInputBlock(blockSize * 3 / 4) + cache.add(cloneBlock(input4)) + checkEvictionListShape(t, cache, []int{blockSize, blockSize / 4}) + + // First 1/4 of input3 should be evicted and the remaining [2/4:3/4] should + // still remain. + remainingInput3 := &block{} + copy(remainingInput3[:], input3[blockSize*1/4:blockSize*3/4]) + checkEvictionListContent(t, cache, []*block{remainingInput3, input4}) + + // Removes the portion that hasn't been evicted. + evictedExpected3 := removeEntryFromMap(expected3, input3[blockSize*1/4:blockSize*3/4]) + + // Removes the portion that has been evicted. + remainingExpected3 := removeEntryFromMap(expected3, input3[:blockSize*1/4]) + + checkNotExist(t, cache, expected1, expected2, evictedExpected3) + checkExists(t, cache, remainingExpected3, expected4) + + // Partially fill up the second block in the eviction list. + input5, expected5 := generateInputBlock(blockSize / 2) + cache.add(cloneBlock(input5)) + checkNotExist(t, cache, expected1, expected2, evictedExpected3) + checkExists(t, cache, remainingExpected3, expected4, expected5) + checkEvictionListShape(t, cache, []int{blockSize, blockSize * 3 / 4}) + checkEvictionListContent(t, cache, []*block{remainingInput3, input4, input5}) + + // Overflow the second block again to trigger another eviction, part of the + // input4 would be evicted. + input6, expected6 := generateInputBlock(blockSize * 3 / 4) + cache.add(cloneBlock(input6)) + checkEvictionListShape(t, cache, []int{blockSize, blockSize / 2}) + + remainingInput4 := &block{} + copy(remainingInput4[:], input4[blockSize/2:blockSize*3/4]) + checkEvictionListContent(t, cache, []*block{remainingInput4, input5, input6}) + + evictedExpected4 := removeEntryFromMap(expected4, input4[blockSize/2:blockSize*3/4]) + remainingExpected4 := removeEntryFromMap(expected4, input4[:blockSize/2]) + + checkNotExist(t, cache, expected1, expected2, expected3, evictedExpected4) + checkExists(t, cache, remainingExpected4, expected5, expected6) +} + +func removeEntryFromMap( + m map[uuid.UUID]roachpb.TransactionFingerprintID, filterList []ResolvedTxnID, +) map[uuid.UUID]roachpb.TransactionFingerprintID { + newMap := make(map[uuid.UUID]roachpb.TransactionFingerprintID) + for k, v := range m { + newMap[k] = v + } + + for _, val := range filterList { + delete(newMap, val.TxnID) + } + + return newMap +} + +func checkEvictionListContent(t *testing.T, cache *fifoCache, expectedBlocks []*block) { + t.Helper() + + cur := cache.mu.eviction.head + evictionListBlockIdx := 0 + evictionListSize := 0 + + for i := range expectedBlocks { + require.NotNilf(t, cur, "expect a valid eviction list node, but it (size=%d)"+ + "is nil", evictionListSize) + + for blockIdx := 0; blockIdx < blockSize; blockIdx++ { + if !expectedBlocks[i][blockIdx].valid() { + break + } + + require.Equal(t, expectedBlocks[i][blockIdx], cur.block[evictionListBlockIdx], + "expected eviction block at index [%d][%d] to be "+ + "%s, but it is %s", i, blockIdx, expectedBlocks[i][blockIdx].TxnID.String(), + cur.block[evictionListBlockIdx].TxnID.String()) + + evictionListBlockIdx++ + + isEvictionListIdxStillValid := + evictionListBlockIdx < blockSize && cur.block[evictionListBlockIdx].valid() + + if !isEvictionListIdxStillValid { + cur = cur.next + evictionListBlockIdx = 0 + evictionListSize++ + } + } + } + + require.Nilf(t, cur, "expect eviction list to be fully iterated, but it was not") +} + +func checkEvictionListShape(t *testing.T, cache *fifoCache, expectedBlockSizes []int) { + t.Helper() + cur := cache.mu.eviction.head + + for i := range expectedBlockSizes { + require.NotNilf(t, cur, "expect an eviction list of size %d, but it has "+ + "a size of %d", len(expectedBlockSizes), i-1) + + actualBlockSize := 0 + for blockIdx := 0; blockIdx < blockSize; blockIdx++ { + if !cur.block[blockIdx].valid() { + break + } + actualBlockSize++ + } + + require.Equal(t, expectedBlockSizes[i], actualBlockSize, + "expected eviction list block at index [%d] to have a size "+ + "of %d, but instead it has a size of %d", + i, expectedBlockSizes[i], actualBlockSize) + + cur = cur.next + } +} + +func checkExists( + t *testing.T, cache *fifoCache, expectedMaps ...map[uuid.UUID]roachpb.TransactionFingerprintID, +) { + t.Helper() + for _, expected := range expectedMaps { + for expectedKey, expectedValue := range expected { + actualValue, found := cache.get(expectedKey) + require.True(t, found, "expected txnID %s to be present in "+ + "the cache, but it was not found", expectedKey) + require.Equal(t, expectedValue, actualValue, "expected to find "+ + "transaction fingerprint ID %d for txnID %s, but found %d instead", + expectedValue, expectedKey, actualValue) + } + } +} + +func checkNotExist( + t *testing.T, cache *fifoCache, expectedMaps ...map[uuid.UUID]roachpb.TransactionFingerprintID, +) { + t.Helper() + for _, expected := range expectedMaps { + for expectedKey := range expected { + _, found := cache.get(expectedKey) + require.False(t, found, "expected txnID %s to be not present in "+ + "the cache, but it was found", expectedKey) + } + } +} + +func cloneBlock(b *block) *block { + newBlock := &block{} + copy(newBlock[:], b[:]) + return newBlock +} + +func generateInputBlock( + size int, +) (input *block, expected map[uuid.UUID]roachpb.TransactionFingerprintID) { + if size > blockSize { + panic(fmt.Sprintf("input block size cannot be greater than %d", blockSize)) + } + + input = &block{} + expected = make(map[uuid.UUID]roachpb.TransactionFingerprintID) + + for i := 0; i < size; i++ { + input[i].TxnID = uuid.FastMakeV4() + input[i].TxnFingerprintID = roachpb.TransactionFingerprintID(rand.Uint64()) + + expected[input[i].TxnID] = input[i].TxnFingerprintID + } + + return input, expected +} diff --git a/pkg/sql/contention/txnidcache/txn_id_cache.go b/pkg/sql/contention/txnidcache/txn_id_cache.go index 9384ed704430..ab20e4a67c46 100644 --- a/pkg/sql/contention/txnidcache/txn_id_cache.go +++ b/pkg/sql/contention/txnidcache/txn_id_cache.go @@ -16,9 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -42,9 +40,9 @@ type Writer interface { Flush() } -type messageSink interface { - // push allows a messageBlock to be pushed into the pusher. - push(*messageBlock) +type blockSink interface { + // push allows a block to be pushed into the pusher. + push(*block) } const channelSize = 128 @@ -93,27 +91,22 @@ const channelSize = 128 // ^ // | // Cache polls the channel using a goroutine and push the -// | messageBlock into its storage. +// | block into its storage. // | // +----------------------------------+ // | Cache: | // | The cache contains a | // | FIFO buffer backed by | -// | cache.UnorderedCache | +// | fifoCache. | // +----------------------------------+ type Cache struct { st *cluster.Settings - msgChan chan *messageBlock + blockCh chan *block closeCh chan struct{} - mu struct { - syncutil.RWMutex - - store *cache.UnorderedCache - } - - messageBlockPool *sync.Pool + store *fifoCache + blockPool *sync.Pool writer Writer @@ -137,9 +130,9 @@ func (r *ResolvedTxnID) valid() bool { } var ( - _ Reader = &Cache{} - _ Writer = &Cache{} - _ messageSink = &Cache{} + _ Reader = &Cache{} + _ Writer = &Cache{} + _ blockSink = &Cache{} ) // NewTxnIDCache creates a new instance of Cache. @@ -147,51 +140,30 @@ func NewTxnIDCache(st *cluster.Settings, metrics *Metrics) *Cache { t := &Cache{ st: st, metrics: metrics, - msgChan: make(chan *messageBlock, channelSize), + blockCh: make(chan *block, channelSize), closeCh: make(chan struct{}), - } - - t.messageBlockPool = &sync.Pool{ - New: func() interface{} { - return &messageBlock{} + blockPool: &sync.Pool{ + New: func() interface{} { + return &block{} + }, }, } - t.mu.store = cache.NewUnorderedCache(cache.Config{ - Policy: cache.CacheFIFO, - ShouldEvict: func(size int, _, _ interface{}) bool { - return int64(size)*entrySize > MaxSize.Get(&st.SV) - }, - }) + t.store = newFIFOCache(t.blockPool, func() int64 { + return MaxSize.Get(&st.SV) / entrySize + } /* capacity */) - t.writer = newWriter(t, t.messageBlockPool) + t.writer = newWriter(t, t.blockPool) return t } // Start implements the Provider interface. func (t *Cache) Start(ctx context.Context, stopper *stop.Stopper) { - addBlockToStore := func(msgBlock *messageBlock) { - t.mu.Lock() - defer t.mu.Unlock() - for blockIdx := range msgBlock { - if !msgBlock[blockIdx].valid() { - break - } - t.mu.store.Add(msgBlock[blockIdx].TxnID, msgBlock[blockIdx].TxnFingerprintID) - } - } - - consumeBlock := func(b *messageBlock) { - addBlockToStore(b) - *b = messageBlock{} - t.messageBlockPool.Put(b) - } - err := stopper.RunAsyncTask(ctx, "txn-id-cache-ingest", func(ctx context.Context) { for { select { - case msgBlock := <-t.msgChan: - consumeBlock(msgBlock) + case b := <-t.blockCh: + t.store.add(b) case <-stopper.ShouldQuiesce(): close(t.closeCh) return @@ -207,16 +179,13 @@ func (t *Cache) Start(ctx context.Context, stopper *stop.Stopper) { func (t *Cache) Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool) { t.metrics.CacheReadCounter.Inc(1) - t.mu.RLock() - defer t.mu.RUnlock() - - txnFingerprintID, found := t.mu.store.Get(txnID) + txnFingerprintID, found := t.store.get(txnID) if !found { t.metrics.CacheMissCounter.Inc(1) return roachpb.InvalidTransactionFingerprintID, found } - return txnFingerprintID.(roachpb.TransactionFingerprintID), found + return txnFingerprintID, found } // Record implements the Writer interface. @@ -224,10 +193,10 @@ func (t *Cache) Record(resolvedTxnID ResolvedTxnID) { t.writer.Record(resolvedTxnID) } -// push implements the messageSink interface. -func (t *Cache) push(msg *messageBlock) { +// push implements the blockSink interface. +func (t *Cache) push(b *block) { select { - case t.msgChan <- msg: + case t.blockCh <- b: case <-t.closeCh: } } @@ -239,7 +208,5 @@ func (t *Cache) Flush() { // Size return the current size of the Cache. func (t *Cache) Size() int64 { - t.mu.RLock() - defer t.mu.RUnlock() - return int64(t.mu.store.Len()) * entrySize + return int64(t.store.size()) * entrySize } diff --git a/pkg/sql/contention/txnidcache/writer.go b/pkg/sql/contention/txnidcache/writer.go index f86bc6bd3916..b24f45fb9adc 100644 --- a/pkg/sql/contention/txnidcache/writer.go +++ b/pkg/sql/contention/txnidcache/writer.go @@ -24,20 +24,20 @@ const shardCount = 16 type writer struct { shards [shardCount]*concurrentWriteBuffer - sink messageSink - msgBlockPool *sync.Pool + sink blockSink + blockPool *sync.Pool } var _ Writer = &writer{} -func newWriter(sink messageSink, msgBlockPool *sync.Pool) *writer { +func newWriter(sink blockSink, blockPool *sync.Pool) *writer { w := &writer{ - sink: sink, - msgBlockPool: msgBlockPool, + sink: sink, + blockPool: blockPool, } for shardIdx := 0; shardIdx < shardCount; shardIdx++ { - w.shards[shardIdx] = newConcurrentWriteBuffer(sink, msgBlockPool) + w.shards[shardIdx] = newConcurrentWriteBuffer(sink, blockPool) } return w diff --git a/pkg/sql/contention/txnidcache/writer_test.go b/pkg/sql/contention/txnidcache/writer_test.go index 1b125dfeb484..8555007ef5cf 100644 --- a/pkg/sql/contention/txnidcache/writer_test.go +++ b/pkg/sql/contention/txnidcache/writer_test.go @@ -30,23 +30,23 @@ type blackHoleSink struct { pool *sync.Pool // Simulate a real sink. - ch chan *messageBlock + ch chan *block } -var _ messageSink = &blackHoleSink{} +var _ blockSink = &blackHoleSink{} func newBlackHoleSink(pool *sync.Pool, chanSize int) *blackHoleSink { return &blackHoleSink{ pool: pool, - ch: make(chan *messageBlock, chanSize), + ch: make(chan *block, chanSize), } } func (b *blackHoleSink) start() { go func() { - for block := range b.ch { - *block = messageBlock{} - b.pool.Put(block) + for incomingBlock := range b.ch { + *incomingBlock = block{} + b.pool.Put(incomingBlock) } }() } @@ -55,8 +55,8 @@ func (b *blackHoleSink) stop() { close(b.ch) } -// push implements messageSink interface. -func (b *blackHoleSink) push(block *messageBlock) { +// push implements blockSink interface. +func (b *blackHoleSink) push(block *block) { b.ch <- block } @@ -75,13 +75,13 @@ func BenchmarkWriter(b *testing.B) { ctx := context.Background() - run := func(b *testing.B, sink messageSink, blockPool *sync.Pool, numOfConcurrentWriter int) { + run := func(b *testing.B, sink blockSink, blockPool *sync.Pool, numOfConcurrentWriter int) { starter := make(chan struct{}) w := newWriter(sink, blockPool) b.ResetTimer() - b.SetBytes(messageBlockSize * entrySize) + b.SetBytes(blockSize * entrySize) var wg sync.WaitGroup for writerIdx := 0; writerIdx < numOfConcurrentWriter; writerIdx++ { wg.Add(1) @@ -109,16 +109,16 @@ func BenchmarkWriter(b *testing.B) { type testSinkType struct { name string - new func() (_ messageSink, _ *sync.Pool, cleanup func()) + new func() (_ blockSink, _ *sync.Pool, cleanup func()) } sinkTypes := []testSinkType{ { name: "blackHole", - new: func() (_ messageSink, _ *sync.Pool, cleanup func()) { + new: func() (_ blockSink, _ *sync.Pool, cleanup func()) { blockPool := &sync.Pool{ New: func() interface{} { - return &messageBlock{} + return &block{} }, } @@ -130,7 +130,7 @@ func BenchmarkWriter(b *testing.B) { }, { name: "real", - new: func() (_ messageSink, _ *sync.Pool, cleanup func()) { + new: func() (_ blockSink, _ *sync.Pool, cleanup func()) { st := cluster.MakeTestingClusterSettings() metrics := NewMetrics() realSink := NewTxnIDCache(st, &metrics) @@ -138,7 +138,7 @@ func BenchmarkWriter(b *testing.B) { stopper := stop.NewStopper() realSink.Start(ctx, stopper) - return realSink, realSink.messageBlockPool, func() { + return realSink, realSink.blockPool, func() { stopper.Stop(ctx) } }, From 583f56f984ac0a04dbdc87fbd3154e34042f2325 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 15 Feb 2022 11:40:12 -0500 Subject: [PATCH 3/4] sql/catalog: propagate HasPostDeserialization changes through the catalog This is an imperfect change that feels a bit ad-hoc for my liking, but does improve the situation. The problem is that in recent changes, we no longer just retreive mutable descriptors and then sometimes clone Immutable descriptors. Instead we create the more general Immutable descriptors and clone out mutables. The problem is that when we did this, we threw away information regarding whether the descriptor itself had been modified by post-deserialization. In general, this is mostly an exercise in plumbing. The key feature is that when we retreive a Mutable descriptor and it was changed, we need to know that. This change tests that. There is some ad-hoc code to propagate isUncommitedVersion in various places which I don't feel great about, but it also felt wrong not to. A follow-up should come through to really nail down the properties here. The existence of NewBuilder and the fact that it's called in various places is another mechanism by which this information could be erased, but that's less of a concern for me. This change makes it possible to simplify migrations around descriptor upgrades. Release note: None --- pkg/sql/catalog/BUILD.bazel | 1 + pkg/sql/catalog/catalog.go | 3 - pkg/sql/catalog/dbdesc/database_desc.go | 28 +++--- .../catalog/dbdesc/database_desc_builder.go | 46 +++++++--- pkg/sql/catalog/dbdesc/database_test.go | 3 +- pkg/sql/catalog/descriptor.go | 4 + pkg/sql/catalog/descs/BUILD.bazel | 2 +- pkg/sql/catalog/descs/collection_test.go | 77 ++++++++++++++++ pkg/sql/catalog/descs/hydrate.go | 10 +-- pkg/sql/catalog/descs/kv_descriptors.go | 4 +- pkg/sql/catalog/descs/txn.go | 1 + .../catalog/internal/catkv/catalog_query.go | 4 + pkg/sql/catalog/lease/count.go | 1 + .../catalog/post_derserialization_changes.go | 89 +++++++++++++++++++ pkg/sql/catalog/schemadesc/schema_desc.go | 33 ++++--- .../catalog/schemadesc/schema_desc_builder.go | 43 ++++++--- .../schemadesc/synthetic_schema_desc.go | 3 + pkg/sql/catalog/tabledesc/helpers_test.go | 4 +- pkg/sql/catalog/tabledesc/structured.go | 48 +--------- pkg/sql/catalog/tabledesc/structured_test.go | 8 +- pkg/sql/catalog/tabledesc/table_desc.go | 32 +++---- .../catalog/tabledesc/table_desc_builder.go | 81 +++++++++-------- .../typedesc/table_implicit_record_type.go | 5 ++ pkg/sql/catalog/typedesc/type_desc.go | 33 ++++--- pkg/sql/catalog/typedesc/type_desc_builder.go | 46 +++++++--- 25 files changed, 419 insertions(+), 190 deletions(-) create mode 100644 pkg/sql/catalog/post_derserialization_changes.go diff --git a/pkg/sql/catalog/BUILD.bazel b/pkg/sql/catalog/BUILD.bazel index 9fe419c59cbe..5f9dbac9b77c 100644 --- a/pkg/sql/catalog/BUILD.bazel +++ b/pkg/sql/catalog/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "descriptor.go", "descriptor_id_set.go", "errors.go", + "post_derserialization_changes.go", "schema.go", "table_col_map.go", "table_col_set.go", diff --git a/pkg/sql/catalog/catalog.go b/pkg/sql/catalog/catalog.go index 60dd47699015..2aedee7b0671 100644 --- a/pkg/sql/catalog/catalog.go +++ b/pkg/sql/catalog/catalog.go @@ -54,9 +54,6 @@ type MutableDescriptor interface { SetDropped() // SetOffline sets the descriptor's state to offline, with the provided reason. SetOffline(reason string) - // HasPostDeserializationChanges returns if the MutableDescriptor was changed after running - // RunPostDeserializationChanges. - HasPostDeserializationChanges() bool // SetDeclarativeSchemaChangerState sets the state of the declarative // schema change currently operating on this descriptor. diff --git a/pkg/sql/catalog/dbdesc/database_desc.go b/pkg/sql/catalog/dbdesc/database_desc.go index d75703b11dff..67b369abe0e3 100644 --- a/pkg/sql/catalog/dbdesc/database_desc.go +++ b/pkg/sql/catalog/dbdesc/database_desc.go @@ -42,6 +42,10 @@ type immutable struct { // isUncommittedVersion is set to true if this descriptor was created from // a copy of a Mutable with an uncommitted version. isUncommittedVersion bool + + // changed represents whether or not the descriptor was changed + // after RunPostDeserializationChanges. + changes catalog.PostDeserializationChanges } // Mutable wraps a database descriptor and provides methods @@ -49,10 +53,6 @@ type immutable struct { type Mutable struct { immutable ClusterVersion *immutable - - // changed represents whether or not the descriptor was changed - // after RunPostDeserializationChanges. - changed bool } // SafeMessage makes immutable a SafeMessager. @@ -147,7 +147,15 @@ func (desc *immutable) ByteSize() int64 { // NewBuilder implements the catalog.Descriptor interface. func (desc *immutable) NewBuilder() catalog.DescriptorBuilder { - return NewBuilder(desc.DatabaseDesc()) + return newBuilder(desc.DatabaseDesc(), desc.isUncommittedVersion, desc.changes) +} + +// NewBuilder implements the catalog.Descriptor interface. +// +// It overrides the wrapper's implementation to deal with the fact that +// mutable has overridden the definition of IsUncommittedVersion. +func (desc *Mutable) NewBuilder() catalog.DescriptorBuilder { + return newBuilder(desc.DatabaseDesc(), desc.IsUncommittedVersion(), desc.changes) } // IsMultiRegion implements the DatabaseDescriptor interface. @@ -381,9 +389,7 @@ func (desc *Mutable) OriginalVersion() descpb.DescriptorVersion { // ImmutableCopy implements the MutableDescriptor interface. func (desc *Mutable) ImmutableCopy() catalog.Descriptor { - imm := NewBuilder(desc.DatabaseDesc()).BuildImmutableDatabase() - imm.(*immutable).isUncommittedVersion = desc.IsUncommittedVersion() - return imm + return desc.NewBuilder().BuildImmutable() } // IsNew implements the MutableDescriptor interface. @@ -458,10 +464,10 @@ func (desc *Mutable) SetPlacement(placement descpb.DataPlacement) { desc.RegionConfig.Placement = placement } -// HasPostDeserializationChanges returns if the MutableDescriptor was changed after running +// GetPostDeserializationChanges returns if the MutableDescriptor was changed after running // RunPostDeserializationChanges. -func (desc *Mutable) HasPostDeserializationChanges() bool { - return desc.changed +func (desc *immutable) GetPostDeserializationChanges() catalog.PostDeserializationChanges { + return desc.changes } // GetDefaultPrivilegeDescriptor returns a DefaultPrivilegeDescriptor. diff --git a/pkg/sql/catalog/dbdesc/database_desc_builder.go b/pkg/sql/catalog/dbdesc/database_desc_builder.go index 45b3811614a0..76901bdcd433 100644 --- a/pkg/sql/catalog/dbdesc/database_desc_builder.go +++ b/pkg/sql/catalog/dbdesc/database_desc_builder.go @@ -36,7 +36,8 @@ type databaseDescriptorBuilder struct { original *descpb.DatabaseDescriptor maybeModified *descpb.DatabaseDescriptor - changed bool + isUncommittedVersion bool + changes catalog.PostDeserializationChanges } var _ DatabaseDescriptorBuilder = &databaseDescriptorBuilder{} @@ -44,8 +45,19 @@ var _ DatabaseDescriptorBuilder = &databaseDescriptorBuilder{} // NewBuilder creates a new catalog.DescriptorBuilder object for building // database descriptors. func NewBuilder(desc *descpb.DatabaseDescriptor) DatabaseDescriptorBuilder { + return newBuilder(desc, false, /* isUncommittedVersion */ + catalog.PostDeserializationChanges{}) +} + +func newBuilder( + desc *descpb.DatabaseDescriptor, + isUncommittedVersion bool, + changes catalog.PostDeserializationChanges, +) DatabaseDescriptorBuilder { return &databaseDescriptorBuilder{ - original: protoutil.Clone(desc).(*descpb.DatabaseDescriptor), + original: protoutil.Clone(desc).(*descpb.DatabaseDescriptor), + isUncommittedVersion: isUncommittedVersion, + changes: changes, } } @@ -82,10 +94,14 @@ func (ddb *databaseDescriptorBuilder) RunPostDeserializationChanges() { descpb.InvalidID, privilege.Database, ddb.maybeModified.GetName()) - removedSelfEntryInSchemas := maybeRemoveDroppedSelfEntryFromSchemas(ddb.maybeModified) addedGrantOptions := catprivilege.MaybeUpdateGrantOptions(ddb.maybeModified.Privileges) - ddb.changed = privsChanged || removedSelfEntryInSchemas || addedGrantOptions || - removedIncompatibleDatabasePrivs || createdDefaultPrivileges + + if privsChanged || addedGrantOptions || removedIncompatibleDatabasePrivs || createdDefaultPrivileges { + ddb.changes.Add(catalog.UpgradedPrivileges) + } + if maybeRemoveDroppedSelfEntryFromSchemas(ddb.maybeModified) { + ddb.changes.Add(catalog.RemovedSelfEntryInSchemas) + } } // RunRestoreChanges implements the catalog.DescriptorBuilder interface. @@ -146,7 +162,11 @@ func (ddb *databaseDescriptorBuilder) BuildImmutableDatabase() catalog.DatabaseD if desc == nil { desc = ddb.original } - return &immutable{DatabaseDescriptor: *desc} + return &immutable{ + DatabaseDescriptor: *desc, + isUncommittedVersion: ddb.isUncommittedVersion, + changes: ddb.changes, + } } // BuildExistingMutable implements the catalog.DescriptorBuilder interface. @@ -161,9 +181,12 @@ func (ddb *databaseDescriptorBuilder) BuildExistingMutableDatabase() *Mutable { ddb.maybeModified = protoutil.Clone(ddb.original).(*descpb.DatabaseDescriptor) } return &Mutable{ - immutable: immutable{DatabaseDescriptor: *ddb.maybeModified}, + immutable: immutable{ + DatabaseDescriptor: *ddb.maybeModified, + changes: ddb.changes, + isUncommittedVersion: ddb.isUncommittedVersion, + }, ClusterVersion: &immutable{DatabaseDescriptor: *ddb.original}, - changed: ddb.changed, } } @@ -180,8 +203,11 @@ func (ddb *databaseDescriptorBuilder) BuildCreatedMutableDatabase() *Mutable { desc = ddb.original } return &Mutable{ - immutable: immutable{DatabaseDescriptor: *desc}, - changed: ddb.changed, + immutable: immutable{ + DatabaseDescriptor: *desc, + changes: ddb.changes, + isUncommittedVersion: ddb.isUncommittedVersion, + }, } } diff --git a/pkg/sql/catalog/dbdesc/database_test.go b/pkg/sql/catalog/dbdesc/database_test.go index 3b87b3d3c165..c88927a8af7e 100644 --- a/pkg/sql/catalog/dbdesc/database_test.go +++ b/pkg/sql/catalog/dbdesc/database_test.go @@ -322,7 +322,8 @@ func TestFixDroppedSchemaName(t *testing.T) { b := NewBuilder(&dbDesc) b.RunPostDeserializationChanges() desc := b.BuildCreatedMutableDatabase() - require.Truef(t, desc.HasPostDeserializationChanges(), "expected changes in descriptor, found none") + require.Truef(t, desc.GetPostDeserializationChanges().HasChanges(), + "expected changes in descriptor, found none") _, ok := desc.Schemas[dbName] require.Falsef(t, ok, "erroneous entry exists") } diff --git a/pkg/sql/catalog/descriptor.go b/pkg/sql/catalog/descriptor.go index b8b35722dc6b..ffda6e3afeeb 100644 --- a/pkg/sql/catalog/descriptor.go +++ b/pkg/sql/catalog/descriptor.go @@ -211,6 +211,10 @@ type Descriptor interface { // the value on the descriptor. Changes will need to be written back to // the descriptor using SetDeclarativeSchemaChangeState. GetDeclarativeSchemaChangerState() *scpb.DescriptorState + + // GetPostDeserializationChanges returns the set of ways the Descriptor + // was changed after running RunPostDeserializationChanges. + GetPostDeserializationChanges() PostDeserializationChanges } // DatabaseDescriptor encapsulates the concept of a database. diff --git a/pkg/sql/catalog/descs/BUILD.bazel b/pkg/sql/catalog/descs/BUILD.bazel index 56827805591d..f88753201c08 100644 --- a/pkg/sql/catalog/descs/BUILD.bazel +++ b/pkg/sql/catalog/descs/BUILD.bazel @@ -62,7 +62,6 @@ go_library( "//pkg/util/hlc", "//pkg/util/iterutil", "//pkg/util/log", - "//pkg/util/protoutil", "//pkg/util/retry", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", @@ -105,6 +104,7 @@ go_test( "//pkg/util/log", "//pkg/util/randutil", "@com_github_lib_pq//oid", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/catalog/descs/collection_test.go b/pkg/sql/catalog/descs/collection_test.go index 28e9dd943efd..880c6c486d4f 100644 --- a/pkg/sql/catalog/descs/collection_test.go +++ b/pkg/sql/catalog/descs/collection_test.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/lib/pq/oid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -520,3 +521,79 @@ CREATE TABLE test.schema.t(x INT); _, err = db.Query("GRANT USAGE ON SCHEMA test.schema TO testuser;") require.NoError(t, err) } + +// TestCollectionPreservesPostDeserializationChanges ensures that when +// descriptors are retrieved from the collection and in need of post- +// deserialization changes, that the fact that those changes happened +// is preserved in both the mutable and immutable forms of the descriptor. +func TestCollectionPreservesPostDeserializationChanges(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, "CREATE DATABASE db") + tdb.Exec(t, "CREATE SCHEMA db.sc") + tdb.Exec(t, "CREATE TYPE db.sc.typ AS ENUM ('a')") + tdb.Exec(t, "CREATE TABLE db.sc.tab (c db.sc.typ)") + var dbID, scID, typID, tabID descpb.ID + const q = "SELECT id FROM system.namespace WHERE name = $1" + tdb.QueryRow(t, q, "db").Scan(&dbID) + tdb.QueryRow(t, q, "sc").Scan(&scID) + tdb.QueryRow(t, q, "typ").Scan(&typID) + tdb.QueryRow(t, q, "tab").Scan(&tabID) + + // Make some bespoke modifications to each of the descriptors to make sure + // they'd need post-deserialization changes. + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + require.NoError(t, sql.DescsTxn(ctx, &execCfg, func( + ctx context.Context, txn *kv.Txn, col *descs.Collection, + ) error { + descs, err := col.GetMutableDescriptorsByID(ctx, txn, dbID, scID, typID, tabID) + if err != nil { + return err + } + // Remove the grant option, this will ensure we get some + // post-deserialization changes. These grant options should always exist + // for admin and root. + b := txn.NewBatch() + for _, d := range descs { + p := d.GetPrivileges() + for i := range p.Users { + p.Users[i].WithGrantOption = 0 + } + if err := col.WriteDescToBatch(ctx, false, d, b); err != nil { + return err + } + } + return txn.Run(ctx, b) + })) + require.NoError(t, sql.DescsTxn(ctx, &execCfg, func( + ctx context.Context, txn *kv.Txn, col *descs.Collection, + ) error { + immuts, err := col.GetImmutableDescriptorsByID(ctx, txn, tree.CommonLookupFlags{ + Required: true, + }, dbID, scID, typID, tabID) + if err != nil { + return err + } + for _, d := range immuts { + assert.True(t, d.GetPostDeserializationChanges(). + Contains(catalog.UpgradedPrivileges)) + } + + muts, err := col.GetMutableDescriptorsByID(ctx, txn, dbID, scID, typID, tabID) + if err != nil { + return err + } + for _, d := range muts { + assert.True(t, d.GetPostDeserializationChanges(). + Contains(catalog.UpgradedPrivileges)) + } + + return nil + })) +} diff --git a/pkg/sql/catalog/descs/hydrate.go b/pkg/sql/catalog/descs/hydrate.go index 0bc9832f3b90..e0fa565a022a 100644 --- a/pkg/sql/catalog/descs/hydrate.go +++ b/pkg/sql/catalog/descs/hydrate.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" ) // hydrateTypesInTableDesc installs user defined type metadata in all types.T @@ -114,14 +113,11 @@ func (tc *Collection) hydrateTypesInTableDesc( } // Make a copy of the underlying descriptor before hydration. - descBase := protoutil.Clone(t.TableDesc()).(*descpb.TableDescriptor) - if err := typedesc.HydrateTypesInTableDescriptor(ctx, descBase, getType); err != nil { + mut := t.NewBuilder().BuildExistingMutable().(*tabledesc.Mutable) + if err := typedesc.HydrateTypesInTableDescriptor(ctx, mut.TableDesc(), getType); err != nil { return nil, err } - if t.IsUncommittedVersion() { - return tabledesc.NewBuilderForUncommittedVersion(descBase).BuildImmutableTable(), nil - } - return tabledesc.NewBuilder(descBase).BuildImmutableTable(), nil + return mut.ImmutableCopy().(catalog.TableDescriptor), nil default: return desc, nil } diff --git a/pkg/sql/catalog/descs/kv_descriptors.go b/pkg/sql/catalog/descs/kv_descriptors.go index 1e6a5905864e..4fb8d7f40185 100644 --- a/pkg/sql/catalog/descs/kv_descriptors.go +++ b/pkg/sql/catalog/descs/kv_descriptors.go @@ -242,7 +242,9 @@ func (kd *kvDescriptors) getByIDs( return nil, err } for j, desc := range kvDescs { - ret[indexes[j]] = desc.NewBuilder().BuildExistingMutable() + b := desc.NewBuilder() + b.RunPostDeserializationChanges() + ret[indexes[j]] = b.BuildExistingMutable() } return ret, nil } diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index 08661b70ac1f..e04049c19d16 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -181,6 +181,7 @@ func CheckTwoVersionInvariant( // the current provisional commit timestamp for this transaction then if this // transaction ends up committing then there won't have been any created // in the meantime. + count, err := lease.CountLeases(ctx, ie, descs, txn.ProvisionalCommitTimestamp()) if err != nil { return false, err diff --git a/pkg/sql/catalog/internal/catkv/catalog_query.go b/pkg/sql/catalog/internal/catkv/catalog_query.go index 23b3757c74e7..9092159344f4 100644 --- a/pkg/sql/catalog/internal/catkv/catalog_query.go +++ b/pkg/sql/catalog/internal/catkv/catalog_query.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -42,6 +43,9 @@ func lookupDescriptorsUnvalidated( key := catalogkeys.MakeDescMetadataKey(codec, id) b.Get(key) } + if log.ExpensiveLogEnabled(ctx, 2) { + log.Infof(ctx, "looking up unvalidated descriptors by id: %v", ids) + } }) if err != nil { return nil, err diff --git a/pkg/sql/catalog/lease/count.go b/pkg/sql/catalog/lease/count.go index a18c54efb533..c424a16071e2 100644 --- a/pkg/sql/catalog/lease/count.go +++ b/pkg/sql/catalog/lease/count.go @@ -39,6 +39,7 @@ func CountLeases( stmt := fmt.Sprintf(`SELECT count(1) FROM system.public.lease AS OF SYSTEM TIME '%s' WHERE `, at.AsOfSystemTime()) + strings.Join(whereClauses, " OR ") + values, err := executor.QueryRowEx( ctx, "count-leases", nil, /* txn */ sessiondata.InternalExecutorOverride{User: security.RootUserName()}, diff --git a/pkg/sql/catalog/post_derserialization_changes.go b/pkg/sql/catalog/post_derserialization_changes.go new file mode 100644 index 000000000000..b1e7fa74dc86 --- /dev/null +++ b/pkg/sql/catalog/post_derserialization_changes.go @@ -0,0 +1,89 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package catalog + +import "github.com/cockroachdb/cockroach/pkg/util" + +// PostDeserializationChangeType is used to indicate the type of +// PostDeserializationChange which occurred for a descriptor. +type PostDeserializationChangeType int + +// PostDeserializationChanges are a set of booleans to indicate which types of +// upgrades or fixes occurred when filling in the descriptor after +// deserialization. +type PostDeserializationChanges struct{ s util.FastIntSet } + +// HasChanges returns true if the set of changes is non-empty. +func (c PostDeserializationChanges) HasChanges() bool { + return !c.s.Empty() +} + +// Add adds a change to the set of changes. +func (c *PostDeserializationChanges) Add(change PostDeserializationChangeType) { + c.s.Add(int(change)) +} + +// ForEach calls f for every change in the set of changes. +func (c PostDeserializationChanges) ForEach(f func(change PostDeserializationChangeType)) { + c.s.ForEach(func(i int) { f(PostDeserializationChangeType(i)) }) +} + +// Contains returns true if the set of changes contains this change. +func (c PostDeserializationChanges) Contains(change PostDeserializationChangeType) bool { + return c.s.Contains(int(change)) +} + +const ( + // UpgradedFormatVersion indicates that the FormatVersion was upgraded. + UpgradedFormatVersion PostDeserializationChangeType = iota + + // FixedIndexEncodingType indicates that the encoding type of a public index + // was fixed. + FixedIndexEncodingType + + // UpgradedIndexFormatVersion indicates that the format version of at least + // one index descriptor was upgraded. + UpgradedIndexFormatVersion + + // UpgradedForeignKeyRepresentation indicates that the foreign key + // representation was upgraded. + UpgradedForeignKeyRepresentation + + // UpgradedNamespaceName indicates that the table was system.namespace + // and it had its name upgraded from "namespace2". + // + // TODO(ajwerner): Remove this and the associated migration in 22.1 as + // this will never be true due to the corresponding long-running migration. + UpgradedNamespaceName + + // UpgradedPrivileges indicates that the PrivilegeDescriptor version was upgraded. + UpgradedPrivileges + + // RemovedDefaultExprFromComputedColumn indicates that the table had at least + // one computed column which also had a DEFAULT expression, which therefore + // had to be removed. See issue #72881 for details. + RemovedDefaultExprFromComputedColumn + + // RemovedDuplicateIDsInRefs indicates that the table + // has redundant IDs in its DependsOn, DependsOnTypes and DependedOnBy + // references. + RemovedDuplicateIDsInRefs + + // AddedConstraintIDs indicates that table descriptors had constraint ID + // added. + AddedConstraintIDs + + // RemovedSelfEntryInSchemas corresponds to a change which occurred in + // database descriptors to recover from an earlier bug whereby when + // dropping a schema, we'd mark the database itself as though it was the + // schema which was dropped. + RemovedSelfEntryInSchemas +) diff --git a/pkg/sql/catalog/schemadesc/schema_desc.go b/pkg/sql/catalog/schemadesc/schema_desc.go index e89a9a705bba..8c70cf2bf035 100644 --- a/pkg/sql/catalog/schemadesc/schema_desc.go +++ b/pkg/sql/catalog/schemadesc/schema_desc.go @@ -40,6 +40,10 @@ type immutable struct { // isUncommittedVersion is set to true if this descriptor was created from // a copy of a Mutable with an uncommitted version. isUncommittedVersion bool + + // changed represents how the descriptor was changed after + // RunPostDeserializationChanges. + changes catalog.PostDeserializationChanges } func (desc *immutable) SchemaKind() catalog.ResolvedSchemaKind { @@ -76,10 +80,6 @@ type Mutable struct { immutable ClusterVersion *immutable - - // changed represents whether or not the descriptor was changed - // after RunPostDeserializationChanges. - changed bool } var _ redact.SafeMessager = (*immutable)(nil) @@ -157,9 +157,17 @@ func (desc *immutable) ByteSize() int64 { return int64(desc.Size()) } +// NewBuilder implements the catalog.Descriptor interface. +// +// It overrides the wrapper's implementation to deal with the fact that +// mutable has overridden the definition of IsUncommittedVersion. +func (desc *Mutable) NewBuilder() catalog.DescriptorBuilder { + return newBuilder(desc.SchemaDesc(), desc.IsUncommittedVersion(), desc.changes) +} + // NewBuilder implements the catalog.Descriptor interface. func (desc *immutable) NewBuilder() catalog.DescriptorBuilder { - return NewBuilder(desc.SchemaDesc()) + return newBuilder(desc.SchemaDesc(), desc.IsUncommittedVersion(), desc.changes) } // ValidateSelf implements the catalog.Descriptor interface. @@ -247,6 +255,11 @@ func (desc *immutable) GetDefaultPrivilegeDescriptor() catalog.DefaultPrivilegeD return catprivilege.MakeDefaultPrivileges(defaultPrivilegeDescriptor) } +// GetPostDeserializationChanges implements the Descriptor interface. +func (desc *immutable) GetPostDeserializationChanges() catalog.PostDeserializationChanges { + return desc.changes +} + // MaybeIncrementVersion implements the MutableDescriptor interface. func (desc *Mutable) MaybeIncrementVersion() { // Already incremented, no-op. @@ -283,9 +296,7 @@ func (desc *Mutable) OriginalVersion() descpb.DescriptorVersion { // ImmutableCopy implements the MutableDescriptor interface. func (desc *Mutable) ImmutableCopy() catalog.Descriptor { - imm := NewBuilder(desc.SchemaDesc()).BuildImmutable() - imm.(*immutable).isUncommittedVersion = desc.IsUncommittedVersion() - return imm + return desc.NewBuilder().BuildImmutable() } // IsNew implements the MutableDescriptor interface. @@ -321,12 +332,6 @@ func (desc *Mutable) IsUncommittedVersion() bool { return desc.IsNew() || desc.GetVersion() != desc.ClusterVersion.GetVersion() } -// HasPostDeserializationChanges returns if the MutableDescriptor was changed after running -// RunPostDeserializationChanges. -func (desc *Mutable) HasPostDeserializationChanges() bool { - return desc.changed -} - // GetMutableDefaultPrivilegeDescriptor returns a catprivilege.Mutable. func (desc *Mutable) GetMutableDefaultPrivilegeDescriptor() *catprivilege.Mutable { defaultPrivilegeDescriptor := desc.GetDefaultPrivileges() diff --git a/pkg/sql/catalog/schemadesc/schema_desc_builder.go b/pkg/sql/catalog/schemadesc/schema_desc_builder.go index 2e44185d2184..ea423aa4758a 100644 --- a/pkg/sql/catalog/schemadesc/schema_desc_builder.go +++ b/pkg/sql/catalog/schemadesc/schema_desc_builder.go @@ -28,9 +28,10 @@ type SchemaDescriptorBuilder interface { } type schemaDescriptorBuilder struct { - original *descpb.SchemaDescriptor - maybeModified *descpb.SchemaDescriptor - changed bool + original *descpb.SchemaDescriptor + maybeModified *descpb.SchemaDescriptor + isUncommittedVersion bool + changes catalog.PostDeserializationChanges } var _ SchemaDescriptorBuilder = &schemaDescriptorBuilder{} @@ -38,8 +39,19 @@ var _ SchemaDescriptorBuilder = &schemaDescriptorBuilder{} // NewBuilder creates a new catalog.DescriptorBuilder object for building // schema descriptors. func NewBuilder(desc *descpb.SchemaDescriptor) SchemaDescriptorBuilder { + return newBuilder(desc, false, /* isUncommittedVersion */ + catalog.PostDeserializationChanges{}) +} + +func newBuilder( + desc *descpb.SchemaDescriptor, + isUncommittedVersion bool, + changes catalog.PostDeserializationChanges, +) SchemaDescriptorBuilder { return &schemaDescriptorBuilder{ - original: protoutil.Clone(desc).(*descpb.SchemaDescriptor), + original: protoutil.Clone(desc).(*descpb.SchemaDescriptor), + isUncommittedVersion: isUncommittedVersion, + changes: changes, } } @@ -60,7 +72,9 @@ func (sdb *schemaDescriptorBuilder) RunPostDeserializationChanges() { sdb.maybeModified.GetName(), ) addedGrantOptions := catprivilege.MaybeUpdateGrantOptions(sdb.maybeModified.Privileges) - sdb.changed = privsChanged || addedGrantOptions + if privsChanged || addedGrantOptions { + sdb.changes.Add(catalog.UpgradedPrivileges) + } } // RunRestoreChanges implements the catalog.DescriptorBuilder interface. @@ -81,7 +95,11 @@ func (sdb *schemaDescriptorBuilder) BuildImmutableSchema() catalog.SchemaDescrip if desc == nil { desc = sdb.original } - return &immutable{SchemaDescriptor: *desc} + return &immutable{ + SchemaDescriptor: *desc, + changes: sdb.changes, + isUncommittedVersion: sdb.isUncommittedVersion, + } } // BuildExistingMutable implements the catalog.DescriptorBuilder interface. @@ -96,9 +114,12 @@ func (sdb *schemaDescriptorBuilder) BuildExistingMutableSchema() *Mutable { sdb.maybeModified = protoutil.Clone(sdb.original).(*descpb.SchemaDescriptor) } return &Mutable{ - immutable: immutable{SchemaDescriptor: *sdb.maybeModified}, + immutable: immutable{ + SchemaDescriptor: *sdb.maybeModified, + changes: sdb.changes, + isUncommittedVersion: sdb.isUncommittedVersion, + }, ClusterVersion: &immutable{SchemaDescriptor: *sdb.original}, - changed: sdb.changed, } } @@ -111,7 +132,9 @@ func (sdb *schemaDescriptorBuilder) BuildCreatedMutable() catalog.MutableDescrip // which is in the process of being created. func (sdb *schemaDescriptorBuilder) BuildCreatedMutableSchema() *Mutable { return &Mutable{ - immutable: immutable{SchemaDescriptor: *sdb.original}, - changed: sdb.changed, + immutable: immutable{ + SchemaDescriptor: *sdb.original, + changes: sdb.changes, + }, } } diff --git a/pkg/sql/catalog/schemadesc/synthetic_schema_desc.go b/pkg/sql/catalog/schemadesc/synthetic_schema_desc.go index 248388b7067c..b1762be3674d 100644 --- a/pkg/sql/catalog/schemadesc/synthetic_schema_desc.go +++ b/pkg/sql/catalog/schemadesc/synthetic_schema_desc.go @@ -112,6 +112,9 @@ func (p synthetic) SchemaDesc() *descpb.SchemaDescriptor { func (p synthetic) GetDeclarativeSchemaChangerState() *scpb.DescriptorState { return nil } +func (p synthetic) GetPostDeserializationChanges() catalog.PostDeserializationChanges { + return catalog.PostDeserializationChanges{} +} // GetDefaultPrivilegeDescriptor returns a DefaultPrivilegeDescriptor. func (p synthetic) GetDefaultPrivilegeDescriptor() catalog.DefaultPrivilegeDescriptor { diff --git a/pkg/sql/catalog/tabledesc/helpers_test.go b/pkg/sql/catalog/tabledesc/helpers_test.go index 2606cf9b023d..f635e8f76d0a 100644 --- a/pkg/sql/catalog/tabledesc/helpers_test.go +++ b/pkg/sql/catalog/tabledesc/helpers_test.go @@ -59,10 +59,10 @@ func ValidateConstraints(immI catalog.TableDescriptor) error { func GetPostDeserializationChanges( immI catalog.TableDescriptor, -) (PostDeserializationTableDescriptorChanges, error) { +) (catalog.PostDeserializationChanges, error) { imm, ok := immI.(*immutable) if !ok { - return PostDeserializationTableDescriptorChanges{}, errors.Errorf("expected immutable descriptor") + return catalog.PostDeserializationChanges{}, errors.Errorf("expected immutable descriptor") } return imm.GetPostDeserializationChanges(), nil } diff --git a/pkg/sql/catalog/tabledesc/structured.go b/pkg/sql/catalog/tabledesc/structured.go index 4458f3bc4eaf..e0c029ec9eb9 100644 --- a/pkg/sql/catalog/tabledesc/structured.go +++ b/pkg/sql/catalog/tabledesc/structured.go @@ -66,50 +66,6 @@ var ErrMissingColumns = errors.New("table must contain at least 1 column") // ErrMissingPrimaryKey indicates a table with no primary key. var ErrMissingPrimaryKey = errors.New("table must contain a primary key") -// PostDeserializationTableDescriptorChanges are a set of booleans to indicate -// which types of upgrades or fixes occurred when filling in the descriptor -// after deserialization. -type PostDeserializationTableDescriptorChanges struct { - // UpgradedFormatVersion indicates that the FormatVersion was upgraded. - UpgradedFormatVersion bool - - // FixedIndexEncodingType indicates that the encoding type of a public index - // was fixed. - FixedIndexEncodingType bool - - // UpgradedIndexFormatVersion indicates that the format version of at least - // one index descriptor was upgraded. - UpgradedIndexFormatVersion bool - - // UpgradedForeignKeyRepresentation indicates that the foreign key - // representation was upgraded. - UpgradedForeignKeyRepresentation bool - - // UpgradedNamespaceName indicates that the table was system.namespace - // and it had its name upgraded from "namespace2". - // - // TODO(ajwerner): Remove this and the associated migration in 22.1 as - // this will never be true due to the corresponding long-running migration. - UpgradedNamespaceName bool - - // UpgradedPrivileges indicates that the PrivilegeDescriptor version was upgraded. - UpgradedPrivileges bool - - // RemovedDefaultExprFromComputedColumn indicates that the table had at least - // one computed column which also had a DEFAULT expression, which therefore - // had to be removed. See issue #72881 for details. - RemovedDefaultExprFromComputedColumn bool - - // RemovedDuplicateIDsInRefs indicates that the table - // has redundant IDs in its DependsOn, DependsOnTypes and DependedOnBy - // references. - RemovedDuplicateIDsInRefs bool - - // AddedConstraintIDs indicates that table descriptors had constraint ID - // added. - AddedConstraintIDs bool -} - // DescriptorType returns the type of this descriptor. func (desc *wrapper) DescriptorType() catalog.DescriptorType { return catalog.Table @@ -2115,7 +2071,7 @@ func (desc *wrapper) MakeFirstMutationPublic( includeConstraints catalog.MutationPublicationFilter, ) (catalog.TableDescriptor, error) { // Clone the ImmutableTable descriptor because we want to create an ImmutableCopy one. - table := NewBuilder(desc.TableDesc()).BuildExistingMutableTable() + table := desc.NewBuilder().(TableDescriptorBuilder).BuildExistingMutableTable() mutationID := desc.Mutations[0].MutationID i := 0 for _, mutation := range desc.Mutations { @@ -2142,7 +2098,7 @@ func (desc *wrapper) MakeFirstMutationPublic( // MakePublic implements the TableDescriptor interface. func (desc *wrapper) MakePublic() catalog.TableDescriptor { // Clone the ImmutableTable descriptor because we want to create an ImmutableCopy one. - table := NewBuilder(desc.TableDesc()).BuildExistingMutableTable() + table := desc.NewBuilder().(TableDescriptorBuilder).BuildExistingMutableTable() table.State = descpb.DescriptorState_PUBLIC table.Version++ return table diff --git a/pkg/sql/catalog/tabledesc/structured_test.go b/pkg/sql/catalog/tabledesc/structured_test.go index bb18000fa213..5e4cd5454386 100644 --- a/pkg/sql/catalog/tabledesc/structured_test.go +++ b/pkg/sql/catalog/tabledesc/structured_test.go @@ -311,7 +311,7 @@ func TestMaybeUpgradeFormatVersion(t *testing.T) { desc := b.BuildImmutableTable() changes, err := GetPostDeserializationChanges(desc) require.NoError(t, err) - upgraded := changes.UpgradedFormatVersion + upgraded := changes.Contains(catalog.UpgradedFormatVersion) if upgraded != test.expUpgrade { t.Fatalf("%d: expected upgraded=%t, but got upgraded=%t", i, test.expUpgrade, upgraded) } @@ -583,7 +583,7 @@ func TestMaybeUpgradeIndexFormatVersion(t *testing.T) { require.NoError(t, err) if test.upgraded == nil { - require.Equal(t, PostDeserializationTableDescriptorChanges{}, changes) + require.Zero(t, changes) return } @@ -599,7 +599,7 @@ func TestMaybeUpgradeIndexFormatVersion(t *testing.T) { desc2 := b2.BuildImmutableTable() changes2, err := GetPostDeserializationChanges(desc2) require.NoError(t, err) - require.Equal(t, PostDeserializationTableDescriptorChanges{}, changes2) + require.Zero(t, changes2) require.Equal(t, desc.TableDesc(), desc2.TableDesc()) }) } @@ -880,7 +880,7 @@ func TestRemoveDefaultExprFromComputedColumn(t *testing.T) { require.NoError(t, validate.Self(clusterversion.TestingClusterVersion, fixed)) changes, err := GetPostDeserializationChanges(fixed) require.NoError(t, err) - require.True(t, changes.RemovedDefaultExprFromComputedColumn) + require.True(t, changes.Contains(catalog.RemovedDefaultExprFromComputedColumn)) require.False(t, fixed.PublicColumns()[1].HasDefault()) } } diff --git a/pkg/sql/catalog/tabledesc/table_desc.go b/pkg/sql/catalog/tabledesc/table_desc.go index 80d09b571e1c..70b080aa668e 100644 --- a/pkg/sql/catalog/tabledesc/table_desc.go +++ b/pkg/sql/catalog/tabledesc/table_desc.go @@ -45,7 +45,7 @@ type wrapper struct { indexCache *indexCache columnCache *columnCache - postDeserializationChanges PostDeserializationTableDescriptorChanges + changes catalog.PostDeserializationChanges } // IsUncommittedVersion implements the catalog.Descriptor interface. @@ -55,19 +55,8 @@ func (*wrapper) IsUncommittedVersion() bool { // GetPostDeserializationChanges returns the set of changes which occurred to // this descriptor post deserialization. -func (desc *wrapper) GetPostDeserializationChanges() PostDeserializationTableDescriptorChanges { - return desc.postDeserializationChanges -} - -// HasPostDeserializationChanges returns if the MutableDescriptor was changed after running -// RunPostDeserializationChanges. -func (desc *wrapper) HasPostDeserializationChanges() bool { - return desc.postDeserializationChanges.UpgradedForeignKeyRepresentation || - desc.postDeserializationChanges.UpgradedFormatVersion || - desc.postDeserializationChanges.UpgradedIndexFormatVersion || - desc.postDeserializationChanges.UpgradedNamespaceName || - desc.postDeserializationChanges.UpgradedPrivileges || - desc.postDeserializationChanges.AddedConstraintIDs +func (desc *wrapper) GetPostDeserializationChanges() catalog.PostDeserializationChanges { + return desc.changes } // ActiveChecks implements the TableDescriptor interface. @@ -115,7 +104,7 @@ func (desc *wrapper) ByteSize() int64 { // NewBuilder implements the catalog.Descriptor interface. func (desc *wrapper) NewBuilder() catalog.DescriptorBuilder { - return NewBuilder(desc.TableDesc()) + return newBuilder(desc.TableDesc(), desc.IsUncommittedVersion(), desc.changes) } // GetPrimaryIndexID implements the TableDescriptor interface. @@ -130,10 +119,15 @@ func (desc *wrapper) IsTemporary() bool { // ImmutableCopy implements the MutableDescriptor interface. func (desc *Mutable) ImmutableCopy() catalog.Descriptor { - if desc.IsUncommittedVersion() { - return NewBuilderForUncommittedVersion(desc.TableDesc()).BuildImmutable() - } - return NewBuilder(desc.TableDesc()).BuildImmutable() + return desc.NewBuilder().BuildImmutable() +} + +// NewBuilder implements the catalog.Descriptor interface. +// +// It overrides the wrapper's implementation to deal with the fact that +// mutable has overridden the definition of IsUncommittedVersion. +func (desc *Mutable) NewBuilder() catalog.DescriptorBuilder { + return newBuilder(desc.TableDesc(), desc.IsUncommittedVersion(), desc.changes) } // IsUncommittedVersion implements the Descriptor interface. diff --git a/pkg/sql/catalog/tabledesc/table_desc_builder.go b/pkg/sql/catalog/tabledesc/table_desc_builder.go index 4278261fcd25..7ecf2a2c44ff 100644 --- a/pkg/sql/catalog/tabledesc/table_desc_builder.go +++ b/pkg/sql/catalog/tabledesc/table_desc_builder.go @@ -33,7 +33,7 @@ type TableDescriptorBuilder interface { type tableDescriptorBuilder struct { original *descpb.TableDescriptor maybeModified *descpb.TableDescriptor - changes PostDeserializationTableDescriptorChanges + changes catalog.PostDeserializationChanges skipFKsWithNoMatchingTable bool isUncommittedVersion bool } @@ -43,17 +43,8 @@ var _ TableDescriptorBuilder = &tableDescriptorBuilder{} // NewBuilder creates a new catalog.DescriptorBuilder object for building // table descriptors. func NewBuilder(desc *descpb.TableDescriptor) TableDescriptorBuilder { - return newBuilder(desc) -} - -// NewBuilderForUncommittedVersion is like NewBuilder but ensures that the -// uncommitted version flag is set in the built descriptor. -// This should be used when constructing a new copy of an immutable from an -// existing descriptor which may have a new version. -func NewBuilderForUncommittedVersion(desc *descpb.TableDescriptor) TableDescriptorBuilder { - b := newBuilder(desc) - b.isUncommittedVersion = true - return b + return newBuilder(desc, false, /* isUncommittedVersion */ + catalog.PostDeserializationChanges{}) } // NewBuilderForFKUpgrade should be used when attempting to upgrade the @@ -64,7 +55,8 @@ func NewBuilderForUncommittedVersion(desc *descpb.TableDescriptor) TableDescript func NewBuilderForFKUpgrade( desc *descpb.TableDescriptor, skipFKsWithNoMatchingTable bool, ) TableDescriptorBuilder { - b := newBuilder(desc) + b := newBuilder(desc, false, /* isUncommittedVersion */ + catalog.PostDeserializationChanges{}) b.skipFKsWithNoMatchingTable = skipFKsWithNoMatchingTable return b } @@ -83,9 +75,15 @@ func NewUnsafeImmutable(desc *descpb.TableDescriptor) catalog.TableDescriptor { return b.BuildImmutableTable() } -func newBuilder(desc *descpb.TableDescriptor) *tableDescriptorBuilder { +func newBuilder( + desc *descpb.TableDescriptor, + isUncommittedVersion bool, + changes catalog.PostDeserializationChanges, +) *tableDescriptorBuilder { return &tableDescriptorBuilder{ - original: protoutil.Clone(desc).(*descpb.TableDescriptor), + original: protoutil.Clone(desc).(*descpb.TableDescriptor), + isUncommittedVersion: isUncommittedVersion, + changes: changes, } } @@ -97,19 +95,27 @@ func (tdb *tableDescriptorBuilder) DescriptorType() catalog.DescriptorType { // RunPostDeserializationChanges implements the catalog.DescriptorBuilder // interface. func (tdb *tableDescriptorBuilder) RunPostDeserializationChanges() { + prevChanges := tdb.changes tdb.maybeModified = protoutil.Clone(tdb.original).(*descpb.TableDescriptor) tdb.changes = maybeFillInDescriptor(tdb.maybeModified) + prevChanges.ForEach(func(change catalog.PostDeserializationChangeType) { + tdb.changes.Add(change) + }) + } // RunRestoreChanges implements the catalog.DescriptorBuilder interface. func (tdb *tableDescriptorBuilder) RunRestoreChanges( descLookupFn func(id descpb.ID) catalog.Descriptor, ) (err error) { - tdb.changes.UpgradedForeignKeyRepresentation, err = maybeUpgradeForeignKeyRepresentation( + upgradedFK, err := maybeUpgradeForeignKeyRepresentation( descLookupFn, tdb.skipFKsWithNoMatchingTable, tdb.maybeModified, ) + if upgradedFK { + tdb.changes.Add(catalog.UpgradedForeignKeyRepresentation) + } return err } @@ -125,7 +131,7 @@ func (tdb *tableDescriptorBuilder) BuildImmutableTable() catalog.TableDescriptor desc = tdb.original } imm := makeImmutable(desc) - imm.postDeserializationChanges = tdb.changes + imm.changes = tdb.changes imm.isUncommittedVersion = tdb.isUncommittedVersion return imm } @@ -143,8 +149,8 @@ func (tdb *tableDescriptorBuilder) BuildExistingMutableTable() *Mutable { } return &Mutable{ wrapper: wrapper{ - TableDescriptor: *tdb.maybeModified, - postDeserializationChanges: tdb.changes, + TableDescriptor: *tdb.maybeModified, + changes: tdb.changes, }, ClusterVersion: *tdb.original, } @@ -164,8 +170,8 @@ func (tdb *tableDescriptorBuilder) BuildCreatedMutableTable() *Mutable { } return &Mutable{ wrapper: wrapper{ - TableDescriptor: *desc, - postDeserializationChanges: tdb.changes, + TableDescriptor: *desc, + changes: tdb.changes, }, } } @@ -190,24 +196,29 @@ func makeImmutable(tbl *descpb.TableDescriptor) *immutable { // (for example: additional default privileges). func maybeFillInDescriptor( desc *descpb.TableDescriptor, -) (changes PostDeserializationTableDescriptorChanges) { - changes.UpgradedFormatVersion = maybeUpgradeFormatVersion(desc) - - changes.FixedIndexEncodingType = maybeFixPrimaryIndexEncoding(&desc.PrimaryIndex) - changes.UpgradedIndexFormatVersion = maybeUpgradePrimaryIndexFormatVersion(desc) +) (changes catalog.PostDeserializationChanges) { + set := func(change catalog.PostDeserializationChangeType, cond bool) { + if cond { + changes.Add(change) + } + } + set(catalog.UpgradedFormatVersion, maybeUpgradeFormatVersion(desc)) + set(catalog.FixedIndexEncodingType, maybeFixPrimaryIndexEncoding(&desc.PrimaryIndex)) + set(catalog.UpgradedIndexFormatVersion, maybeUpgradePrimaryIndexFormatVersion(desc)) for i := range desc.Indexes { idx := &desc.Indexes[i] - isUpgraded := maybeUpgradeSecondaryIndexFormatVersion(idx) - changes.UpgradedIndexFormatVersion = changes.UpgradedIndexFormatVersion || isUpgraded + set(catalog.UpgradedIndexFormatVersion, + maybeUpgradeSecondaryIndexFormatVersion(idx)) } for i := range desc.Mutations { if idx := desc.Mutations[i].GetIndex(); idx != nil { - isUpgraded := maybeUpgradeSecondaryIndexFormatVersion(idx) - changes.UpgradedIndexFormatVersion = changes.UpgradedIndexFormatVersion || isUpgraded + set(catalog.UpgradedIndexFormatVersion, + maybeUpgradeSecondaryIndexFormatVersion(idx)) } } - changes.UpgradedNamespaceName = maybeUpgradeNamespaceName(desc) - changes.RemovedDefaultExprFromComputedColumn = maybeRemoveDefaultExprFromComputedColumns(desc) + set(catalog.UpgradedNamespaceName, maybeUpgradeNamespaceName(desc)) + set(catalog.RemovedDefaultExprFromComputedColumn, + maybeRemoveDefaultExprFromComputedColumns(desc)) parentSchemaID := desc.GetUnexposedParentSchemaID() // TODO(richardjcai): Remove this case in 22.2. @@ -222,9 +233,9 @@ func maybeFillInDescriptor( desc.GetName(), ) addedGrantOptions := catprivilege.MaybeUpdateGrantOptions(desc.Privileges) - changes.UpgradedPrivileges = fixedPrivileges || addedGrantOptions - changes.RemovedDuplicateIDsInRefs = maybeRemoveDuplicateIDsInRefs(desc) - changes.AddedConstraintIDs = maybeAddConstraintIDs(desc) + set(catalog.UpgradedPrivileges, fixedPrivileges || addedGrantOptions) + set(catalog.RemovedDuplicateIDsInRefs, maybeRemoveDuplicateIDsInRefs(desc)) + set(catalog.AddedConstraintIDs, maybeAddConstraintIDs(desc)) return changes } diff --git a/pkg/sql/catalog/typedesc/table_implicit_record_type.go b/pkg/sql/catalog/typedesc/table_implicit_record_type.go index af7aadc73edf..bf441fbd0001 100644 --- a/pkg/sql/catalog/typedesc/table_implicit_record_type.go +++ b/pkg/sql/catalog/typedesc/table_implicit_record_type.go @@ -328,6 +328,11 @@ func (v TableImplicitRecordType) NumReferencingDescriptors() int { return 0 } // GetReferencingDescriptorID implements the TypeDescriptorInterface. func (v TableImplicitRecordType) GetReferencingDescriptorID(_ int) descpb.ID { return 0 } +// GetPostDeserializationChanges implements the Descriptor interface. +func (v TableImplicitRecordType) GetPostDeserializationChanges() catalog.PostDeserializationChanges { + return catalog.PostDeserializationChanges{} +} + func (v TableImplicitRecordType) panicNotSupported(message string) { panic(errors.AssertionFailedf("implicit table record type for table %q: not supported: %s", v.GetName(), message)) } diff --git a/pkg/sql/catalog/typedesc/type_desc.go b/pkg/sql/catalog/typedesc/type_desc.go index 6c8d0459f044..4fde51e86e81 100644 --- a/pkg/sql/catalog/typedesc/type_desc.go +++ b/pkg/sql/catalog/typedesc/type_desc.go @@ -72,10 +72,6 @@ type Mutable struct { // ClusterVersion represents the version of the type descriptor read // from the store. ClusterVersion *immutable - - // changed represents whether or not the descriptor was changed - // after RunPostDeserializationChanges. - changed bool } // IsUncommittedVersion implements the Descriptor interface. @@ -96,6 +92,10 @@ type immutable struct { // isUncommittedVersion is set to true if this descriptor was created from // a copy of a Mutable with an uncommitted version. isUncommittedVersion bool + + // changes represents how descriptor was changes after + // RunPostDeserializationChanges. + changes catalog.PostDeserializationChanges } // UpdateCachedFieldsOnModifiedMutable refreshes the immutable field by @@ -184,9 +184,17 @@ func (desc *immutable) ByteSize() int64 { return int64(desc.Size()) } +// NewBuilder implements the catalog.Descriptor interface. +// +// It overrides the wrapper's implementation to deal with the fact that +// mutable has overridden the definition of IsUncommittedVersion. +func (desc *Mutable) NewBuilder() catalog.DescriptorBuilder { + return newBuilder(desc.TypeDesc(), desc.IsUncommittedVersion(), desc.changes) +} + // NewBuilder implements the catalog.Descriptor interface. func (desc *immutable) NewBuilder() catalog.DescriptorBuilder { - return NewBuilder(desc.TypeDesc()) + return newBuilder(desc.TypeDesc(), desc.IsUncommittedVersion(), desc.changes) } // PrimaryRegionName implements the TypeDescriptor interface. @@ -316,9 +324,7 @@ func (desc *Mutable) OriginalVersion() descpb.DescriptorVersion { // ImmutableCopy implements the MutableDescriptor interface. func (desc *Mutable) ImmutableCopy() catalog.Descriptor { - imm := NewBuilder(desc.TypeDesc()).BuildImmutableType() - imm.(*immutable).isUncommittedVersion = desc.IsUncommittedVersion() - return imm + return desc.NewBuilder().(TypeDescriptorBuilder).BuildImmutableType() } // IsNew implements the MutableDescriptor interface. @@ -925,6 +931,11 @@ func (desc *immutable) HasPendingSchemaChanges() bool { } } +// GetPostDeserializationChanges implements the Descriptor interface. +func (desc *immutable) GetPostDeserializationChanges() catalog.PostDeserializationChanges { + return desc.changes +} + // GetIDClosure implements the TypeDescriptor interface. func (desc *immutable) GetIDClosure() (map[descpb.ID]struct{}, error) { ret := make(map[descpb.ID]struct{}) @@ -993,12 +1004,6 @@ func GetTypeDescriptorClosure(typ *types.T) (map[descpb.ID]struct{}, error) { return ret, nil } -// HasPostDeserializationChanges returns if the MutableDescriptor was changed after running -// RunPostDeserializationChanges. -func (desc *Mutable) HasPostDeserializationChanges() bool { - return desc.changed -} - // SetDeclarativeSchemaChangerState is part of the catalog.MutableDescriptor // interface. func (desc *Mutable) SetDeclarativeSchemaChangerState(state *scpb.DescriptorState) { diff --git a/pkg/sql/catalog/typedesc/type_desc_builder.go b/pkg/sql/catalog/typedesc/type_desc_builder.go index 769af5244d8d..327b5afd0e40 100644 --- a/pkg/sql/catalog/typedesc/type_desc_builder.go +++ b/pkg/sql/catalog/typedesc/type_desc_builder.go @@ -31,7 +31,8 @@ type typeDescriptorBuilder struct { original *descpb.TypeDescriptor maybeModified *descpb.TypeDescriptor - changed bool + isUncommittedVersion bool + changes catalog.PostDeserializationChanges } var _ TypeDescriptorBuilder = &typeDescriptorBuilder{} @@ -39,9 +40,21 @@ var _ TypeDescriptorBuilder = &typeDescriptorBuilder{} // NewBuilder creates a new catalog.DescriptorBuilder object for building // type descriptors. func NewBuilder(desc *descpb.TypeDescriptor) TypeDescriptorBuilder { - return &typeDescriptorBuilder{ - original: protoutil.Clone(desc).(*descpb.TypeDescriptor), + return newBuilder(desc, false, /* isUncommitedVersion */ + catalog.PostDeserializationChanges{}) +} + +func newBuilder( + desc *descpb.TypeDescriptor, + isUncommittedVersion bool, + changes catalog.PostDeserializationChanges, +) TypeDescriptorBuilder { + b := &typeDescriptorBuilder{ + original: protoutil.Clone(desc).(*descpb.TypeDescriptor), + isUncommittedVersion: isUncommittedVersion, + changes: changes, } + return b } // DescriptorType implements the catalog.DescriptorBuilder interface. @@ -61,7 +74,9 @@ func (tdb *typeDescriptorBuilder) RunPostDeserializationChanges() { tdb.maybeModified.GetName(), ) addedGrantOptions := catprivilege.MaybeUpdateGrantOptions(tdb.maybeModified.Privileges) - tdb.changed = fixedPrivileges || addedGrantOptions + if fixedPrivileges || addedGrantOptions { + tdb.changes.Add(catalog.UpgradedPrivileges) + } } // RunRestoreChanges implements the catalog.DescriptorBuilder interface. @@ -80,7 +95,7 @@ func (tdb *typeDescriptorBuilder) BuildImmutableType() catalog.TypeDescriptor { if desc == nil { desc = tdb.original } - imm := makeImmutable(desc) + imm := makeImmutable(desc, tdb.isUncommittedVersion, tdb.changes) return &imm } @@ -95,11 +110,11 @@ func (tdb *typeDescriptorBuilder) BuildExistingMutableType() *Mutable { if tdb.maybeModified == nil { tdb.maybeModified = protoutil.Clone(tdb.original).(*descpb.TypeDescriptor) } - clusterVersion := makeImmutable(tdb.original) + clusterVersion := makeImmutable(tdb.original, false, /* isUncommitedVersion */ + catalog.PostDeserializationChanges{}) return &Mutable{ - immutable: makeImmutable(tdb.maybeModified), + immutable: makeImmutable(tdb.maybeModified, false /* isUncommitedVersion */, tdb.changes), ClusterVersion: &clusterVersion, - changed: tdb.changed, } } @@ -112,13 +127,20 @@ func (tdb *typeDescriptorBuilder) BuildCreatedMutable() catalog.MutableDescripto // which is in the process of being created. func (tdb *typeDescriptorBuilder) BuildCreatedMutableType() *Mutable { return &Mutable{ - immutable: makeImmutable(tdb.original), - changed: tdb.changed, + immutable: makeImmutable(tdb.original, tdb.isUncommittedVersion, tdb.changes), } } -func makeImmutable(desc *descpb.TypeDescriptor) immutable { - immutDesc := immutable{TypeDescriptor: *desc} +func makeImmutable( + desc *descpb.TypeDescriptor, + isUncommittedVersion bool, + changes catalog.PostDeserializationChanges, +) immutable { + immutDesc := immutable{ + TypeDescriptor: *desc, + isUncommittedVersion: isUncommittedVersion, + changes: changes, + } // Initialize metadata specific to the TypeDescriptor kind. switch immutDesc.Kind { From 698b97c6a01e22a4e5bd1689b10b385593f49783 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 15 Feb 2022 11:46:57 -0500 Subject: [PATCH 4/4] migrations: simplify descriptor migrations We have two migrations which have the goal of iterating through the descriptors and re-writing them if there are any changes due to the post-deserialization logic. They were needlessly complex and low-level. Probably part of the reason was that we were swallowing this information. This commit reworks them to use shared library code. Probably one of them could be eliminated altogether. Release note: None --- pkg/migration/migrations/BUILD.bazel | 3 - pkg/migration/migrations/descriptor_utils.go | 73 +++++++++ .../migrations/grant_option_migration.go | 134 +---------------- .../remove_invalid_database_privileges.go | 142 +----------------- 4 files changed, 75 insertions(+), 277 deletions(-) diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index 1dd734a21fbc..4df93c0ab58f 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -37,7 +37,6 @@ go_library( "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/dbdesc", - "//pkg/sql/catalog/descbuilder", "//pkg/sql/catalog/descidgen", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", @@ -48,8 +47,6 @@ go_library( "//pkg/sql/privilege", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", - "//pkg/sql/sqlutil", - "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/retry", diff --git a/pkg/migration/migrations/descriptor_utils.go b/pkg/migration/migrations/descriptor_utils.go index e5d8e53fdb19..8a20aaa0816b 100644 --- a/pkg/migration/migrations/descriptor_utils.go +++ b/pkg/migration/migrations/descriptor_utils.go @@ -15,11 +15,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/migration" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descidgen" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" ) // CreateSystemTable is a function to inject a new system table. If the table @@ -63,3 +66,73 @@ func createSystemTable( return txn.Run(ctx, b) }) } + +// runPostDeserializationChangesOnAllDescriptors will paginate through the +// descriptor table and upgrade all descriptors in need of upgrading. +func runPostDeserializationChangesOnAllDescriptors( + ctx context.Context, d migration.TenantDeps, +) error { + // maybeUpgradeDescriptors writes the descriptors with the given IDs + // and writes new versions for all descriptors which required post + // deserialization changes. + maybeUpgradeDescriptors := func( + ctx context.Context, d migration.TenantDeps, toUpgrade []descpb.ID, + ) error { + return d.CollectionFactory.Txn(ctx, d.InternalExecutor, d.DB, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) error { + descs, err := descriptors.GetMutableDescriptorsByID(ctx, txn, toUpgrade...) + if err != nil { + return err + } + batch := txn.NewBatch() + for _, desc := range descs { + if !desc.GetPostDeserializationChanges().HasChanges() { + continue + } + if err := descriptors.WriteDescToBatch( + ctx, false, desc, batch, + ); err != nil { + return err + } + } + return txn.Run(ctx, batch) + }) + } + + query := `SELECT id, length(descriptor) FROM system.descriptor ORDER BY id DESC` + rows, err := d.InternalExecutor.QueryIterator( + ctx, "retrieve-descriptors-for-upgrade", nil /* txn */, query, + ) + if err != nil { + return err + } + defer func() { _ = rows.Close() }() + var toUpgrade []descpb.ID + var curBatchBytes int + const maxBatchSize = 1 << 19 // 512 KiB + ok, err := rows.Next(ctx) + for ; ok && err == nil; ok, err = rows.Next(ctx) { + datums := rows.Cur() + id := tree.MustBeDInt(datums[0]) + size := tree.MustBeDInt(datums[1]) + if curBatchBytes+int(size) > maxBatchSize && curBatchBytes > 0 { + if err := maybeUpgradeDescriptors(ctx, d, toUpgrade); err != nil { + return err + } + toUpgrade = toUpgrade[:0] + } + curBatchBytes += int(size) + toUpgrade = append(toUpgrade, descpb.ID(id)) + } + if err != nil { + return err + } + if err := rows.Close(); err != nil { + return err + } + if len(toUpgrade) == 0 { + return nil + } + return maybeUpgradeDescriptors(ctx, d, toUpgrade) +} diff --git a/pkg/migration/migrations/grant_option_migration.go b/pkg/migration/migrations/grant_option_migration.go index c4b146119ac1..084108d00366 100644 --- a/pkg/migration/migrations/grant_option_migration.go +++ b/pkg/migration/migrations/grant_option_migration.go @@ -15,17 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/migration" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/errors" ) // grantOptionMigration iterates through every descriptor and sets a user's grant option bits @@ -33,127 +23,5 @@ import ( func grantOptionMigration( ctx context.Context, _ clusterversion.ClusterVersion, d migration.TenantDeps, _ *jobs.Job, ) error { - query := `SELECT id, descriptor, crdb_internal_mvcc_timestamp FROM system.descriptor ORDER BY ID ASC` - rows, err := d.InternalExecutor.QueryIterator( - ctx, "retrieve-grant-options", nil /* txn */, query, - ) - if err != nil { - return err - } - - addGrantOptionFunc := func(ids []descpb.ID, descs []descpb.Descriptor, timestamps []hlc.Timestamp) error { - var modifiedDescs []catalog.MutableDescriptor - for i, id := range ids { - b := descbuilder.NewBuilderWithMVCCTimestamp(&descs[i], timestamps[i]) - if b == nil { - return errors.Newf("unable to find descriptor for id %d", id) - } - - b.RunPostDeserializationChanges() - mutableDesc := b.BuildExistingMutable() - - modifiedDescs = append(modifiedDescs, mutableDesc) - } - if err := writeModifiedDescriptors(ctx, d, modifiedDescs); err != nil { - return err - } - return nil - } - - return addGrantOptionMigration(ctx, rows, addGrantOptionFunc, 1<<19 /* 512 KiB batch size */) -} - -// addGrantOptionFunction is used in addGrantOptionMigration to maybe add grant options -// of descriptors specified by the id. -type addGrantOptionFunction func(ids []descpb.ID, descs []descpb.Descriptor, timestamps []hlc.Timestamp) error - -// addGrantOptionMigration is an abstraction for adding grant options. -// The rows provided should be the result of a select ID, descriptor, crdb_internal_mvcc_timestamp -// from system.descriptor table. -// The datums returned from the query are parsed to grab the descpb.Descriptor -// and addGrantOptionFunction is called on the desc. -// If minBatchSizeInBytes is specified, fixDescriptors will only be called once the -// size of the descriptors in the id array surpasses minBatchSizeInBytes. -func addGrantOptionMigration( - ctx context.Context, - rows sqlutil.InternalRows, - grantOptionFunc addGrantOptionFunction, - minBatchSizeInBytes int, -) error { - defer func() { _ = rows.Close() }() - ok, err := rows.Next(ctx) - if err != nil { - return err - } - currSize := 0 // in bytes. - var ids []descpb.ID - var descs []descpb.Descriptor - var timestamps []hlc.Timestamp - for ; ok; ok, err = rows.Next(ctx) { - if err != nil { - return err - } - datums := rows.Cur() - id, desc, ts, err := unmarshalDescFromDescriptorRow(datums) - if err != nil { - return err - } - ids = append(ids, id) - descs = append(descs, desc) - timestamps = append(timestamps, ts) - currSize += desc.Size() - if currSize > minBatchSizeInBytes || minBatchSizeInBytes == 0 { - err = grantOptionFunc(ids, descs, timestamps) - if err != nil { - return err - } - // Reset size and id array after the batch is fixed. - currSize = 0 - ids = nil - descs = nil - timestamps = nil - } - } - // Fix remaining descriptors. - return grantOptionFunc(ids, descs, timestamps) -} - -// unmarshalDescFromDescriptorRow takes in an InternalRow from a query that gets: -// ID, descriptor, crdb_internal_mvcc_timestamp from the system.descriptor table. -// ie: SELECT id, descriptor, crdb_internal_mvcc_timestamp FROM system.descriptor ORDER BY ID ASC -// and parses the id, descriptor and mvcc_timestamp fields. -func unmarshalDescFromDescriptorRow( - datums tree.Datums, -) (descpb.ID, descpb.Descriptor, hlc.Timestamp, error) { - id := descpb.ID(*datums[0].(*tree.DInt)) - ts, err := tree.DecimalToHLC(&datums[2].(*tree.DDecimal).Decimal) - if err != nil { - return id, descpb.Descriptor{}, ts, errors.Wrapf(err, - "failed to convert MVCC timestamp decimal to HLC for id %d", id) - } - var desc descpb.Descriptor - if err := protoutil.Unmarshal(([]byte)(*datums[1].(*tree.DBytes)), &desc); err != nil { - return id, descpb.Descriptor{}, ts, errors.Wrapf(err, - "failed to unmarshal descriptor with ID %d", id) - } - return id, desc, ts, nil -} - -// writeModifiedDescriptors writes the descriptors that we have given grant option privileges -// to back to batch -func writeModifiedDescriptors( - ctx context.Context, d migration.TenantDeps, modifiedDescs []catalog.MutableDescriptor, -) error { - return d.CollectionFactory.Txn(ctx, d.InternalExecutor, d.DB, func( - ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, - ) error { - batch := txn.NewBatch() - for _, desc := range modifiedDescs { - err := descriptors.WriteDescToBatch(ctx, false, desc, batch) - if err != nil { - return err - } - } - return txn.Run(ctx, batch) - }) + return runPostDeserializationChangesOnAllDescriptors(ctx, d) } diff --git a/pkg/migration/migrations/remove_invalid_database_privileges.go b/pkg/migration/migrations/remove_invalid_database_privileges.go index d251a761c14d..4a287655f952 100644 --- a/pkg/migration/migrations/remove_invalid_database_privileges.go +++ b/pkg/migration/migrations/remove_invalid_database_privileges.go @@ -15,22 +15,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/migration" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" ) -type descIDAndVersion struct { - id descpb.ID - version descpb.DescriptorVersion -} - // runRemoveInvalidDatabasePrivileges calls RunPostDeserializationChanges on // every database descriptor. It also calls RunPostDeserializationChanges on // all table descriptors to add constraint IDs. @@ -39,132 +26,5 @@ type descIDAndVersion struct { func runRemoveInvalidDatabasePrivileges( ctx context.Context, _ clusterversion.ClusterVersion, d migration.TenantDeps, _ *jobs.Job, ) error { - fixDescriptorFunc := func(ids []descpb.ID, descs []descpb.Descriptor, timestamps []hlc.Timestamp) error { - var descIDAndVersions []descIDAndVersion - for i, id := range ids { - b := descbuilder.NewBuilderWithMVCCTimestamp(&descs[i], timestamps[i]) - if b == nil { - return errors.Newf("unable to find descriptor for id %d", id) - } - - b.RunPostDeserializationChanges() - mutableDesc := b.BuildExistingMutable() - - if mutableDesc.HasPostDeserializationChanges() { - // Only need to fix the descriptor if there was a change. - descIDAndVersions = append(descIDAndVersions, descIDAndVersion{ - id: mutableDesc.GetID(), - version: mutableDesc.GetVersion(), - }) - } - } - if err := fixDescriptors(ctx, d, descIDAndVersions); err != nil { - return err - } - return nil - } - - query := `SELECT id, descriptor, crdb_internal_mvcc_timestamp FROM system.descriptor ORDER BY ID ASC` - rows, err := d.InternalExecutor.QueryIterator( - ctx, "fix-privileges", nil /* txn */, query, - ) - if err != nil { - return err - } - - return descriptorUpgradeMigration(ctx, rows, fixDescriptorFunc, 1<<19 /* 512 KiB batch size */) -} - -// fixDescriptors grabs a descriptor using its ID and fixes the descriptor -// by running RunPostDeserializationChanges. -// The descriptor will only be fixed if the version written to disk is the same -// as the version provided in the array. -func fixDescriptors( - ctx context.Context, d migration.TenantDeps, descriptorIDAndVersions []descIDAndVersion, -) error { - return d.CollectionFactory.Txn(ctx, d.InternalExecutor, d.DB, func( - ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, - ) error { - batch := txn.NewBatch() - var fixedIDs []descpb.ID - for _, idAndVersion := range descriptorIDAndVersions { - // GetMutableDescriptorByID calls RunPostDeserializationChanges which - // fixes the descriptor. - desc, err := descriptors.GetMutableDescriptorByID(ctx, txn, idAndVersion.id) - if err != nil { - return err - } - if desc.GetVersion() > idAndVersion.version { - // Already rewritten. - return nil - } - err = descriptors.WriteDescToBatch(ctx, false /* kvTrace */, desc, batch) - if err != nil { - return err - } - fixedIDs = append(fixedIDs, desc.GetID()) - } - log.Infof(ctx, "upgrading descriptor with ids %v", fixedIDs) - return txn.Run(ctx, batch) - }) -} - -// fixDescriptorsFunction is used in descriptorUpgradeMigration to fix a set -// of descriptors specified by the id. -type fixDescriptorsFunction func(ids []descpb.ID, descs []descpb.Descriptor, timestamps []hlc.Timestamp) error - -// descriptorUpgradeMigration is an abstraction for a descriptor upgrade migration. -// The rows provided should be the result of a select ID, descriptor, crdb_internal_mvcc_timestamp -// from system.descriptor table. -// The datums returned from the query are parsed to grab the descpb.Descriptor -// and fixDescriptorsFunction is called on the desc. -// If minBatchSizeInBytes is specified, fixDescriptors will only be called once the -// size of the descriptors in the id array surpasses minBatchSizeInBytes. -func descriptorUpgradeMigration( - ctx context.Context, - rows sqlutil.InternalRows, - fixDescFunc fixDescriptorsFunction, - minBatchSizeInBytes int, -) error { - defer func() { _ = rows.Close() }() - ok, err := rows.Next(ctx) - if err != nil { - return err - } - currSize := 0 // in bytes. - var ids []descpb.ID - var descs []descpb.Descriptor - var timestamps []hlc.Timestamp - for ; ok; ok, err = rows.Next(ctx) { - if err != nil { - return err - } - datums := rows.Cur() - id, desc, ts, err := unmarshalDescFromDescriptorRow(datums) - if err != nil { - return err - } - // If the descriptor is not a database or table descriptor, we can skip it. - tableDesc, databaseDesc, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, ts) - if databaseDesc == nil && tableDesc == nil { - continue - } - ids = append(ids, id) - descs = append(descs, desc) - timestamps = append(timestamps, ts) - currSize += desc.Size() - if currSize > minBatchSizeInBytes || minBatchSizeInBytes == 0 { - err = fixDescFunc(ids, descs, timestamps) - if err != nil { - return err - } - // Reset size and id array after the batch is fixed. - currSize = 0 - ids = nil - descs = nil - timestamps = nil - } - } - // Fix remaining descriptors. - return fixDescFunc(ids, descs, timestamps) + return runPostDeserializationChangesOnAllDescriptors(ctx, d) }