From c34fa70dd2e04c7b945aede2cfecf65a79f32f8d Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 5 May 2021 21:27:10 +0800 Subject: [PATCH] store/tikv: remove use of EnableAsyncCommit option in store/tikv Signed-off-by: disksing --- store/driver/txn/txn_driver.go | 2 ++ store/tikv/2pc.go | 4 +--- store/tikv/tests/2pc_test.go | 2 +- store/tikv/tests/async_commit_test.go | 4 ++-- store/tikv/tests/snapshot_fail_test.go | 5 +++-- store/tikv/txn.go | 6 ++++++ 6 files changed, 15 insertions(+), 8 deletions(-) diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 3b5435d596c77..aa6ce98a80186 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -148,6 +148,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.SetSchemaVer(val.(tikv.SchemaVer)) case tikvstore.CommitHook: txn.SetCommitCallback(val.(func(string, error))) + case tikvstore.EnableAsyncCommit: + txn.SetEnableAsyncCommit(val.(bool)) case tikvstore.Enable1PC: txn.SetEnable1PC(val.(bool)) case tikvstore.TxnScope: diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 8703b1861c65d..ee94eceec166a 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -825,12 +825,10 @@ func (c *twoPhaseCommitter) checkAsyncCommit() bool { return false } - enableAsyncCommitOption := c.txn.us.GetOption(kv.EnableAsyncCommit) - enableAsyncCommit := enableAsyncCommitOption != nil && enableAsyncCommitOption.(bool) asyncCommitCfg := config.GetGlobalConfig().TiKVClient.AsyncCommit // TODO the keys limit need more tests, this value makes the unit test pass by now. // Async commit is not compatible with Binlog because of the non unique timestamp issue. - if c.sessionID > 0 && enableAsyncCommit && + if c.sessionID > 0 && c.txn.enableAsyncCommit && uint(c.mutations.Len()) <= asyncCommitCfg.KeysLimit && !c.shouldWriteBinlog() { totalKeySize := uint64(0) diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index 6d7d7e89d1a8e..b095e2f5758b9 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -104,7 +104,7 @@ func (s *testCommitterSuite) begin(c *C) tikv.TxnProbe { func (s *testCommitterSuite) beginAsyncCommit(c *C) tikv.TxnProbe { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) return txn } diff --git a/store/tikv/tests/async_commit_test.go b/store/tikv/tests/async_commit_test.go index 0f4985fa7ab86..381771bfa0836 100644 --- a/store/tikv/tests/async_commit_test.go +++ b/store/tikv/tests/async_commit_test.go @@ -134,7 +134,7 @@ func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability(c *C) tikv.T func (s *testAsyncCommitCommon) beginAsyncCommit(c *C) tikv.TxnProbe { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) return tikv.TxnProbe{KVTxn: txn} } @@ -160,7 +160,7 @@ func (s *testAsyncCommitSuite) SetUpTest(c *C) { func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(c *C, keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) for i, k := range keys { if len(values[i]) > 0 { err = txn.Set(k, values[i]) diff --git a/store/tikv/tests/snapshot_fail_test.go b/store/tikv/tests/snapshot_fail_test.go index 1360841bd743a..aca3c59099cf7 100644 --- a/store/tikv/tests/snapshot_fail_test.go +++ b/store/tikv/tests/snapshot_fail_test.go @@ -152,6 +152,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) { err = txn.Set([]byte("k2"), []byte("v2")) c.Assert(err, IsNil) txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/twoPCShortLockTTL", "return"), IsNil) @@ -181,7 +182,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) { // Prewrite k1 and k2 again without committing them txn, err = s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) err = txn.Set([]byte("k1"), []byte("v3")) c.Assert(err, IsNil) err = txn.Set([]byte("k2"), []byte("v4")) @@ -210,7 +211,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) { c.Assert(txn.Set([]byte("k1"), []byte("v1")), IsNil) err = txn.Set([]byte("k2"), []byte("v2")) c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, false) + txn.SetEnableAsyncCommit(false) txn.SetEnable1PC(false) txn.SetOption(kv.GuaranteeLinearizability, false) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 4e462653c415c..a3e5c217025ae 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -78,6 +78,7 @@ type KVTxn struct { syncLog bool priority Priority isPessimistic bool + enableAsyncCommit bool enable1PC bool scope string kvFilter KVFilter @@ -231,6 +232,11 @@ func (txn *KVTxn) SetCommitCallback(f func(string, error)) { txn.commitCallback = f } +// SetEnableAsyncCommit indicates if the transaction will try to use async commit. +func (txn *KVTxn) SetEnableAsyncCommit(b bool) { + txn.enableAsyncCommit = b +} + // SetEnable1PC indicates if the transaction will try to use 1 phase commit. func (txn *KVTxn) SetEnable1PC(b bool) { txn.enable1PC = b