Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: remove kv.Snaphost from store/tikv #23318

Merged
merged 2 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) {
c.Assert(err, IsNil)
c.Assert(txn.Valid(), IsTrue)
snap := txn.GetSnapshot()
c.Assert(tikv.SnapCacheHitCount(snap), Equals, 4)
c.Assert(snap.(*tikv.KVSnapshot).SnapCacheHitCount(), Equals, 4)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 2", "2 2 4", "3 3 4"))

Expand Down
5 changes: 4 additions & 1 deletion session/clustered_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ func (s *testClusteredSuite) TestClusteredInsertIgnoreBatchGetKeyCount(c *C) {
tk.MustExec("insert ignore t values ('a', 1)")
txn, err := tk.Se.Txn(false)
c.Assert(err, IsNil)
snapSize := tikv.SnapCacheSize(txn.GetSnapshot())
snapSize := 0
if t, ok := txn.GetSnapshot().(*tikv.KVSnapshot); ok {
snapSize = t.SnapCacheSize()
}
c.Assert(snapSize, Equals, 1)
tk.MustExec("rollback")
}
Expand Down
5 changes: 5 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
return txn.extractKeyErr(err)
}

// GetSnapshot returns the Snapshot binding to this transaction.
func (txn *tikvTxn) GetSnapshot() kv.Snapshot {
return txn.KVTxn.GetSnapshot()
}

