diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 24e6f11c8fef9..739b983fd3d99 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -152,6 +152,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.SetSchemaAmender(val.(tikv.SchemaAmender)) case tikvstore.CommitHook: txn.SetCommitCallback(val.(func(string, error))) + case tikvstore.EnableAsyncCommit: + txn.SetEnableAsyncCommit(val.(bool)) case tikvstore.Enable1PC: txn.SetEnable1PC(val.(bool)) case tikvstore.TxnScope: diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 8703b1861c65d..ee94eceec166a 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -825,12 +825,10 @@ func (c *twoPhaseCommitter) checkAsyncCommit() bool { return false } - enableAsyncCommitOption := c.txn.us.GetOption(kv.EnableAsyncCommit) - enableAsyncCommit := enableAsyncCommitOption != nil && enableAsyncCommitOption.(bool) asyncCommitCfg := config.GetGlobalConfig().TiKVClient.AsyncCommit // TODO the keys limit need more tests, this value makes the unit test pass by now. // Async commit is not compatible with Binlog because of the non unique timestamp issue. - if c.sessionID > 0 && enableAsyncCommit && + if c.sessionID > 0 && c.txn.enableAsyncCommit && uint(c.mutations.Len()) <= asyncCommitCfg.KeysLimit && !c.shouldWriteBinlog() { totalKeySize := uint64(0) diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index d1e635f205efa..5589752043b2b 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -105,7 +105,7 @@ func (s *testCommitterSuite) begin(c *C) tikv.TxnProbe { func (s *testCommitterSuite) beginAsyncCommit(c *C) tikv.TxnProbe { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) return txn } diff --git a/store/tikv/tests/async_commit_test.go b/store/tikv/tests/async_commit_test.go index 0f4985fa7ab86..381771bfa0836 100644 --- a/store/tikv/tests/async_commit_test.go +++ b/store/tikv/tests/async_commit_test.go @@ -134,7 +134,7 @@ func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability(c *C) tikv.T func (s *testAsyncCommitCommon) beginAsyncCommit(c *C) tikv.TxnProbe { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) return tikv.TxnProbe{KVTxn: txn} } @@ -160,7 +160,7 @@ func (s *testAsyncCommitSuite) SetUpTest(c *C) { func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(c *C, keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) for i, k := range keys { if len(values[i]) > 0 { err = txn.Set(k, values[i]) diff --git a/store/tikv/tests/snapshot_fail_test.go b/store/tikv/tests/snapshot_fail_test.go index 1360841bd743a..aca3c59099cf7 100644 --- a/store/tikv/tests/snapshot_fail_test.go +++ b/store/tikv/tests/snapshot_fail_test.go @@ -152,6 +152,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) { err = txn.Set([]byte("k2"), []byte("v2")) c.Assert(err, IsNil) txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/twoPCShortLockTTL", "return"), IsNil) @@ -181,7 +182,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) { // Prewrite k1 and k2 again without committing them txn, err = s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) err = txn.Set([]byte("k1"), []byte("v3")) c.Assert(err, IsNil) err = txn.Set([]byte("k2"), []byte("v4")) @@ -210,7 +211,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) { c.Assert(txn.Set([]byte("k1"), []byte("v1")), IsNil) err = txn.Set([]byte("k2"), []byte("v2")) c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, false) + txn.SetEnableAsyncCommit(false) txn.SetEnable1PC(false) txn.SetOption(kv.GuaranteeLinearizability, false) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 47ccdce12caea..a8c0f70f8da8d 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -82,6 +82,7 @@ type KVTxn struct { syncLog bool priority Priority isPessimistic bool + enableAsyncCommit bool enable1PC bool scope string kvFilter KVFilter @@ -272,6 +273,11 @@ func (txn *KVTxn) SetCommitCallback(f func(string, error)) { txn.commitCallback = f } +// SetEnableAsyncCommit indicates if the transaction will try to use async commit. +func (txn *KVTxn) SetEnableAsyncCommit(b bool) { + txn.enableAsyncCommit = b +} + // SetEnable1PC indicates if the transaction will try to use 1 phase commit. func (txn *KVTxn) SetEnable1PC(b bool) { txn.enable1PC = b