Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
108179: concurrency: use lock modes to detect conflicts during optimistic eval r=nvanbenschoten a=arulajmani

This patch switches over conflict resolution performed by optimistic evaluation to use lock modes instead of ad-hoc logic. As a result of this, optimistic evaluation is able to handle shared locks. We add a test to show this.

Closes #108142

Release note: None

108503: sql: do not evaluate AOST timestamp in session migrations r=rafiss a=rafiss

fixes https://github.com/cockroachlabs/support/issues/2510
refs #108305
Release note (bug fix): Fixed a bug where a session migration performed by SHOW TRANSFER STATE would not handle prepared statements that used the AS OF SYSTEM TIME clause. Users who encountered this bug would see errors such as `expected 1 or 0 for number of format codes, got N`. This bug was present since v22.2.0.

108523: roachtest: Reset job load attempt when loading cdc job r=miretskiy a=miretskiy

Fixes #108433

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
4 people committed Aug 10, 2023
4 parents 1f8fa96 + 90ba1ef + 5befaa7 + fcc95cf commit c0fbeb3
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 27 deletions.
32 changes: 28 additions & 4 deletions pkg/ccl/testccl/sqlccl/show_transfer_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func TestShowTransferState(t *testing.T) {
require.NoError(t, err)
_, err = tenantDB.Exec("CREATE TYPE typ AS ENUM ('foo', 'bar')")
require.NoError(t, err)
_, err = tenantDB.Exec("CREATE TABLE tab (a INT4, b typ)")
require.NoError(t, err)
_, err = tenantDB.Exec("INSERT INTO tab VALUES (1, 'foo')")
require.NoError(t, err)
_, err = tenantDB.Exec("GRANT SELECT ON tab TO testuser")
require.NoError(t, err)

testUserConn := tenant.SQLConnForUser(t, username.TestUser, "")

Expand Down Expand Up @@ -90,12 +96,18 @@ func TestShowTransferState(t *testing.T) {
defer func() { _ = conn.Close(ctx) }()

// Add a prepared statement to make sure SHOW TRANSFER STATE handles it.
_, err = conn.Prepare(ctx, "prepared_stmt", "SELECT $1::INT4, 'foo'::typ WHERE 1 = 1")
_, err = conn.Prepare(ctx, "prepared_stmt_const", "SELECT $1::INT4, 'foo'::typ WHERE 1 = 1")
require.NoError(t, err)
_, err = conn.Prepare(ctx, "prepared_stmt_aost", "SELECT a, b FROM tab AS OF SYSTEM TIME '-1us'")
require.NoError(t, err)

var intResult int
var enumResult string
err = conn.QueryRow(ctx, "prepared_stmt", 1).Scan(&intResult, &enumResult)
err = conn.QueryRow(ctx, "prepared_stmt_const", 1).Scan(&intResult, &enumResult)
require.NoError(t, err)
require.Equal(t, 1, intResult)
require.Equal(t, "foo", enumResult)
err = conn.QueryRow(ctx, "prepared_stmt_aost").Scan(&intResult, &enumResult)
require.NoError(t, err)
require.Equal(t, 1, intResult)
require.Equal(t, "foo", enumResult)
Expand Down Expand Up @@ -171,14 +183,26 @@ func TestShowTransferState(t *testing.T) {
// session.
result := conn.PgConn().ExecPrepared(
ctx,
"prepared_stmt",
"prepared_stmt_const",
[][]byte{{0, 0, 0, 2}}, // binary representation of 2
[]int16{1}, // paramFormats - 1 means binary
[]int16{1}, // resultFormats - 1 means binary
[]int16{1, 1}, // resultFormats - 1 means binary
).Read()
require.NoError(t, result.Err)
require.Equal(t, [][][]byte{{
{0, 0, 0, 2}, {0x66, 0x6f, 0x6f}, // binary representation of 2, 'foo'
}}, result.Rows)
result = conn.PgConn().ExecPrepared(
ctx,
"prepared_stmt_aost",
[][]byte{}, // paramValues
[]int16{}, // paramFormats
[]int16{1, 1}, // resultFormats - 1 means binary
).Read()
require.NoError(t, result.Err)
require.Equal(t, [][][]byte{{
{0, 0, 0, 1}, {0x66, 0x6f, 0x6f}, // binary representation of 1, 'foo'
}}, result.Rows)
})

// Errors should be displayed as a SQL value.
Expand Down
10 changes: 6 additions & 4 deletions pkg/cmd/roachtest/tests/cdc_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ func waitForChangefeed(
) (changefeedInfo, error) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for attempt := 0; ; attempt++ {
const maxLoadJobAttempts = 5
for loadJobAttempt := 0; ; loadJobAttempt++ {
select {
case <-ticker.C:
case <-ctx.Done():
Expand All @@ -557,9 +558,9 @@ func waitForChangefeed(

info, err := getChangefeedInfo(conn, jobID)
if err != nil {
logger.Errorf("error getting changefeed info: %v (attempt %d)", err, attempt+1)
if attempt > 5 {
return changefeedInfo{}, errors.Wrap(err, "failed 5 attempts to get changefeed info")
logger.Errorf("error getting changefeed info: %v (attempt %d)", err, loadJobAttempt+1)
if loadJobAttempt > 5 {
return changefeedInfo{}, errors.Wrapf(err, "failed %d attempts to get changefeed info", maxLoadJobAttempts)
}
continue
} else if info.errMsg != "" {
Expand All @@ -570,6 +571,7 @@ func waitForChangefeed(
} else if ok {
return *info, nil
}
loadJobAttempt = 0
}
}

Expand Down
39 changes: 24 additions & 15 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(
ltRange := &lockState{key: startKey, endKey: span.EndKey}
for iter.FirstOverlap(ltRange); iter.Valid(); iter.NextOverlap(ltRange) {
l := iter.Cur()
if !l.isNonConflictingLock(g, g.curStrength()) {
if !l.isNonConflictingLock(g) {
return false
}
}
Expand Down Expand Up @@ -2359,7 +2359,7 @@ func (l *lockState) claimBeforeProceeding(g *lockTableGuardImpl) {
panic("lock table bug: did not find enqueued request")
}

func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strength) bool {
func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl) bool {
l.mu.Lock()
defer l.mu.Unlock()

Expand All @@ -2368,13 +2368,12 @@ func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strengt
return true
}
// Lock is not empty.
lockHolderTxn, lockHolderTS := l.getLockHolder()
if lockHolderTxn == nil {
// Transactions that have claimed the lock, but have not acquired it yet,
// are considered non-conflicting.
//
// Optimistic evaluation may call into this function with or without holding
// latches. It's worth considering both these cases separately:
if !l.isHeld() {
// If the lock is neither empty nor held it must be the case that another
// transaction has claimed the lock. Locks that have been claimed, but have
// not been acquired yet, are considered non-conflicting. Optimistic
// evaluation may call into this function with or without holding latches.
// It's worth considering both these cases separately:
//
// 1. If Optimistic evaluation is holding latches, then there cannot be a
// conflicting request that has claimed (but not acquired) the lock that is
Expand All @@ -2397,19 +2396,29 @@ func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strengt
// claimed the lock will know what happened and what to do about it.
return true
}
lockHolderTxn, _ := l.getLockHolder()
if g.isSameTxn(lockHolderTxn) {
// Already locked by this txn.
// NB: Unlike the pessimistic (normal) evaluation code path, we do not need
// to check the lock's strength if it is already held by this transaction --
// it's non-conflicting. There's two cases to consider:
//
// 1. If the lock is held with the same/higher lock strength on this key
// then this optimistic evaluation attempt already has all the protection it
// needs.
//
// 2. If the lock is held with a weaker lock strength other transactions may
// be able to acquire a lock on this key that conflicts with this optimistic
// evaluation attempt. This is okay, as we'll detect such cases -- however,
// the weaker lock in itself is not conflicting with the optimistic
// evaluation attempt.
return true
}

// NB: We do not look at the txnStatusCache in this optimistic evaluation
// path. A conflict with a finalized txn will be noticed when retrying
// pessimistically.

if str == lock.None && g.ts.Less(lockHolderTS) {
return true
}
// Conflicts.
return false
return !lock.Conflicts(l.getLockMode(), g.curLockMode(), &g.lt.settings.SV) // non-conflicting
}

// Acquires this lock. Any requests that are waiting in the lock's wait queues
Expand Down
122 changes: 122 additions & 0 deletions pkg/kv/kvserver/concurrency/testdata/lock_table/optimistic
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,125 @@ num=2
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]
lock: "g"
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]


# ------------------------------------------------------------------------------
# Test that optimistic evaluation succeeds if the lock is held by our own
# transaction, regardless of lock strengths.
# ------------------------------------------------------------------------------

clear
----
num=0

new-request r=req5 txn=txn1 ts=10,1 spans=shared@a
----

scan r=req5
----
start-waiting: false

should-wait r=req5
----
false

acquire r=req5 k=a durability=u strength=shared
----
num=1
lock: "a"
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)]

# Ensure a optimistic evaluation attempt from the same transaction that covers
# key "a" succeeds -- both with lower and higher lock strengths than the
# strength of the lock already held (shared).

new-request r=req6 txn=txn1 ts=10,1 spans=exclusive@a,c
----

scan-opt r=req6
----
start-waiting: false

should-wait r=req6
----
false

check-opt-no-conflicts r=req6 spans=exclusive@a,c
----
no-conflicts: true

dequeue r=req6
----
num=1
lock: "a"
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)]

