Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: remove use of EnableAsyncCommit option in store/tikv #24462

Merged
merged 5 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.SetSchemaVer(val.(tikv.SchemaVer))
case tikvstore.CommitHook:
txn.SetCommitCallback(val.(func(string, error)))
case tikvstore.EnableAsyncCommit:
txn.SetEnableAsyncCommit(val.(bool))
case tikvstore.Enable1PC:
txn.SetEnable1PC(val.(bool))
case tikvstore.TxnScope:
Expand Down
4 changes: 1 addition & 3 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,12 +825,10 @@ func (c *twoPhaseCommitter) checkAsyncCommit() bool {
return false
}

enableAsyncCommitOption := c.txn.us.GetOption(kv.EnableAsyncCommit)
enableAsyncCommit := enableAsyncCommitOption != nil && enableAsyncCommitOption.(bool)
asyncCommitCfg := config.GetGlobalConfig().TiKVClient.AsyncCommit
// TODO the keys limit need more tests, this value makes the unit test pass by now.
// Async commit is not compatible with Binlog because of the non unique timestamp issue.
if c.sessionID > 0 && enableAsyncCommit &&
if c.sessionID > 0 && c.txn.enableAsyncCommit &&
uint(c.mutations.Len()) <= asyncCommitCfg.KeysLimit &&
!c.shouldWriteBinlog() {
totalKeySize := uint64(0)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *testCommitterSuite) begin(c *C) tikv.TxnProbe {
func (s *testCommitterSuite) beginAsyncCommit(c *C) tikv.TxnProbe {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetEnableAsyncCommit(true)
return txn
}

Expand Down
4 changes: 2 additions & 2 deletions store/tikv/tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability(c *C) tikv.T
func (s *testAsyncCommitCommon) beginAsyncCommit(c *C) tikv.TxnProbe {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetEnableAsyncCommit(true)
return tikv.TxnProbe{KVTxn: txn}
}

Expand All @@ -160,7 +160,7 @@ func (s *testAsyncCommitSuite) SetUpTest(c *C) {
func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(c *C, keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetEnableAsyncCommit(true)
for i, k := range keys {
if len(values[i]) > 0 {
err = txn.Set(k, values[i])
Expand Down
5 changes: 3 additions & 2 deletions store/tikv/tests/snapshot_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) {
err = txn.Set([]byte("k2"), []byte("v2"))
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetEnableAsyncCommit(true)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/twoPCShortLockTTL", "return"), IsNil)
Expand Down Expand Up @@ -181,7 +182,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) {
// Prewrite k1 and k2 again without committing them
txn, err = s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetEnableAsyncCommit(true)
err = txn.Set([]byte("k1"), []byte("v3"))
c.Assert(err, IsNil)
err = txn.Set([]byte("k2"), []byte("v4"))
Expand Down Expand Up @@ -210,7 +211,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) {
c.Assert(txn.Set([]byte("k1"), []byte("v1")), IsNil)
err = txn.Set([]byte("k2"), []byte("v2"))
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, false)
txn.SetEnableAsyncCommit(false)
txn.SetEnable1PC(false)
txn.SetOption(kv.GuaranteeLinearizability, false)

Expand Down
6 changes: 6 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type KVTxn struct {
syncLog bool
priority Priority
isPessimistic bool
enableAsyncCommit bool
enable1PC bool
scope string
kvFilter KVFilter
Expand Down Expand Up @@ -231,6 +232,11 @@ func (txn *KVTxn) SetCommitCallback(f func(string, error)) {
txn.commitCallback = f
}

// SetEnableAsyncCommit indicates if the transaction will try to use async commit.
func (txn *KVTxn) SetEnableAsyncCommit(b bool) {
txn.enableAsyncCommit = b
}

// SetEnable1PC indicates if the transaction will try to use 1 phase commit.
func (txn *KVTxn) SetEnable1PC(b bool) {
txn.enable1PC = b
Expand Down