diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel index 62327daab8f3..9fcf328176dc 100644 --- a/pkg/sql/contention/txnidcache/BUILD.bazel +++ b/pkg/sql/contention/txnidcache/BUILD.bazel @@ -34,6 +34,7 @@ go_test( "txn_id_cache_test.go", "writer_test.go", ], + data = glob(["testdata/**"]), embed = [":txnidcache"], deps = [ "//pkg/kv", @@ -56,6 +57,7 @@ go_test( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/uuid", + "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], diff --git a/pkg/sql/contention/txnidcache/fifo_cache.go b/pkg/sql/contention/txnidcache/fifo_cache.go index 5217d608e5d1..aef77400207e 100644 --- a/pkg/sql/contention/txnidcache/fifo_cache.go +++ b/pkg/sql/contention/txnidcache/fifo_cache.go @@ -12,10 +12,10 @@ package txnidcache import ( "sync" + "unsafe" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils" - "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -39,21 +39,19 @@ type fifoCache struct { } type blockListNode struct { - block + *block next *blockListNode } // blockList is a singly-linked list of blocks. The list is used to // implement FIFO eviction. type blockList struct { - head *blockListNode - tail *blockListNode - - // tailIdx is an index pointing into the next empty slot in block - // stored in the tail pointer. - tailIdx int + numNodes int + head *blockListNode + tail *blockListNode } +// newFifoCache takes a function which returns a capacity in bytes. func newFIFOCache(capacity contentionutils.CapacityLimiter) *fifoCache { c := &fifoCache{ capacity: capacity, @@ -70,22 +68,14 @@ func (c *fifoCache) add(b *block) { c.mu.Lock() defer c.mu.Unlock() - blockSize := 0 for i := range b { if !b[i].Valid() { break } c.mu.data[b[i].TxnID] = b[i].TxnFingerprintID - blockSize++ } - - c.mu.eviction.append(b[:blockSize]) - - // Zeros out the block and put it back into the blockPool. - *b = block{} - blockPool.Put(b) - + c.mu.eviction.addNode(b) c.maybeEvictLocked() } @@ -97,24 +87,25 @@ func (c *fifoCache) get(txnID uuid.UUID) (roachpb.TransactionFingerprintID, bool return fingerprintID, found } -func (c *fifoCache) size() int { +func (c *fifoCache) size() int64 { c.mu.RLock() defer c.mu.RUnlock() - return len(c.mu.data) + return c.sizeLocked() +} + +func (c *fifoCache) sizeLocked() int64 { + return int64(c.mu.eviction.numNodes)* + ((entrySize*blockSize)+int64(unsafe.Sizeof(blockListNode{}))) + + int64(len(c.mu.data))*entrySize } func (c *fifoCache) maybeEvictLocked() { - for int64(len(c.mu.data)) > c.capacity() { + for c.sizeLocked() > c.capacity() { node := c.mu.eviction.removeFront() if node == nil { return } - c.evictNodeLocked(node) - - // Zero out the node and put it back into the pool. - *node = blockListNode{} - nodePool.Put(node) } } @@ -127,40 +118,23 @@ func (c *fifoCache) evictNodeLocked(node *blockListNode) { delete(c.mu.data, node.block[i].TxnID) } -} -func (e *blockList) append(block []contentionpb.ResolvedTxnID) { - block = e.appendToTail(block) - for len(block) > 0 { - e.addNode() - block = e.appendToTail(block) - } + *node.block = block{} + blockPool.Put(node.block) + *node = blockListNode{} + nodePool.Put(node) } -func (e *blockList) addNode() { +func (e *blockList) addNode(b *block) { newNode := nodePool.Get().(*blockListNode) + newNode.block = b if e.head == nil { e.head = newNode } else { e.tail.next = newNode } e.tail = newNode - e.tailIdx = 0 -} - -func (e *blockList) appendToTail( - block []contentionpb.ResolvedTxnID, -) (remaining []contentionpb.ResolvedTxnID) { - if e.head == nil { - return block - } - toCopy := blockSize - e.tailIdx - if toCopy > len(block) { - toCopy = len(block) - } - copy(e.tail.block[e.tailIdx:], block[:toCopy]) - e.tailIdx += toCopy - return block[toCopy:] + e.numNodes++ } func (e *blockList) removeFront() *blockListNode { @@ -168,7 +142,11 @@ func (e *blockList) removeFront() *blockListNode { return nil } + e.numNodes-- removedBlock := e.head e.head = e.head.next + if e.head == nil { + e.tail = nil + } return removedBlock } diff --git a/pkg/sql/contention/txnidcache/fifo_cache_test.go b/pkg/sql/contention/txnidcache/fifo_cache_test.go index e43fff26ce03..dc630eab2df7 100644 --- a/pkg/sql/contention/txnidcache/fifo_cache_test.go +++ b/pkg/sql/contention/txnidcache/fifo_cache_test.go @@ -13,190 +13,115 @@ package txnidcache import ( "fmt" "math/rand" + "strconv" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/datadriven" "github.com/stretchr/testify/require" ) -func TestFIFOCache(t *testing.T) { - cache := newFIFOCache(func() int64 { return 2 * blockSize } /* capacity */) - - // Fill the first eviction block in cache to 1/4 capacity. - input1, expected1 := generateInputBlock(blockSize * 1 / 4 /* size */) - cache.add(cloneBlock(input1)) - checkExists(t, cache, expected1) - checkEvictionListShape(t, cache, []int{blockSize * 1 / 4}) - checkEvictionListContent(t, cache, []*block{input1}) - - // Fill the first eviction block in cache to 3/4 capacity. - input2, expected2 := generateInputBlock(blockSize / 2 /* size */) - cache.add(cloneBlock(input2)) - checkExists(t, cache, expected1, expected2) - checkEvictionListShape(t, cache, []int{blockSize * 3 / 4}) - - checkEvictionListContent(t, cache, []*block{input1, input2}) - - // Overflow the first eviction block, the second block should be 1/2 filled. - input3, expected3 := generateInputBlock(blockSize * 3 / 4) - cache.add(cloneBlock(input3)) - checkExists(t, cache, expected1, expected2, expected3) - checkEvictionListShape(t, cache, []int{blockSize, blockSize / 2}) - checkEvictionListContent(t, cache, []*block{input1, input2, input3}) - - // Overflow the second block, cause the first block to be evicted. Second - // block becomes the first block, and it is completely filled. The new second - // block should be 1/4 filled. Part of the input3 will be evicted. - input4, expected4 := generateInputBlock(blockSize * 3 / 4) - cache.add(cloneBlock(input4)) - checkEvictionListShape(t, cache, []int{blockSize, blockSize / 4}) - - // First 1/4 of input3 should be evicted and the remaining [2/4:3/4] should - // still remain. - remainingInput3 := &block{} - copy(remainingInput3[:], input3[blockSize*1/4:blockSize*3/4]) - checkEvictionListContent(t, cache, []*block{remainingInput3, input4}) - - // Removes the portion that hasn't been evicted. - evictedExpected3 := removeEntryFromMap(expected3, input3[blockSize*1/4:blockSize*3/4]) - - // Removes the portion that has been evicted. - remainingExpected3 := removeEntryFromMap(expected3, input3[:blockSize*1/4]) - - checkNotExist(t, cache, expected1, expected2, evictedExpected3) - checkExists(t, cache, remainingExpected3, expected4) - - // Partially fill up the second block in the eviction list. - input5, expected5 := generateInputBlock(blockSize / 2) - cache.add(cloneBlock(input5)) - checkNotExist(t, cache, expected1, expected2, evictedExpected3) - checkExists(t, cache, remainingExpected3, expected4, expected5) - checkEvictionListShape(t, cache, []int{blockSize, blockSize * 3 / 4}) - checkEvictionListContent(t, cache, []*block{remainingInput3, input4, input5}) - - // Overflow the second block again to trigger another eviction, part of the - // input4 would be evicted. - input6, expected6 := generateInputBlock(blockSize * 3 / 4) - cache.add(cloneBlock(input6)) - checkEvictionListShape(t, cache, []int{blockSize, blockSize / 2}) - - remainingInput4 := &block{} - copy(remainingInput4[:], input4[blockSize/2:blockSize*3/4]) - checkEvictionListContent(t, cache, []*block{remainingInput4, input5, input6}) - - evictedExpected4 := removeEntryFromMap(expected4, input4[blockSize/2:blockSize*3/4]) - remainingExpected4 := removeEntryFromMap(expected4, input4[:blockSize/2]) - - checkNotExist(t, cache, expected1, expected2, expected3, evictedExpected4) - checkExists(t, cache, remainingExpected4, expected5, expected6) -} - -func removeEntryFromMap( - m map[uuid.UUID]roachpb.TransactionFingerprintID, filterList []contentionpb.ResolvedTxnID, -) map[uuid.UUID]roachpb.TransactionFingerprintID { - newMap := make(map[uuid.UUID]roachpb.TransactionFingerprintID) - for k, v := range m { - newMap[k] = v - } - - for _, val := range filterList { - delete(newMap, val.TxnID) - } - - return newMap -} - -func checkEvictionListContent(t *testing.T, cache *fifoCache, expectedBlocks []*block) { - t.Helper() - - cur := cache.mu.eviction.head - evictionListBlockIdx := 0 - evictionListSize := 0 - - for i := range expectedBlocks { - require.NotNilf(t, cur, "expect a valid eviction list node, but it (size=%d)"+ - "is nil", evictionListSize) - - for blockIdx := 0; blockIdx < blockSize; blockIdx++ { - if !expectedBlocks[i][blockIdx].Valid() { - break - } - - require.Equal(t, expectedBlocks[i][blockIdx], cur.block[evictionListBlockIdx], - "expected eviction block at index [%d][%d] to be "+ - "%s, but it is %s", i, blockIdx, expectedBlocks[i][blockIdx].TxnID.String(), - cur.block[evictionListBlockIdx].TxnID.String()) - - evictionListBlockIdx++ - - isEvictionListIdxStillValid := - evictionListBlockIdx < blockSize && cur.block[evictionListBlockIdx].Valid() - - if !isEvictionListIdxStillValid { - cur = cur.next - evictionListBlockIdx = 0 - evictionListSize++ +func TestFIFOCacheDataDriven(t *testing.T) { + var cache *fifoCache + inputBlocks := make(map[string]*block) + expectedMaps := make(map[string]map[uuid.UUID]roachpb.TransactionFingerprintID) + blockToNameMap := make(map[*block]string) + + datadriven.Walk(t, testutils.TestDataPath(t), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + var capacity int + d.ScanArgs(t, "capacity", &capacity) + cache = newFIFOCache(func() int64 { return int64(capacity) } /* capacity */) + return fmt.Sprintf("cache_size: %d", cache.size()) + case "newInputBlock": + var name string + var percentageFilledStr string + + d.ScanArgs(t, "name", &name) + d.ScanArgs(t, "percentageFilled", &percentageFilledStr) + + percentageFilled, err := strconv.ParseFloat(percentageFilledStr, 64) + require.NoError(t, err) + + actualSize := int(blockSize * percentageFilled) + input, expected := generateInputBlock(actualSize) + inputBlocks[name] = input + expectedMaps[name] = expected + + return fmt.Sprintf("blockSize: %d", actualSize) + case "insertBlock": + var name string + d.ScanArgs(t, "name", &name) + input, ok := inputBlocks[name] + require.True(t, ok, "input %s is not found", name) + input = cloneBlock(input) + cache.add(input) + blockToNameMap[input] = name + case "show": + var result []string + + result = append(result, fmt.Sprintf("cacheSize: %d", cache.size())) + for cur := cache.mu.eviction.head; cur != nil; cur = cur.next { + blockName := blockToNameMap[cur.block] + + actualBlockSize := 0 + for blockOffset := 0; blockOffset < blockSize; blockOffset++ { + if !cur.block[blockOffset].Valid() { + break + } + actualBlockSize++ + } + + result = append(result, + fmt.Sprintf("blockName: %s, blockSize: %d", + blockName, actualBlockSize)) + } + + return strings.Join(result, "\n") + case "checkCacheContent": + var presentInputBlockNamesStr string + var evictedInputBlockNamesStr string + + if d.HasArg("presentBlockNames") { + d.ScanArgs(t, "presentBlockNames", &presentInputBlockNamesStr) + } + if d.HasArg("evictedBlockNames") { + d.ScanArgs(t, "evictedBlockNames", &evictedInputBlockNamesStr) + } + + presentInputBlockNames := strings.Split(presentInputBlockNamesStr, ",") + for _, name := range presentInputBlockNames { + expected := expectedMaps[name] + for txnID, expectedTxnFingerprintID := range expected { + actualTxnFingerprintID, ok := cache.mu.data[txnID] + require.True(t, ok, "expected to find txn fingerprint ID "+ + "for txnID %s, but it was not found", txnID) + require.Equal(t, expectedTxnFingerprintID, actualTxnFingerprintID, + "expected the txnID %s to have txn fingerprint ID %d, but "+ + "got %d", txnID, expectedTxnFingerprintID, actualTxnFingerprintID) + } + } + + evictedInputBlockNames := strings.Split(evictedInputBlockNamesStr, ",") + for _, name := range evictedInputBlockNames { + expected := expectedMaps[name] + for txnID := range expected { + _, ok := cache.mu.data[txnID] + require.False(t, ok, "expected to not find txn fingerprint ID "+ + "for txnID %s, but it was found", txnID) + } + } + + return "ok" } - } - } - - require.Nilf(t, cur, "expect eviction list to be fully iterated, but it was not") -} - -func checkEvictionListShape(t *testing.T, cache *fifoCache, expectedBlockSizes []int) { - t.Helper() - cur := cache.mu.eviction.head - - for i := range expectedBlockSizes { - require.NotNilf(t, cur, "expect an eviction list of size %d, but it has "+ - "a size of %d", len(expectedBlockSizes), i-1) - - actualBlockSize := 0 - for blockIdx := 0; blockIdx < blockSize; blockIdx++ { - if !cur.block[blockIdx].Valid() { - break - } - actualBlockSize++ - } - - require.Equal(t, expectedBlockSizes[i], actualBlockSize, - "expected eviction list block at index [%d] to have a size "+ - "of %d, but instead it has a size of %d", - i, expectedBlockSizes[i], actualBlockSize) - - cur = cur.next - } -} - -func checkExists( - t *testing.T, cache *fifoCache, expectedMaps ...map[uuid.UUID]roachpb.TransactionFingerprintID, -) { - t.Helper() - for _, expected := range expectedMaps { - for expectedKey, expectedValue := range expected { - actualValue, found := cache.get(expectedKey) - require.True(t, found, "expected txnID %s to be present in "+ - "the cache, but it was not found", expectedKey) - require.Equal(t, expectedValue, actualValue, "expected to find "+ - "transaction fingerprint ID %d for txnID %s, but found %d instead", - expectedValue, expectedKey, actualValue) - } - } -} - -func checkNotExist( - t *testing.T, cache *fifoCache, expectedMaps ...map[uuid.UUID]roachpb.TransactionFingerprintID, -) { - t.Helper() - for _, expected := range expectedMaps { - for expectedKey := range expected { - _, found := cache.get(expectedKey) - require.False(t, found, "expected txnID %s to be not present in "+ - "the cache, but it was found", expectedKey) - } - } + return "" + }) + }) } func cloneBlock(b *block) *block { @@ -217,7 +142,8 @@ func generateInputBlock( for i := 0; i < size; i++ { input[i].TxnID = uuid.FastMakeV4() - input[i].TxnFingerprintID = roachpb.TransactionFingerprintID(rand.Uint64()) + input[i].TxnFingerprintID = + roachpb.TransactionFingerprintID(rand.Uint64()) expected[input[i].TxnID] = input[i].TxnFingerprintID } diff --git a/pkg/sql/contention/txnidcache/testdata/fifo_cache b/pkg/sql/contention/txnidcache/testdata/fifo_cache new file mode 100644 index 000000000000..27c151192afe --- /dev/null +++ b/pkg/sql/contention/txnidcache/testdata/fifo_cache @@ -0,0 +1,96 @@ +# 24240 bytes is enough for a fifoCache with 3 fully filled eviction block. +init capacity=24240 +---- +cache_size: 0 + +newInputBlock name=input1 percentageFilled=0.25 +---- +blockSize: 42 + +insertBlock name=input1 +---- + +checkCacheContent presentBlockNames=input1 +---- +ok + +show +---- +cacheSize: 5056 +blockName: input1, blockSize: 42 + +newInputBlock name=input2 percentageFilled=0.50 +---- +blockSize: 84 + +insertBlock name=input2 +---- + +checkCacheContent presentBlockNames=input1,input2 +---- +ok + +show +---- +cacheSize: 11120 +blockName: input1, blockSize: 42 +blockName: input2, blockSize: 84 + +newInputBlock name=input3 percentageFilled=1.0 +---- +blockSize: 168 + +insertBlock name=input3 +---- + +checkCacheContent presentBlockNames=input1,input2,input3 +---- +ok + +show +---- +cacheSize: 19200 +blockName: input1, blockSize: 42 +blockName: input2, blockSize: 84 +blockName: input3, blockSize: 168 + +# Any subsequent insertion will cause evictions + +newInputBlock name=input4 percentageFilled=1.0 +---- +blockSize: 168 + +insertBlock name=input4 +---- + +checkCacheContent presentBlockNames=input2,input3,input4 evictedBlockNames=input1 +---- +ok + +show +---- +cacheSize: 22224 +blockName: input2, blockSize: 84 +blockName: input3, blockSize: 168 +blockName: input4, blockSize: 168 + + +# Inserting sparse blocks can cause premature eviction. + +newInputBlock name=input5 percentageFilled=0.01786 +---- +blockSize: 3 + +insertBlock name=input5 +---- + +checkCacheContent presentBlockNames=input3,input4,input5 evictedBlockNames=input1,input2 +---- +ok + +show +---- +cacheSize: 20280 +blockName: input3, blockSize: 168 +blockName: input4, blockSize: 168 +blockName: input5, blockSize: 3 diff --git a/pkg/sql/contention/txnidcache/txn_id_cache.go b/pkg/sql/contention/txnidcache/txn_id_cache.go index 5baad3dab7e0..63c1a56727b4 100644 --- a/pkg/sql/contention/txnidcache/txn_id_cache.go +++ b/pkg/sql/contention/txnidcache/txn_id_cache.go @@ -132,10 +132,7 @@ func NewTxnIDCache(st *cluster.Settings, metrics *Metrics) *Cache { closeCh: make(chan struct{}), } - t.store = newFIFOCache(func() int64 { - return MaxSize.Get(&st.SV) / entrySize - } /* capacity */) - + t.store = newFIFOCache(func() int64 { return MaxSize.Get(&st.SV) }) t.writer = newWriter(st, t) return t } @@ -189,7 +186,7 @@ func (t *Cache) DrainWriteBuffer() { t.writer.DrainWriteBuffer() } -// Size return the current size of the Cache. +// Size return the current size of the Cache in bytes. func (t *Cache) Size() int64 { - return int64(t.store.size()) * entrySize + return t.store.size() } diff --git a/pkg/sql/delegate/show_completions.go b/pkg/sql/delegate/show_completions.go index 268b1b345c32..a73575b6b51b 100644 --- a/pkg/sql/delegate/show_completions.go +++ b/pkg/sql/delegate/show_completions.go @@ -14,27 +14,21 @@ import ( "bytes" "fmt" "sort" - "strconv" "strings" "unicode" "github.com/cockroachdb/cockroach/pkg/sql/lexbase" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/errors" ) func (d *delegator) delegateShowCompletions(n *tree.ShowCompletions) (tree.Statement, error) { - offsetVal, ok := n.Offset.AsConstantInt() - if !ok { - return nil, errors.Newf("invalid offset %v", n.Offset) - } - offset, err := strconv.Atoi(offsetVal.String()) + offset, err := n.Offset.AsInt64() if err != nil { return nil, err } - completions, err := RunShowCompletions(n.Statement.RawString(), offset) + completions, err := RunShowCompletions(n.Statement.RawString(), int(offset)) if err != nil { return nil, err } diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 51f65dc018bf..a3eb276aaab6 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -7023,11 +7023,10 @@ func TestDropColumnAfterMutations(t *testing.T) { s, sqlDB, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) - conn1 := sqlutils.MakeSQLRunner(sqlDB) - conn2 := sqlutils.MakeSQLRunner(sqlDB) + tdb := sqlutils.MakeSQLRunner(sqlDB) var schemaChangeWaitGroup sync.WaitGroup - conn1.Exec(t, ` + tdb.Exec(t, ` CREATE TABLE t (i INT8 PRIMARY KEY, j INT8); INSERT INTO t VALUES (1, 1); `) @@ -7043,7 +7042,7 @@ INSERT INTO t VALUES (1, 1); schemaChangeWaitGroup.Add(1) go func() { defer schemaChangeWaitGroup.Done() - _, err := conn2.DB.ExecContext(ctx, + _, err := tdb.DB.ExecContext(ctx, ` BEGIN; ALTER TABLE t ALTER COLUMN j SET NOT NULL; @@ -7058,7 +7057,7 @@ COMMIT; schemaChangeWaitGroup.Add(1) go func() { defer schemaChangeWaitGroup.Done() - _, err := conn2.DB.ExecContext(ctx, + _, err := tdb.DB.ExecContext(ctx, ` SET sql_safe_updates = false; BEGIN; @@ -7095,14 +7094,14 @@ COMMIT; delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})} jobControlMu.Unlock() - conn2.Exec(t, ` + tdb.Exec(t, ` DROP TABLE t; CREATE TABLE t (i INT8 PRIMARY KEY, j INT8); INSERT INTO t VALUES (1, 1); `) go func() { // This transaction will not complete. Therefore, we don't check the returned error. - _, _ = conn2.DB.ExecContext(context.Background(), + _, _ = tdb.DB.ExecContext(context.Background(), ` SET application_name='failed-concurrent-drop-mutations'; BEGIN; @@ -7117,7 +7116,7 @@ COMMIT; go func() { // This transaction will not complete. Therefore, we don't check the returned error. - _, _ = conn1.DB.ExecContext(context.Background(), + _, _ = tdb.DB.ExecContext(context.Background(), ` SET application_name='failed-concurrent-drop-mutations'; SET sql_safe_updates = false; @@ -7148,14 +7147,14 @@ COMMIT; // the test. // Second job should be in reverting state and retrying. - conn1.CheckQueryResultsRetry(t, + tdb.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT num_runs > 3 FROM system.jobs WHERE id = %d AND status = '%s'", jobIDs[1], jobs.StatusReverting), [][]string{{"true"}}, ) // First job should be in running state. - conn1.CheckQueryResults(t, fmt.Sprintf("SELECT status from system.jobs WHERE id = %d", jobIDs[0]), [][]string{{string(jobs.StatusRunning)}}) + tdb.CheckQueryResults(t, fmt.Sprintf("SELECT status from system.jobs WHERE id = %d", jobIDs[0]), [][]string{{string(jobs.StatusRunning)}}) // Both jobs should be stuck in COMMIT, waiting for jobs to complete. - conn1.CheckQueryResults(t, + tdb.CheckQueryResults(t, "SELECT count(*) FROM [SHOW SESSIONS] WHERE last_active_query LIKE '%COMMIT%' AND session_id != (SELECT * FROM [SHOW session_id]) "+ "AND application_name='failed-concurrent-drop-mutations'", [][]string{{"2"}}, @@ -7168,8 +7167,7 @@ COMMIT; // Test 3: with concurrent drop and mutations where an insert will // cause the backfill operation to fail. t.Run("concurrent-drop-mutations-insert-fail", func(t *testing.T) { - conn1.Exec(t, `SET use_declarative_schema_changer = 'off'`) - conn2.Exec(t, `SET use_declarative_schema_changer = 'off'`) + tdb.Exec(t, `SET use_declarative_schema_changer = 'off'`) jobControlMu.Lock() delayJobList = []string{"ALTER TABLE defaultdb.public.t ALTER COLUMN j SET NOT NULL", @@ -7177,7 +7175,7 @@ COMMIT; delayJobChannels = []chan struct{}{make(chan struct{}), make(chan struct{})} jobControlMu.Unlock() - conn2.Exec(t, ` + tdb.Exec(t, ` DROP TABLE t; CREATE TABLE t (i INT8 PRIMARY KEY, j INT8); INSERT INTO t VALUES (1, 1); @@ -7188,7 +7186,7 @@ COMMIT; // will fail during backfill or the dependent one with the drop will fail. // This transaction will not complete. Therefore, we don't check the returned error. - _, _ = conn2.DB.ExecContext(context.Background(), + _, _ = tdb.DB.ExecContext(context.Background(), ` SET application_name='concurrent-drop-mutations-insert-fail'; BEGIN; @@ -7201,7 +7199,7 @@ COMMIT; go func() { // This transaction will not complete. Therefore, we don't check the returned error. - _, _ = conn1.DB.ExecContext(context.Background(), + _, _ = tdb.DB.ExecContext(context.Background(), ` SET application_name='concurrent-drop-mutations-insert-fail'; SET sql_safe_updates = false; @@ -7234,7 +7232,7 @@ COMMIT; // job to backfill to validate this test's correctness assumptions. firstJobID := jobIDs[0] secondJobID := jobIDs[1] - res := conn1.QueryStr(t, "SELECT status from system.jobs where id in ($1, $2)", firstJobID, secondJobID) + res := tdb.QueryStr(t, "SELECT status from system.jobs where id in ($1, $2)", firstJobID, secondJobID) require.Len(t, res, 2) firstJobStatus := jobs.Status(res[0][0]) secondJobStatus := jobs.Status(res[1][0]) @@ -7248,12 +7246,12 @@ COMMIT; return errors.New("one job should be running while the other job should be reverting") }) // Ensure that the reverting job is retrying. - conn1.CheckQueryResultsRetry(t, + tdb.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT num_runs > 3 FROM system.jobs WHERE id = %d", revertingJobID), [][]string{{"true"}}, ) // Both jobs should be stuck in COMMIT, waiting for jobs to complete. - conn1.CheckQueryResults(t, + tdb.CheckQueryResults(t, "SELECT count(*) FROM [SHOW SESSIONS] WHERE last_active_query LIKE '%COMMIT%' AND session_id != (SELECT * FROM [SHOW session_id]) "+ "AND application_name='concurrent-drop-mutations-insert-fail'", [][]string{{"2"}}, diff --git a/pkg/ui/workspaces/db-console/src/views/jobs/duration.tsx b/pkg/ui/workspaces/db-console/src/views/jobs/duration.tsx index 9b9fe67a9464..fa27bfc96c8e 100644 --- a/pkg/ui/workspaces/db-console/src/views/jobs/duration.tsx +++ b/pkg/ui/workspaces/db-console/src/views/jobs/duration.tsx @@ -49,7 +49,6 @@ export class Duration extends React.PureComponent<{ {"Duration: " + formatDuration(moment.duration(finishedAt.diff(startedAt)))} - ) ); }