Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv:make KVTxn in tikv public #23019

Merged
merged 11 commits into from
Mar 1, 2021
33 changes: 33 additions & 0 deletions store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/copr"
txn_driver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -288,3 +290,34 @@ func (s *tikvStore) Close() error {
func (s *tikvStore) GetMemCache() kv.MemManager {
return s.memCache
}

// Begin a global transaction.
func (s *tikvStore) Begin() (kv.Transaction, error) {
return s.BeginWithTxnScope(oracle.GlobalTxnScope)
}

func (s *tikvStore) BeginWithTxnScope(txnScope string) (kv.Transaction, error) {
txn, err := s.KVStore.BeginWithTxnScope(txnScope)
if err != nil {
return txn, errors.Trace(err)
}
return txn_driver.NewTiKVTxn(txn), err
}

// BeginWithStartTS begins a transaction with startTS.
func (s *tikvStore) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) {
txn, err := s.KVStore.BeginWithStartTS(txnScope, startTS)
if err != nil {
return txn, errors.Trace(err)
}
return txn_driver.NewTiKVTxn(txn), err
}

// BeginWithExactStaleness begins transaction with given staleness
func (s *tikvStore) BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) {
txn, err := s.KVStore.BeginWithExactStaleness(txnScope, prevSec)
if err != nil {
return txn, errors.Trace(err)
}
return txn_driver.NewTiKVTxn(txn), err
}
28 changes: 28 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package txn
AndreMouche marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
)

type tikvTxn struct {
*tikv.KVTxn
}

// NewTiKVTxn returns a new Transaction.
func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction {
return &tikvTxn{txn}
}
31 changes: 31 additions & 0 deletions store/mockstore/unistore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/copr"
driver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
Expand Down Expand Up @@ -88,6 +89,36 @@ func (s *mockStorage) Describe() string {
return ""
}

// Begin a global transaction.
func (s *mockStorage) Begin() (kv.Transaction, error) {
txn, err := s.KVStore.Begin()
return newTiKVTxn(txn, err)
}

func (s *mockStorage) BeginWithTxnScope(txnScope string) (kv.Transaction, error) {
txn, err := s.KVStore.BeginWithTxnScope(txnScope)
return newTiKVTxn(txn, err)
}

// BeginWithStartTS begins a transaction with startTS.
func (s *mockStorage) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) {
txn, err := s.KVStore.BeginWithStartTS(txnScope, startTS)
return newTiKVTxn(txn, err)
}

// BeginWithExactStaleness begins transaction with given staleness
func (s *mockStorage) BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) {
txn, err := s.KVStore.BeginWithExactStaleness(txnScope, prevSec)
return newTiKVTxn(txn, err)
}

func newTiKVTxn(txn *tikv.KVTxn, err error) (kv.Transaction, error) {
if err != nil {
return nil, err
}
return driver.NewTiKVTxn(txn), nil
}

func (s *mockStorage) Close() error {
s.Store.Close()
return s.KVStore.Close()
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/1pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"github.com/pingcap/tidb/store/tikv/util"
)

func (s *testAsyncCommitCommon) begin1PC(c *C) *tikvTxn {
func (s *testAsyncCommitCommon) begin1PC(c *C) *KVTxn {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.Enable1PC, true)
return txn.(*tikvTxn)
return txn
}

