diff --git a/server/conn_test.go b/server/conn_test.go index 6add06c0369a8..320cc3e70a019 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -710,7 +710,7 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) { c.Assert(err, IsNil) c.Assert(txn.Valid(), IsTrue) snap := txn.GetSnapshot() - c.Assert(tikv.SnapCacheHitCount(snap), Equals, 4) + c.Assert(snap.(*tikv.KVSnapshot).SnapCacheHitCount(), Equals, 4) tk.MustExec("commit") tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 2", "2 2 4", "3 3 4")) diff --git a/session/clustered_index_test.go b/session/clustered_index_test.go index 9589d58426c0f..457a032f27c41 100644 --- a/session/clustered_index_test.go +++ b/session/clustered_index_test.go @@ -225,7 +225,10 @@ func (s *testClusteredSuite) TestClusteredInsertIgnoreBatchGetKeyCount(c *C) { tk.MustExec("insert ignore t values ('a', 1)") txn, err := tk.Se.Txn(false) c.Assert(err, IsNil) - snapSize := tikv.SnapCacheSize(txn.GetSnapshot()) + snapSize := 0 + if t, ok := txn.GetSnapshot().(*tikv.KVSnapshot); ok { + snapSize = t.SnapCacheSize() + } c.Assert(snapSize, Equals, 1) tk.MustExec("rollback") } diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 6e227909bc495..95e3d916ac1a6 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -60,6 +60,11 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { return txn.extractKeyErr(err) } +// GetSnapshot returns the Snapshot binding to this transaction. +func (txn *tikvTxn) GetSnapshot() kv.Snapshot { + return txn.KVTxn.GetSnapshot() +} + func (txn *tikvTxn) extractKeyErr(err error) error { if e, ok := errors.Cause(err).(*tikv.ErrKeyExist); ok { return txn.extractKeyExistsErr(e.GetKey()) diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 6fe6f5c9518be..549b3211d4c53 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -213,7 +213,7 @@ func (s *KVStore) beginWithExactStaleness(txnScope string, prevSec uint64) (*KVT // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ts is MaxVersion or > current max committed version, we will use current version for this snapshot. -func (s *KVStore) GetSnapshot(ts uint64) kv.Snapshot { +func (s *KVStore) GetSnapshot(ts uint64) *KVSnapshot { snapshot := newTiKVSnapshot(s, ts, s.nextReplicaReadSeed()) return snapshot } diff --git a/store/tikv/scan.go b/store/tikv/scan.go index b8d9a1520eab0..52b7317fcfc79 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -27,7 +27,7 @@ import ( // Scanner support tikv scan type Scanner struct { - snapshot *tikvSnapshot + snapshot *KVSnapshot batchSize int cache []*pb.KvPair idx int @@ -42,7 +42,7 @@ type Scanner struct { eof bool } -func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) { +func newScanner(snapshot *KVSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) { // It must be > 1. Otherwise scanner won't skipFirst. if batchSize <= 1 { batchSize = scanBatchSize diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 14a7e2091a7c5..eba840db15190 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -41,18 +41,14 @@ import ( "go.uber.org/zap" ) -var ( - _ kv.Snapshot = (*tikvSnapshot)(nil) -) - const ( scanBatchSize = 256 batchGetSize = 5120 maxTimestamp = math.MaxUint64 ) -// tikvSnapshot implements the kv.Snapshot interface. -type tikvSnapshot struct { +// KVSnapshot implements the kv.Snapshot interface. +type KVSnapshot struct { store *KVStore version uint64 isolationLevel kv.IsoLevel @@ -88,13 +84,13 @@ type tikvSnapshot struct { } // newTiKVSnapshot creates a snapshot of an TiKV store. -func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *tikvSnapshot { +func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *KVSnapshot { // Sanity check for snapshot version. if ts >= math.MaxInt64 && ts != math.MaxUint64 { err := errors.Errorf("try to get snapshot with a large ts %d", ts) panic(err) } - return &tikvSnapshot{ + return &KVSnapshot{ store: store, version: ts, priority: pb.CommandPri_Normal, @@ -104,7 +100,7 @@ func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *tikvSna } } -func (s *tikvSnapshot) setSnapshotTS(ts uint64) { +func (s *KVSnapshot) setSnapshotTS(ts uint64) { // Sanity check for snapshot version. if ts >= math.MaxInt64 && ts != math.MaxUint64 { err := errors.Errorf("try to get snapshot with a large ts %d", ts) @@ -121,7 +117,7 @@ func (s *tikvSnapshot) setSnapshotTS(ts uint64) { // BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs. // The map will not contain nonexistent keys. -func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) { +func (s *KVSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) { // Check the cached value first. m := make(map[string][]byte) s.mu.RLock() @@ -222,7 +218,7 @@ func appendBatchKeysBySize(b []batchKeys, region RegionVerID, keys [][]byte, siz return b } -func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error { +func (s *KVSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error { defer func(start time.Time) { metrics.TxnCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds()) }(time.Now()) @@ -264,7 +260,7 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle return errors.Trace(err) } -func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error { +func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error { cli := NewClientHelper(s.store, s.resolvedLocks) s.mu.RLock() if s.mu.stats != nil { @@ -372,7 +368,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll } // Get gets the value for key k from snapshot. -func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) { +func (s *KVSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) { defer func(start time.Time) { metrics.TxnCmdHistogramWithGet.Observe(time.Since(start).Seconds()) @@ -396,7 +392,7 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) { return val, nil } -func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte, error) { +func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte, error) { // Check the cached values first. s.mu.RLock() if s.mu.cached != nil { @@ -523,7 +519,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte } } -func (s *tikvSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) { +func (s *KVSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) { s.mu.Lock() defer s.mu.Unlock() if detail == nil || s.mu.stats == nil { @@ -540,20 +536,20 @@ func (s *tikvSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) { } // Iter return a list of key-value pair after `k`. -func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { +func (s *KVSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { scanner, err := newScanner(s, k, upperBound, scanBatchSize, false) return scanner, errors.Trace(err) } // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. -func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) { +func (s *KVSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) { scanner, err := newScanner(s, nil, k, scanBatchSize, true) return scanner, errors.Trace(err) } // SetOption sets an option with a value, when val is nil, uses the default // value of this option. Only ReplicaRead is supported for snapshot -func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) { +func (s *KVSnapshot) SetOption(opt kv.Option, val interface{}) { switch opt { case kv.IsolationLevel: s.isolationLevel = val.(kv.IsoLevel) @@ -594,8 +590,8 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) { } } -// ClearFollowerRead disables follower read on current transaction -func (s *tikvSnapshot) DelOption(opt kv.Option) { +// DelOption deletes an option. +func (s *KVSnapshot) DelOption(opt kv.Option) { switch opt { case kv.ReplicaRead: s.mu.Lock() @@ -608,24 +604,16 @@ func (s *tikvSnapshot) DelOption(opt kv.Option) { } } -// SnapCacheHitCount gets the snapshot cache hit count. -func SnapCacheHitCount(snap kv.Snapshot) int { - tikvSnap, ok := snap.(*tikvSnapshot) - if !ok { - return 0 - } - return int(atomic.LoadInt64(&tikvSnap.mu.hitCnt)) +// SnapCacheHitCount gets the snapshot cache hit count. Only for test. +func (s *KVSnapshot) SnapCacheHitCount() int { + return int(atomic.LoadInt64(&s.mu.hitCnt)) } -// SnapCacheSize gets the snapshot cache size. -func SnapCacheSize(snap kv.Snapshot) int { - tikvSnap, ok := snap.(*tikvSnapshot) - if !ok { - return 0 - } - tikvSnap.mu.RLock() - defer tikvSnap.mu.RLock() - return len(tikvSnap.mu.cached) +// SnapCacheSize gets the snapshot cache size. Only for test. +func (s *KVSnapshot) SnapCacheSize() int { + s.mu.RLock() + defer s.mu.RLock() + return len(s.mu.cached) } func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) { @@ -740,7 +728,7 @@ func prettyWriteKey(buf *bytes.Buffer, key []byte) { } } -func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) { +func (s *KVSnapshot) recordBackoffInfo(bo *Backoffer) { s.mu.RLock() if s.mu.stats == nil || bo.totalSleep == 0 { s.mu.RUnlock() @@ -765,7 +753,7 @@ func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) { } } -func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) { +func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) { s.mu.Lock() defer s.mu.Unlock() if s.mu.stats == nil { diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 1c71663b5807c..382586cd746f9 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -47,7 +47,7 @@ type SchemaAmender interface { // KVTxn contains methods to interact with a TiKV transaction. type KVTxn struct { - snapshot *tikvSnapshot + snapshot *KVSnapshot us kv.UnionStore store *KVStore // for connection to region. startTS uint64 @@ -617,6 +617,6 @@ func (txn *KVTxn) GetMemBuffer() kv.MemBuffer { } // GetSnapshot returns the Snapshot binding to this transaction. -func (txn *KVTxn) GetSnapshot() kv.Snapshot { +func (txn *KVTxn) GetSnapshot() *KVSnapshot { return txn.snapshot }