Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: also collect unchanged unique keys for lock #36498

Merged
merged 11 commits into from
Jul 25, 2022
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
if err1 != nil {
return err1
}
keys = txnCtx.CollectUnchangedRowKeys(keys)
keys = txnCtx.CollectUnchangedLockKeys(keys)
if len(keys) == 0 {
return nil
}
Expand Down
16 changes: 15 additions & 1 deletion executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,21 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
txnCtx.AddUnchangedLockKey(unchangedRowKey)
for _, idx := range t.Indices() {
if !idx.Meta().Unique {
continue
}
tangenta marked this conversation as resolved.
Show resolved Hide resolved
ukVals, err := idx.FetchValues(oldData, nil)
if err != nil {
return false, err
}
unchangedUniqueKey, _, err := idx.GenIndexKey(sc, ukVals, h, nil)
if err != nil {
return false, err
}
txnCtx.AddUnchangedLockKey(unchangedUniqueKey)
}
}
return false, nil
}
Expand Down
23 changes: 12 additions & 11 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ type TxnCtxNoNeedToRestore struct {
currentShard int64
shardRand *rand.Rand

// unchangedRowKeys is used to store the unchanged rows that needs to lock for pessimistic transaction.
unchangedRowKeys map[string]struct{}
// unchangedLockKeys is used to store the unchanged keys that needs to lock for pessimistic transaction, including
// row keys and unique keys.
unchangedLockKeys map[string]struct{}

PessimisticCacheHit int

Expand Down Expand Up @@ -239,20 +240,20 @@ func (tc *TransactionContext) updateShard() {
tc.currentShard = int64(murmur3.Sum32(buf[:]))
}

// AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock.
func (tc *TransactionContext) AddUnchangedRowKey(key []byte) {
if tc.unchangedRowKeys == nil {
tc.unchangedRowKeys = map[string]struct{}{}
// AddUnchangedLockKey adds an unchanged key in update statement for pessimistic lock.
func (tc *TransactionContext) AddUnchangedLockKey(key []byte) {
if tc.unchangedLockKeys == nil {
tc.unchangedLockKeys = map[string]struct{}{}
}
tc.unchangedRowKeys[string(key)] = struct{}{}
tc.unchangedLockKeys[string(key)] = struct{}{}
}

// CollectUnchangedRowKeys collects unchanged row keys for pessimistic lock.
func (tc *TransactionContext) CollectUnchangedRowKeys(buf []kv.Key) []kv.Key {
for key := range tc.unchangedRowKeys {
// CollectUnchangedLockKeys collects unchanged keys for pessimistic lock.
func (tc *TransactionContext) CollectUnchangedLockKeys(buf []kv.Key) []kv.Key {
for key := range tc.unchangedLockKeys {
buf = append(buf, kv.Key(key))
}
tc.unchangedRowKeys = nil
tc.unchangedLockKeys = nil
return buf
}

Expand Down
32 changes: 32 additions & 0 deletions tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,38 @@ func TestLockUnchangedRowKey(t *testing.T) {
tk2.MustExec("rollback")
}

func TestLockUnchangedUniqueKey(t *testing.T) {
store, clean := realtikvtest.CreateMockStoreAndSetup(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2.MustExec("use test")

// ref https://github.com/pingcap/tidb/issues/36438
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (i varchar(10), unique key(i))")
tk.MustExec("insert into t values ('a')")
tk.MustExec("begin pessimistic")
tk.MustExec("update t set i = 'a'")

errCh := make(chan error, 1)
go func() {
_, err := tk2.Exec("insert into t values ('a')")
errCh <- err
}()

select {
case <-errCh:
require.Fail(t, "insert is not blocked by update")
case <-time.After(500 * time.Millisecond):
tk.MustExec("rollback")
}

require.Error(t, <-errCh)
}

func TestOptimisticConflicts(t *testing.T) {
store, clean := realtikvtest.CreateMockStoreAndSetup(t)
defer clean()
Expand Down