Skip to content

Commit

Permalink
store/tikv: remove use of EnableAsyncCommit option in store/tikv (#24462
Browse files Browse the repository at this point in the history
)
  • Loading branch information
disksing authored May 12, 2021
1 parent 081291b commit 1ae648b
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 8 deletions.
2 changes: 2 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.SetSchemaAmender(val.(tikv.SchemaAmender))
case tikvstore.CommitHook:
txn.SetCommitCallback(val.(func(string, error)))
case tikvstore.EnableAsyncCommit:
txn.SetEnableAsyncCommit(val.(bool))
case tikvstore.Enable1PC:
txn.SetEnable1PC(val.(bool))
case tikvstore.TxnScope:
Expand Down
4 changes: 1 addition & 3 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,12 +825,10 @@ func (c *twoPhaseCommitter) checkAsyncCommit() bool {
return false
}

enableAsyncCommitOption := c.txn.us.GetOption(kv.EnableAsyncCommit)
enableAsyncCommit := enableAsyncCommitOption != nil && enableAsyncCommitOption.(bool)
asyncCommitCfg := config.GetGlobalConfig().TiKVClient.AsyncCommit
// TODO the keys limit need more tests, this value makes the unit test pass by now.
// Async commit is not compatible with Binlog because of the non unique timestamp issue.
if c.sessionID > 0 && enableAsyncCommit &&
if c.sessionID > 0 && c.txn.enableAsyncCommit &&
uint(c.mutations.Len()) <= asyncCommitCfg.KeysLimit &&
!c.shouldWriteBinlog() {
totalKeySize := uint64(0)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *testCommitterSuite) begin(c *C) tikv.TxnProbe {
func (s *testCommitterSuite) beginAsyncCommit(c *C) tikv.TxnProbe {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetEnableAsyncCommit(true)
return txn
}

Expand Down
4 changes: 2 additions & 2 deletions store/tikv/tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability(c *C) tikv.T
func (s *testAsyncCommitCommon) beginAsyncCommit(c *C) tikv.TxnProbe {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetEnableAsyncCommit(true)
return tikv.TxnProbe{KVTxn: txn}
}

Expand All @@ -160,7 +160,7 @@ func (s *testAsyncCommitSuite) SetUpTest(c *C) {
func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(c *C, keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetEnableAsyncCommit(true)
for i, k := range keys {
if len(values[i]) > 0 {
err = txn.Set(k, values[i])
Expand Down
5 changes: 3 additions & 2 deletions store/tikv/tests/snapshot_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) {
err = txn.Set([]byte("k2"), []byte("v2"))
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetEnableAsyncCommit(true)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/twoPCShortLockTTL", "return"), IsNil)
Expand Down Expand Up @@ -181,7 +182,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) {
// Prewrite k1 and k2 again without committing them
txn, err = s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetEnableAsyncCommit(true)
err = txn.Set([]byte("k1"), []byte("v3"))
c.Assert(err, IsNil)
err = txn.Set([]byte("k2"), []byte("v4"))
Expand Down Expand Up @@ -210,7 +211,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) {
c.Assert(txn.Set([]byte("k1"), []byte("v1")), IsNil)
err = txn.Set([]byte("k2"), []byte("v2"))
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, false)
txn.SetEnableAsyncCommit(false)
txn.SetEnable1PC(false)
txn.SetOption(kv.GuaranteeLinearizability, false)

Expand Down
6 changes: 6 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type KVTxn struct {
syncLog bool
priority Priority
isPessimistic bool
enableAsyncCommit bool
enable1PC bool
scope string
kvFilter KVFilter
Expand Down Expand Up @@ -272,6 +273,11 @@ func (txn *KVTxn) SetCommitCallback(f func(string, error)) {
txn.commitCallback = f
}

// SetEnableAsyncCommit indicates if the transaction will try to use async commit.
func (txn *KVTxn) SetEnableAsyncCommit(b bool) {
txn.enableAsyncCommit = b
}

// SetEnable1PC indicates if the transaction will try to use 1 phase commit.
func (txn *KVTxn) SetEnable1PC(b bool) {
txn.enable1PC = b
Expand Down

0 comments on commit 1ae648b

Please sign in to comment.