new-request r=req7 txn=txn1 ts=10,1 spans=none@a,c
----

scan-opt r=req7
----
start-waiting: false

should-wait r=req7
----
false

check-opt-no-conflicts r=req7 spans=none@a,c
----
no-conflicts: true

dequeue r=req7
----
num=1
lock: "a"
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)]

# ------------------------------------------------------------------------------
# Test that optimistic evaluation works with SHARED locking strength -- if a
# shared lock is held, another transaction should be able to perform optimistic
# evaluation with shared locking strength and not conflict; optimistic evaluation
# should conflict if run with exclusive lock strength.
# ------------------------------------------------------------------------------

new-request r=req8 txn=txn2 ts=10,1 spans=none@a,c
----

scan-opt r=req8
----
start-waiting: false

should-wait r=req8
----
false

check-opt-no-conflicts r=req8 spans=shared@a,c
----
no-conflicts: true

dequeue r=req8
----
num=1
lock: "a"
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)]

new-request r=req9 txn=txn2 ts=10,1 spans=exclusive@a,c
----

scan-opt r=req9
----
start-waiting: false

should-wait r=req9
----
false

check-opt-no-conflicts r=req9 spans=exclusive@a,c
----
no-conflicts: false

dequeue r=req9
----
num=1
lock: "a"
holder: txn: 00000000-0000-0000-0000-000000000001 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Shared seq: 0)]
19 changes: 15 additions & 4 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,12 @@ func (ex *connExecutor) prepare(
}

