diff --git a/pkg/ccl/testccl/sqlccl/show_transfer_state_test.go b/pkg/ccl/testccl/sqlccl/show_transfer_state_test.go index f8ac743552d8..768f8fc83529 100644 --- a/pkg/ccl/testccl/sqlccl/show_transfer_state_test.go +++ b/pkg/ccl/testccl/sqlccl/show_transfer_state_test.go @@ -43,6 +43,12 @@ func TestShowTransferState(t *testing.T) { require.NoError(t, err) _, err = tenantDB.Exec("CREATE TYPE typ AS ENUM ('foo', 'bar')") require.NoError(t, err) + _, err = tenantDB.Exec("CREATE TABLE tab (a INT4, b typ)") + require.NoError(t, err) + _, err = tenantDB.Exec("INSERT INTO tab VALUES (1, 'foo')") + require.NoError(t, err) + _, err = tenantDB.Exec("GRANT SELECT ON tab TO testuser") + require.NoError(t, err) testUserConn := tenant.SQLConnForUser(t, username.TestUser, "") @@ -90,12 +96,18 @@ func TestShowTransferState(t *testing.T) { defer func() { _ = conn.Close(ctx) }() // Add a prepared statement to make sure SHOW TRANSFER STATE handles it. - _, err = conn.Prepare(ctx, "prepared_stmt", "SELECT $1::INT4, 'foo'::typ WHERE 1 = 1") + _, err = conn.Prepare(ctx, "prepared_stmt_const", "SELECT $1::INT4, 'foo'::typ WHERE 1 = 1") + require.NoError(t, err) + _, err = conn.Prepare(ctx, "prepared_stmt_aost", "SELECT a, b FROM tab AS OF SYSTEM TIME '-1us'") require.NoError(t, err) var intResult int var enumResult string - err = conn.QueryRow(ctx, "prepared_stmt", 1).Scan(&intResult, &enumResult) + err = conn.QueryRow(ctx, "prepared_stmt_const", 1).Scan(&intResult, &enumResult) + require.NoError(t, err) + require.Equal(t, 1, intResult) + require.Equal(t, "foo", enumResult) + err = conn.QueryRow(ctx, "prepared_stmt_aost").Scan(&intResult, &enumResult) require.NoError(t, err) require.Equal(t, 1, intResult) require.Equal(t, "foo", enumResult) @@ -171,14 +183,26 @@ func TestShowTransferState(t *testing.T) { // session. result := conn.PgConn().ExecPrepared( ctx, - "prepared_stmt", + "prepared_stmt_const", [][]byte{{0, 0, 0, 2}}, // binary representation of 2 []int16{1}, // paramFormats - 1 means binary - []int16{1}, // resultFormats - 1 means binary + []int16{1, 1}, // resultFormats - 1 means binary ).Read() + require.NoError(t, result.Err) require.Equal(t, [][][]byte{{ {0, 0, 0, 2}, {0x66, 0x6f, 0x6f}, // binary representation of 2, 'foo' }}, result.Rows) + result = conn.PgConn().ExecPrepared( + ctx, + "prepared_stmt_aost", + [][]byte{}, // paramValues + []int16{}, // paramFormats + []int16{1, 1}, // resultFormats - 1 means binary + ).Read() + require.NoError(t, result.Err) + require.Equal(t, [][][]byte{{ + {0, 0, 0, 1}, {0x66, 0x6f, 0x6f}, // binary representation of 1, 'foo' + }}, result.Rows) }) // Errors should be displayed as a SQL value. diff --git a/pkg/cmd/roachtest/tests/cdc_bench.go b/pkg/cmd/roachtest/tests/cdc_bench.go index 87d50d5ed380..4590374139f0 100644 --- a/pkg/cmd/roachtest/tests/cdc_bench.go +++ b/pkg/cmd/roachtest/tests/cdc_bench.go @@ -548,7 +548,8 @@ func waitForChangefeed( ) (changefeedInfo, error) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - for attempt := 0; ; attempt++ { + const maxLoadJobAttempts = 5 + for loadJobAttempt := 0; ; loadJobAttempt++ { select { case <-ticker.C: case <-ctx.Done(): @@ -557,9 +558,9 @@ func waitForChangefeed( info, err := getChangefeedInfo(conn, jobID) if err != nil { - logger.Errorf("error getting changefeed info: %v (attempt %d)", err, attempt+1) - if attempt > 5 { - return changefeedInfo{}, errors.Wrap(err, "failed 5 attempts to get changefeed info") + logger.Errorf("error getting changefeed info: %v (attempt %d)", err, loadJobAttempt+1) + if loadJobAttempt > 5 { + return changefeedInfo{}, errors.Wrapf(err, "failed %d attempts to get changefeed info", maxLoadJobAttempts) } continue } else if info.errMsg != "" { @@ -570,6 +571,7 @@ func waitForChangefeed( } else if ok { return *info, nil } + loadJobAttempt = 0 } } diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index f20ab052549d..c38a8fdd0e90 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -667,7 +667,7 @@ func (g *lockTableGuardImpl) CheckOptimisticNoConflicts( ltRange := &lockState{key: startKey, endKey: span.EndKey} for iter.FirstOverlap(ltRange); iter.Valid(); iter.NextOverlap(ltRange) { l := iter.Cur() - if !l.isNonConflictingLock(g, g.curStrength()) { + if !l.isNonConflictingLock(g) { return false } } @@ -2359,7 +2359,7 @@ func (l *lockState) claimBeforeProceeding(g *lockTableGuardImpl) { panic("lock table bug: did not find enqueued request") } -func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strength) bool { +func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl) bool { l.mu.Lock() defer l.mu.Unlock() @@ -2368,13 +2368,12 @@ func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strengt return true } // Lock is not empty. - lockHolderTxn, lockHolderTS := l.getLockHolder() - if lockHolderTxn == nil { - // Transactions that have claimed the lock, but have not acquired it yet, - // are considered non-conflicting. - // - // Optimistic evaluation may call into this function with or without holding - // latches. It's worth considering both these cases separately: + if !l.isHeld() { + // If the lock is neither empty nor held it must be the case that another + // transaction has claimed the lock. Locks that have been claimed, but have + // not been acquired yet, are considered non-conflicting. Optimistic + // evaluation may call into this function with or without holding latches. + // It's worth considering both these cases separately: // // 1. If Optimistic evaluation is holding latches, then there cannot be a // conflicting request that has claimed (but not acquired) the lock that is @@ -2397,19 +2396,29 @@ func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strengt // claimed the lock will know what happened and what to do about it. return true } + lockHolderTxn, _ := l.getLockHolder() if g.isSameTxn(lockHolderTxn) { - // Already locked by this txn. + // NB: Unlike the pessimistic (normal) evaluation code path, we do not need + // to check the lock's strength if it is already held by this transaction -- + // it's non-conflicting. There's two cases to consider: + // + // 1. If the lock is held with the same/higher lock strength on this key + // then this optimistic evaluation attempt already has all the protection it + // needs. + // + // 2. If the lock is held with a weaker lock strength other transactions may + // be able to acquire a lock on this key that conflicts with this optimistic + // evaluation attempt. This is okay, as we'll detect such cases -- however, + // the weaker lock in itself is not conflicting with the optimistic + // evaluation attempt. return true } + // NB: We do not look at the txnStatusCache in this optimistic evaluation // path. A conflict with a finalized txn will be noticed when retrying // pessimistically. - if str == lock.None && g.ts.Less(lockHolderTS) { - return true - } - // Conflicts. - return false + return !lock.Conflicts(l.getLockMode(), g.curLockMode(), &g.lt.settings.SV) // non-conflicting } // Acquires this lock. Any requests that are waiting in the lock's wait queues diff --git a/pkg/kv/kvserver/concurrency/testdata/lock_table/optimistic b/pkg/kv/kvserver/concurrency/testdata/lock_table/optimistic index 2dddf460b702..07666b2f9f15 100644 --- a/pkg/kv/kvserver/concurrency/testdata/lock_table/optimistic +++ b/pkg/kv/kvserver/concurrency/testdata/lock_table/optimistic @@ -132,3 +132,125 @@ num=2 holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)] lock: "g" holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)] + + +# ------------------------------------------------------------------------------ +# Test that optimistic evaluation succeeds if the lock is held by our own +# transaction, regardless of lock strengths. +# ------------------------------------------------------------------------------ + +clear +---- +num=0 + +new-request r=req5 txn=txn1 ts=10,1 spans=shared@a +---- + +scan r=req5 +---- +start-waiting: false + +should-wait r=req5 +---- +false + +acquire r=req5 k=a durability=u strength=shared +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)] + +# Ensure a optimistic evaluation attempt from the same transaction that covers +# key "a" succeeds -- both with lower and higher lock strengths than the +# strength of the lock already held (shared). + +new-request r=req6 txn=txn1 ts=10,1 spans=exclusive@a,c +---- + +scan-opt r=req6 +---- +start-waiting: false + +should-wait r=req6 +---- +false + +check-opt-no-conflicts r=req6 spans=exclusive@a,c +---- +no-conflicts: true + +dequeue r=req6 +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)] + +new-request r=req7 txn=txn1 ts=10,1 spans=none@a,c +---- + +scan-opt r=req7 +---- +start-waiting: false + +should-wait r=req7 +---- +false + +check-opt-no-conflicts r=req7 spans=none@a,c +---- +no-conflicts: true + +dequeue r=req7 +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)] + +# ------------------------------------------------------------------------------ +# Test that optimistic evaluation works with SHARED locking strength -- if a +# shared lock is held, another transaction should be able to perform optimistic +# evaluation with shared locking strength and not conflict; optimistic evaluation +# should conflict if run with exclusive lock strength. +# ------------------------------------------------------------------------------ + +new-request r=req8 txn=txn2 ts=10,1 spans=none@a,c +---- + +scan-opt r=req8 +---- +start-waiting: false + +should-wait r=req8 +---- +false + +check-opt-no-conflicts r=req8 spans=shared@a,c +---- +no-conflicts: true + +dequeue r=req8 +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)] + +new-request r=req9 txn=txn2 ts=10,1 spans=exclusive@a,c +---- + +scan-opt r=req9 +---- +start-waiting: false + +should-wait r=req9 +---- +false + +check-opt-no-conflicts r=req9 spans=exclusive@a,c +---- +no-conflicts: false + +dequeue r=req9 +---- +num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)] diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index a9e862a5244a..472b9edc503a 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -288,8 +288,12 @@ func (ex *connExecutor) prepare( } // Use the existing transaction. - if err := prepare(ctx, ex.state.mu.txn); err != nil && origin != PreparedStatementOriginSessionMigration { - return nil, err + if err := prepare(ctx, ex.state.mu.txn); err != nil { + if origin != PreparedStatementOriginSessionMigration { + return nil, err + } else { + log.Warningf(ctx, "could not prepare statement during session migration: %v", err) + } } // Account for the memory used by this prepared statement. @@ -320,8 +324,15 @@ func (ex *connExecutor) populatePrepared( return 0, err } p.extendedEvalCtx.PrepareOnly = true - if err := ex.handleAOST(ctx, p.stmt.AST); err != nil { - return 0, err + // If the statement is being prepared by a session migration, then we should + // not evaluate the AS OF SYSTEM TIME timestamp. During session migration, + // there is no way for the statement being prepared to be executed in this + // transaction, so there's no need to fix the timestamp, unlike how we must + // for pgwire- or SQL-level prepared statements. + if origin != PreparedStatementOriginSessionMigration { + if err := ex.handleAOST(ctx, p.stmt.AST); err != nil { + return 0, err + } } // PREPARE has a limited subset of statements it can be run with. Postgres diff --git a/pkg/sql/testdata/session_migration/prepared_statements b/pkg/sql/testdata/session_migration/prepared_statements index e764edd7643e..80cdee3eb23f 100644 --- a/pkg/sql/testdata/session_migration/prepared_statements +++ b/pkg/sql/testdata/session_migration/prepared_statements @@ -33,6 +33,11 @@ wire_prepare s4 INSERT INTO t2 VALUES($1, $2) ---- +# Regression test for transferring statements with AOST. +wire_prepare s5 +SELECT a, b FROM t2 AS OF SYSTEM TIME '-2us' +---- + wire_prepare s_empty ; ---- @@ -102,6 +107,15 @@ SELECT * FROM t2 ---- 1 cat +query +SELECT pg_sleep(0.1) +---- +true + +wire_query s5 +---- +1 cat + reset ----