type testOnePCSuite struct {
Expand Down Expand Up @@ -241,8 +241,8 @@ func (s *testOnePCSuite) Test1PCLinearizability(c *C) {
c.Assert(err, IsNil)
err = t1.Commit(ctx)
c.Assert(err, IsNil)
commitTS1 := t1.(*tikvTxn).committer.commitTS
commitTS2 := t2.(*tikvTxn).committer.commitTS
commitTS1 := t1.committer.commitTS
commitTS2 := t2.committer.commitTS
c.Assert(commitTS2, Less, commitTS1)
}

Expand Down
8 changes: 4 additions & 4 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
// twoPhaseCommitter executes a two-phase commit protocol.
type twoPhaseCommitter struct {
store *KVStore
txn *tikvTxn
txn *KVTxn
startTS uint64
mutations *memBufferMutations
lockTTL uint64
Expand Down Expand Up @@ -294,7 +294,7 @@ func (c *PlainMutations) AppendMutation(mutation PlainMutation) {
}

// newTwoPhaseCommitter creates a twoPhaseCommitter.
func newTwoPhaseCommitter(txn *tikvTxn, sessionID uint64) (*twoPhaseCommitter, error) {
func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
return &twoPhaseCommitter{
store: txn.store,
txn: txn,
Expand Down Expand Up @@ -1761,14 +1761,14 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error {
return err
}

func getTxnPriority(txn *tikvTxn) pb.CommandPri {
func getTxnPriority(txn *KVTxn) pb.CommandPri {
if pri := txn.us.GetOption(kv.Priority); pri != nil {
return PriorityToPB(pri.(int))
}
return pb.CommandPri_Normal
}

func getTxnSyncLog(txn *tikvTxn) bool {
func getTxnSyncLog(txn *KVTxn) bool {
if syncOption := txn.us.GetOption(kv.SyncLog); syncOption != nil {
return syncOption.(bool)
}
Expand Down
10 changes: 5 additions & 5 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,17 @@ func (s *testCommitterSuite) TearDownSuite(c *C) {
s.OneByOneSuite.TearDownSuite(c)
}

func (s *testCommitterSuite) begin(c *C) *tikvTxn {
func (s *testCommitterSuite) begin(c *C) *KVTxn {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
return txn.(*tikvTxn)
return txn
}

func (s *testCommitterSuite) beginAsyncCommit(c *C) *tikvTxn {
func (s *testCommitterSuite) beginAsyncCommit(c *C) *KVTxn {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
return txn.(*tikvTxn)
return txn
}

func (s *testCommitterSuite) checkValues(c *C, m map[string]string) {
Expand Down Expand Up @@ -423,7 +423,7 @@ func errMsgMustContain(c *C, err error, msg string) {
c.Assert(strings.Contains(err.Error(), msg), IsTrue)
}

func newTwoPhaseCommitterWithInit(txn *tikvTxn, sessionID uint64) (*twoPhaseCommitter, error) {
func newTwoPhaseCommitterWithInit(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
c, err := newTwoPhaseCommitter(txn, sessionID)
if err != nil {
return nil, errors.Trace(err)
Expand Down
10 changes: 5 additions & 5 deletions store/tikv/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,23 @@ func (s *testAsyncCommitCommon) mustGetNoneFromSnapshot(c *C, version uint64, ke
c.Assert(errors.Cause(err), Equals, kv.ErrNotExist)
}

func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability(c *C) *tikvTxn {
func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability(c *C) *KVTxn {
txn := s.beginAsyncCommit(c)
txn.SetOption(kv.GuaranteeLinearizability, true)
return txn
}

func (s *testAsyncCommitCommon) beginAsyncCommit(c *C) *tikvTxn {
func (s *testAsyncCommitCommon) beginAsyncCommit(c *C) *KVTxn {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
return txn.(*tikvTxn)
return txn
}

func (s *testAsyncCommitCommon) begin(c *C) *tikvTxn {
func (s *testAsyncCommitCommon) begin(c *C) *KVTxn {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
return txn.(*tikvTxn)
return txn
}

type testAsyncCommitSuite struct {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type BinlogWriteResult interface {
}

type binlogExecutor struct {
txn *tikvTxn
txn *KVTxn
}

func (e *binlogExecutor) Skip() {
Expand Down
8 changes: 0 additions & 8 deletions store/tikv/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ type Storage interface {
// Closed returns the closed channel.
Closed() <-chan struct{}

// Begin a global transaction
Begin() (kv.Transaction, error)
// Begin a transaction with the given txnScope (local or global)
BeginWithTxnScope(txnScope string) (kv.Transaction, error)
// BeginWithStartTS begins transaction with given txnScope and startTS.
BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error)
// BeginWithStalenessTS begins transaction with given staleness
BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error)
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
GetSnapshot(ver kv.Version) kv.Snapshot
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/isolation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *testIsolationSuite) SetWithRetry(c *C, k, v []byte) writeRecord {
if err == nil {
return writeRecord{
startTS: txn.StartTS(),
commitTS: txn.(*tikvTxn).commitTS,
commitTS: txn.commitTS,
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ func (s *KVStore) runSafePointChecker() {
}

// Begin a global transaction.
func (s *KVStore) Begin() (kv.Transaction, error) {
func (s *KVStore) Begin() (*KVTxn, error) {
return s.BeginWithTxnScope(oracle.GlobalTxnScope)
}

// BeginWithTxnScope begins a transaction with the given txnScope (local or
// global)
func (s *KVStore) BeginWithTxnScope(txnScope string) (kv.Transaction, error) {
func (s *KVStore) BeginWithTxnScope(txnScope string) (*KVTxn, error) {
txn, err := newTiKVTxn(s, txnScope)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -181,7 +181,7 @@ func (s *KVStore) BeginWithTxnScope(txnScope string) (kv.Transaction, error) {
}

// BeginWithStartTS begins a transaction with startTS.
func (s *KVStore) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transaction, error) {
func (s *KVStore) BeginWithStartTS(txnScope string, startTS uint64) (*KVTxn, error) {
txn, err := newTiKVTxnWithStartTS(s, txnScope, startTS, s.nextReplicaReadSeed())
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -190,7 +190,7 @@ func (s *KVStore) BeginWithStartTS(txnScope string, startTS uint64) (kv.Transact
}

// BeginWithExactStaleness begins transaction with given staleness
func (s *KVStore) BeginWithExactStaleness(txnScope string, prevSec uint64) (kv.Transaction, error) {
func (s *KVStore) BeginWithExactStaleness(txnScope string, prevSec uint64) (*KVTxn, error) {
txn, err := newTiKVTxnWithExactStaleness(s, txnScope, prevSec)
if err != nil {
return nil, errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const bigTxnThreshold = 16

// LockResolver resolves locks and also caches resolved txn status.
type LockResolver struct {
store Storage
store *KVStore
mu struct {
sync.RWMutex
// resolved caches resolved txns (FIFO, txn id -> txnStatus).
Expand All @@ -56,7 +56,7 @@ type LockResolver struct {
}
}

func newLockResolver(store Storage) *LockResolver {
func newLockResolver(store *KVStore) *LockResolver {
r := &LockResolver{
store: store,
}
Expand Down
Loading