// Use the existing transaction.
if err := prepare(ctx, ex.state.mu.txn); err != nil && origin != PreparedStatementOriginSessionMigration {
return nil, err
if err := prepare(ctx, ex.state.mu.txn); err != nil {
if origin != PreparedStatementOriginSessionMigration {
return nil, err
} else {
log.Warningf(ctx, "could not prepare statement during session migration: %v", err)
}
}

// Account for the memory used by this prepared statement.
Expand Down Expand Up @@ -320,8 +324,15 @@ func (ex *connExecutor) populatePrepared(
return 0, err
}
p.extendedEvalCtx.PrepareOnly = true
if err := ex.handleAOST(ctx, p.stmt.AST); err != nil {
return 0, err
// If the statement is being prepared by a session migration, then we should
// not evaluate the AS OF SYSTEM TIME timestamp. During session migration,
// there is no way for the statement being prepared to be executed in this
// transaction, so there's no need to fix the timestamp, unlike how we must
// for pgwire- or SQL-level prepared statements.
if origin != PreparedStatementOriginSessionMigration {
if err := ex.handleAOST(ctx, p.stmt.AST); err != nil {
return 0, err
}
}

// PREPARE has a limited subset of statements it can be run with. Postgres
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/testdata/session_migration/prepared_statements
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ wire_prepare s4
INSERT INTO t2 VALUES($1, $2)
----

# Regression test for transferring statements with AOST.
wire_prepare s5
SELECT a, b FROM t2 AS OF SYSTEM TIME '-2us'
----

wire_prepare s_empty
;
----
Expand Down Expand Up @@ -102,6 +107,15 @@ SELECT * FROM t2
----
1 cat

query
SELECT pg_sleep(0.1)
----
true

wire_query s5
----
1 cat

reset
----

Expand Down

0 comments on commit c0fbeb3

Please sign in to comment.