func (txn *tikvTxn) extractKeyErr(err error) error {
if e, ok := errors.Cause(err).(*tikv.ErrKeyExist); ok {
return txn.extractKeyExistsErr(e.GetKey())
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (s *KVStore) beginWithExactStaleness(txnScope string, prevSec uint64) (*KVT

// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ts is MaxVersion or > current max committed version, we will use current version for this snapshot.
func (s *KVStore) GetSnapshot(ts uint64) kv.Snapshot {
func (s *KVStore) GetSnapshot(ts uint64) *KVSnapshot {
snapshot := newTiKVSnapshot(s, ts, s.nextReplicaReadSeed())
return snapshot
}
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// Scanner support tikv scan
type Scanner struct {
snapshot *tikvSnapshot
snapshot *KVSnapshot
batchSize int
cache []*pb.KvPair
idx int
Expand All @@ -42,7 +42,7 @@ type Scanner struct {
eof bool
}

func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) {
func newScanner(snapshot *KVSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) {
// It must be > 1. Otherwise scanner won't skipFirst.
if batchSize <= 1 {
batchSize = scanBatchSize
Expand Down
64 changes: 26 additions & 38 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,14 @@ import (
"go.uber.org/zap"
)

var (
_ kv.Snapshot = (*tikvSnapshot)(nil)
)

const (
scanBatchSize = 256
batchGetSize = 5120
maxTimestamp = math.MaxUint64
)

// tikvSnapshot implements the kv.Snapshot interface.
type tikvSnapshot struct {
// KVSnapshot implements the kv.Snapshot interface.
type KVSnapshot struct {
store *KVStore
version uint64
isolationLevel kv.IsoLevel
Expand Down Expand Up @@ -88,13 +84,13 @@ type tikvSnapshot struct {
}

// newTiKVSnapshot creates a snapshot of an TiKV store.
func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *tikvSnapshot {
func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *KVSnapshot {
// Sanity check for snapshot version.
if ts >= math.MaxInt64 && ts != math.MaxUint64 {
err := errors.Errorf("try to get snapshot with a large ts %d", ts)
panic(err)
}
return &tikvSnapshot{
return &KVSnapshot{
store: store,
version: ts,
priority: pb.CommandPri_Normal,
Expand All @@ -104,7 +100,7 @@ func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *tikvSna
}
}

func (s *tikvSnapshot) setSnapshotTS(ts uint64) {
func (s *KVSnapshot) setSnapshotTS(ts uint64) {
// Sanity check for snapshot version.
if ts >= math.MaxInt64 && ts != math.MaxUint64 {
err := errors.Errorf("try to get snapshot with a large ts %d", ts)
Expand All @@ -121,7 +117,7 @@ func (s *tikvSnapshot) setSnapshotTS(ts uint64) {

// BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
// The map will not contain nonexistent keys.
func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
func (s *KVSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
// Check the cached value first.
m := make(map[string][]byte)
s.mu.RLock()
Expand Down Expand Up @@ -222,7 +218,7 @@ func appendBatchKeysBySize(b []batchKeys, region RegionVerID, keys [][]byte, siz
return b
}

func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
func (s *KVSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
defer func(start time.Time) {
metrics.TxnCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds())
}(time.Now())
Expand Down Expand Up @@ -264,7 +260,7 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle
return errors.Trace(err)
}

func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
cli := NewClientHelper(s.store, s.resolvedLocks)
s.mu.RLock()
if s.mu.stats != nil {
Expand Down Expand Up @@ -372,7 +368,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
}

// Get gets the value for key k from snapshot.
func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
func (s *KVSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {

defer func(start time.Time) {
metrics.TxnCmdHistogramWithGet.Observe(time.Since(start).Seconds())
Expand All @@ -396,7 +392,7 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
return val, nil
}

func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte, error) {
func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte, error) {
// Check the cached values first.
s.mu.RLock()
if s.mu.cached != nil {
Expand Down Expand Up @@ -523,7 +519,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte
}
}

func (s *tikvSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) {
func (s *KVSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) {
s.mu.Lock()
defer s.mu.Unlock()
if detail == nil || s.mu.stats == nil {
Expand All @@ -540,20 +536,20 @@ func (s *tikvSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) {
}

// Iter return a list of key-value pair after `k`.
func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
func (s *KVSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
scanner, err := newScanner(s, k, upperBound, scanBatchSize, false)
return scanner, errors.Trace(err)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
func (s *KVSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
scanner, err := newScanner(s, nil, k, scanBatchSize, true)
return scanner, errors.Trace(err)
}

// SetOption sets an option with a value, when val is nil, uses the default
// value of this option. Only ReplicaRead is supported for snapshot
func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
func (s *KVSnapshot) SetOption(opt kv.Option, val interface{}) {
switch opt {
case kv.IsolationLevel:
s.isolationLevel = val.(kv.IsoLevel)
Expand Down Expand Up @@ -594,8 +590,8 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
}
}

// ClearFollowerRead disables follower read on current transaction
func (s *tikvSnapshot) DelOption(opt kv.Option) {
// DelOption deletes an option.
func (s *KVSnapshot) DelOption(opt kv.Option) {
switch opt {
case kv.ReplicaRead:
s.mu.Lock()
Expand All @@ -608,24 +604,16 @@ func (s *tikvSnapshot) DelOption(opt kv.Option) {
}
}

// SnapCacheHitCount gets the snapshot cache hit count.
func SnapCacheHitCount(snap kv.Snapshot) int {
tikvSnap, ok := snap.(*tikvSnapshot)
if !ok {
return 0
}
return int(atomic.LoadInt64(&tikvSnap.mu.hitCnt))
// SnapCacheHitCount gets the snapshot cache hit count. Only for test.
func (s *KVSnapshot) SnapCacheHitCount() int {
return int(atomic.LoadInt64(&s.mu.hitCnt))
}

// SnapCacheSize gets the snapshot cache size.
func SnapCacheSize(snap kv.Snapshot) int {
tikvSnap, ok := snap.(*tikvSnapshot)
if !ok {
return 0
}
tikvSnap.mu.RLock()
defer tikvSnap.mu.RLock()
return len(tikvSnap.mu.cached)
// SnapCacheSize gets the snapshot cache size. Only for test.
func (s *KVSnapshot) SnapCacheSize() int {
s.mu.RLock()
defer s.mu.RLock()
return len(s.mu.cached)
}

func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
Expand Down Expand Up @@ -740,7 +728,7 @@ func prettyWriteKey(buf *bytes.Buffer, key []byte) {
}
}

func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) {
func (s *KVSnapshot) recordBackoffInfo(bo *Backoffer) {
s.mu.RLock()
if s.mu.stats == nil || bo.totalSleep == 0 {
s.mu.RUnlock()
Expand All @@ -765,7 +753,7 @@ func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) {
}
}

func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) {
func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.stats == nil {
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type SchemaAmender interface {

// KVTxn contains methods to interact with a TiKV transaction.
type KVTxn struct {
snapshot *tikvSnapshot
snapshot *KVSnapshot
us kv.UnionStore
store *KVStore // for connection to region.
startTS uint64
Expand Down Expand Up @@ -617,6 +617,6 @@ func (txn *KVTxn) GetMemBuffer() kv.MemBuffer {
}

// GetSnapshot returns the Snapshot binding to this transaction.
func (txn *KVTxn) GetSnapshot() kv.Snapshot {
func (txn *KVTxn) GetSnapshot() *KVSnapshot {
return txn.snapshot
}