Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unistore: get/batchGet/scan support read-through-lock #29898

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -179,6 +178,15 @@ func (rc *Controller) ClusterIsAvailable(ctx context.Context) error {
return nil
}

func isTiFlash(store *api.MetaStore) bool {
for _, label := range store.Labels {
if label.Key == "engine" && label.Value == "tiflash" {
return true
}
}
return false
}

func (rc *Controller) checkEmptyRegion(ctx context.Context) error {
passed := true
message := "Cluster doesn't have too many empty regions"
Expand Down Expand Up @@ -206,7 +214,7 @@ func (rc *Controller) checkEmptyRegion(ctx context.Context) error {
}
}
for _, store := range storeInfo.Stores {
stores[store.Store.Id] = store
stores[store.Store.StoreID] = store
}
tableCount := 0
for _, db := range rc.dbMetas {
Expand All @@ -224,10 +232,10 @@ func (rc *Controller) checkEmptyRegion(ctx context.Context) error {
)
for storeID, regionCnt := range regions {
if store, ok := stores[storeID]; ok {
if store.Store.State != metapb.StoreState_Up {
if metapb.StoreState(metapb.StoreState_value[store.Store.StateName]) != metapb.StoreState_Up {
continue
}
if version.IsTiFlash(store.Store.Store) {
if isTiFlash(store.Store) {
continue
}
if regionCnt > errorThrehold {
Expand Down Expand Up @@ -269,10 +277,10 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error {
}
stores := make([]*api.StoreInfo, 0, len(result.Stores))
for _, store := range result.Stores {
if store.Store.State != metapb.StoreState_Up {
if metapb.StoreState(metapb.StoreState_value[store.Store.StateName]) != metapb.StoreState_Up {
continue
}
if version.IsTiFlash(store.Store.Store) {
if isTiFlash(store.Store) {
continue
}
stores = append(stores, store)
Expand Down Expand Up @@ -302,11 +310,11 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error {
passed = false
message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+
"with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it must not be less than %v",
minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio)
minStore.Store.StoreID, minStore.Status.RegionCount, maxStore.Store.StoreID, maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio)
} else if ratio < warnRegionCntMinMaxRatio {
message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+
"with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it should not be less than %v",
minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio)
minStore.Store.StoreID, minStore.Status.RegionCount, maxStore.Store.StoreID, maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio)
}
return nil
}
Expand Down
20 changes: 10 additions & 10 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1778,7 +1778,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
testCases := []testCase{
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 200}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 200}},
}},
emptyRegions: api.RegionsInfo{
Regions: append([]api.RegionInfo(nil), makeRegions(100, 1)...),
Expand All @@ -1789,9 +1789,9 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
},
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 2000}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3100}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 2000}},
{Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 3100}},
{Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}},
}},
emptyRegions: api.RegionsInfo{
Regions: append(append(append([]api.RegionInfo(nil),
Expand All @@ -1809,19 +1809,19 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
},
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 1200}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3000}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 1200}},
{Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 3000}},
{Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}},
}},
expectMsgs: []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"},
expectResult: false,
expectErrorCnt: 1,
},
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 0}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 2800}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 0}},
{Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 2800}},
{Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}},
}},
expectMsgs: []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"},
expectResult: false,
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20211029081837-3c7bd947cf9b
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible
Expand All @@ -66,7 +66,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211115071040-a3f1c41ac1a0
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
Expand All @@ -76,7 +76,7 @@ require (
go.etcd.io/etcd v0.5.0-alpha.5.0.20210512015243-d19fbe541bf9
go.uber.org/atomic v1.9.0
go.uber.org/automaxprocs v1.4.0
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723
go.uber.org/goleak v1.1.12
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.19.1
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420
Expand Down
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,9 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210819164333-bd5706b9d9f2/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211029081837-3c7bd947cf9b h1:/aj6ITlHSJZmsm4hIMOgJAAZti+Dmq11tCyKedA6Dcs=
github.com/pingcap/kvproto v0.0.0-20211029081837-3c7bd947cf9b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f h1:hjInxK1Ie6CYx7Jy2pYnBdEnWI8jIfr423l9Yh6LRy8=
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand All @@ -597,7 +598,7 @@ github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 h1:7rvAtZe/ZUzOKzgriNPQoBNvleJXBk4z7L3Z47+tS98=
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5/go.mod h1:XsOaV712rUk63aOEKYP9PhXTIE3FMNHmC2r1wX5wElY=
github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-dashboard v0.0.0-20211031170437-08e58c069a2a/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-dashboard v0.0.0-20211107164327-80363dfbe884/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible h1:c7+izmker91NkjkZ6FgTlmD4k1A5FLOAq+li6Ki2/GY=
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20211116093845-e9b045a0bdf8 h1:Vu/6oq8EFNWgyXRHiclNzTKIu+YKHPCSI/Ba5oVrLtM=
Expand Down Expand Up @@ -716,8 +717,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211115071040-a3f1c41ac1a0 h1:c12Pv8Xks4oubDr/uHHxrlBkwGJFqKZUEIUemHV794g=
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211115071040-a3f1c41ac1a0/go.mod h1:iiwtsCxcbNLK5i9VRYGvdcihgHXTKy2ukWjoaJsrphg=
github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ=
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379 h1:nFm1jQDz1iRktoyV2SyM5zVk6+PJHQNunJZ7ZJcqzAo=
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379/go.mod h1:y+09hAUXJbrd4c0nktL74zXDDuD7atGtfOKxL90PCOE=
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs=
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI=
github.com/tklauser/go-sysconf v0.3.4 h1:HT8SVixZd3IzLdfs/xlpq0jeSfTX57g1v6wB1EuzV7M=
github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
github.com/tklauser/numcpus v0.2.1 h1:ct88eFm+Q7m2ZfXJdan1xYoXKlmwsfP+k88q05KvlZc=
Expand Down Expand Up @@ -807,8 +808,9 @@ go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
go.uber.org/fx v1.10.0/go.mod h1:vLRicqpG/qQEzno4SYU86iCwfT95EZza+Eba0ItuxqY=
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down
109 changes: 75 additions & 34 deletions store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi
for _, m := range mutations {
lock, err := store.checkConflictInLockStore(reqCtx, m, startTS)
if err != nil {
var resourceGroupTag []byte = nil
var resourceGroupTag []byte
if req.Context != nil {
resourceGroupTag = req.Context.ResourceGroupTag
}
Expand Down Expand Up @@ -1098,34 +1098,56 @@ func (store *MVCCStore) checkCommitted(reader *dbreader.DBReader, key []byte, st
return 0, nil
}

func checkLock(lock mvcc.Lock, key []byte, startTS uint64, resolved []uint64) error {
if isResolved(lock.StartTS, resolved) {
return nil
// LockPair contains a pair of key and lock. It's used for reading through locks.
type LockPair struct {
key []byte
lock *mvcc.Lock
}

func getValueFromLock(lock *mvcc.Lock) []byte {
if lock.Op == byte(kvrpcpb.Op_Put) {
// lock owns the value so needn't to safeCopy it.
return lock.Value
}
return nil
}

// *LockPair is not nil if the lock in the committed timestamp set. Read operations can get value from it without deep copy.
func checkLock(lock mvcc.Lock, key []byte, startTS uint64, resolved []uint64, committed []uint64) (*LockPair, error) {
if inTSSet(lock.StartTS, resolved) {
return nil, nil
}
lockVisible := lock.StartTS <= startTS
isWriteLock := lock.Op == uint8(kvrpcpb.Op_Put) || lock.Op == uint8(kvrpcpb.Op_Del)
isPrimaryGet := startTS == maxSystemTS && bytes.Equal(lock.Primary, key) && !lock.UseAsyncCommit
if lockVisible && isWriteLock && !isPrimaryGet {
return BuildLockErr(safeCopy(key), &lock)
if inTSSet(lock.StartTS, committed) {
return &LockPair{safeCopy(key), &lock}, nil
}
return nil, BuildLockErr(safeCopy(key), &lock)
}
return nil
return nil, nil
}

// CheckKeysLock implements the MVCCStore interface.
func (store *MVCCStore) CheckKeysLock(startTS uint64, resolved []uint64, keys ...[]byte) error {
func (store *MVCCStore) CheckKeysLock(startTS uint64, resolved, committed []uint64, keys ...[]byte) ([]*LockPair, error) {
var buf []byte
var lockPairs []*LockPair
for _, key := range keys {
buf = store.lockStore.Get(key, buf)
if len(buf) == 0 {
continue
}
lock := mvcc.DecodeLock(buf)
err := checkLock(lock, key, startTS, resolved)
lockPair, err := checkLock(lock, key, startTS, resolved, committed)
if lockPair != nil {
lockPairs = append(lockPairs, lockPair)
}
if err != nil {
return err
return nil, err
}
}
return nil
return lockPairs, nil
}

// CheckRangeLock implements the MVCCStore interface.
Expand All @@ -1136,7 +1158,7 @@ func (store *MVCCStore) CheckRangeLock(startTS uint64, startKey, endKey []byte,
break
}
lock := mvcc.DecodeLock(it.Value())
err := checkLock(lock, it.Key(), startTS, resolved)
_, err := checkLock(lock, it.Key(), startTS, resolved, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1386,14 +1408,32 @@ func (store *MVCCStore) DeleteFileInRange(start, end []byte) {
store.db.DeleteFilesInRange(start, end)
}

// Get implements the MVCCStore interface.
func (store *MVCCStore) Get(reqCtx *requestCtx, key []byte, version uint64) ([]byte, error) {
lockPairs, err := store.CheckKeysLock(version, reqCtx.rpcCtx.ResolvedLocks, reqCtx.rpcCtx.CommittedLocks, key)
if err != nil {
return nil, err
}
if len(lockPairs) != 0 {
return getValueFromLock(lockPairs[0].lock), nil
}
val, err := reqCtx.getDBReader().Get(key, version)
return safeCopy(val), err
}

// BatchGet implements the MVCCStore interface.
func (store *MVCCStore) BatchGet(reqCtx *requestCtx, keys [][]byte, version uint64) []*kvrpcpb.KvPair {
pairs := make([]*kvrpcpb.KvPair, 0, len(keys))
remain := make([][]byte, 0, len(keys))
for _, key := range keys {
err := store.CheckKeysLock(version, reqCtx.rpcCtx.ResolvedLocks, key)
lockPairs, err := store.CheckKeysLock(version, reqCtx.rpcCtx.ResolvedLocks, reqCtx.rpcCtx.CommittedLocks, key)
if err != nil {
pairs = append(pairs, &kvrpcpb.KvPair{Key: key, Error: convertToKeyError(err)})
} else if len(lockPairs) != 0 {
value := getValueFromLock(lockPairs[0].lock)
if value != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the lock type is not Put, value will be nil, so we skip this key here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock type is either Put or Del because only locks that can block read are in LockPair.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mistakenly thought the Del record needs a nil return, LGTM.

pairs = append(pairs, &kvrpcpb.KvPair{Key: key, Value: value})
}
} else {
remain = append(remain, key)
}
Expand All @@ -1411,16 +1451,22 @@ func (store *MVCCStore) BatchGet(reqCtx *requestCtx, keys [][]byte, version uint
return pairs
}

func (store *MVCCStore) collectRangeLock(startTS uint64, startKey, endKey []byte, resolved []uint64) []*kvrpcpb.KvPair {
func (store *MVCCStore) collectRangeLock(startTS uint64, startKey, endKey []byte, resolved, committed []uint64) []*kvrpcpb.KvPair {
var pairs []*kvrpcpb.KvPair
it := store.lockStore.NewIterator()
for it.Seek(startKey); it.Valid(); it.Next() {
if exceedEndKey(it.Key(), endKey) {
break
}
lock := mvcc.DecodeLock(it.Value())
err := checkLock(lock, it.Key(), startTS, resolved)
if err != nil {
lockPair, err := checkLock(lock, it.Key(), startTS, resolved, committed)
if lockPair != nil {
pairs = append(pairs, &kvrpcpb.KvPair{
Key: lockPair.key,
// deleted key's value is nil
Value: getValueFromLock(lockPair.lock),
})
} else if err != nil {
pairs = append(pairs, &kvrpcpb.KvPair{
Error: convertToKeyError(err),
Key: safeCopy(it.Key()),
Expand All @@ -1430,8 +1476,8 @@ func (store *MVCCStore) collectRangeLock(startTS uint64, startKey, endKey []byte
return pairs
}

func isResolved(startTS uint64, resolved []uint64) bool {
for _, v := range resolved {
func inTSSet(startTS uint64, tsSet []uint64) bool {
for _, v := range tsSet {
if startTS == v {
return true
}
Expand Down Expand Up @@ -1486,7 +1532,7 @@ func (store *MVCCStore) Scan(reqCtx *requestCtx, req *kvrpcpb.ScanRequest) []*kv
var lockPairs []*kvrpcpb.KvPair
limit := req.GetLimit()
if req.SampleStep == 0 {
lockPairs = store.collectRangeLock(req.GetVersion(), startKey, endKey, req.Context.ResolvedLocks)
lockPairs = store.collectRangeLock(req.GetVersion(), startKey, endKey, reqCtx.rpcCtx.ResolvedLocks, reqCtx.rpcCtx.CommittedLocks)
} else {
limit = req.SampleStep * limit
}
Expand All @@ -1506,31 +1552,26 @@ func (store *MVCCStore) Scan(reqCtx *requestCtx, req *kvrpcpb.ScanRequest) []*kv
})
return scanProc.pairs
}
pairs := append(scanProc.pairs, lockPairs...)
sort.Slice(pairs, func(i, j int) bool {
pairs := append(lockPairs, scanProc.pairs...)
sort.SliceStable(pairs, func(i, j int) bool {
cmp := bytes.Compare(pairs[i].Key, pairs[j].Key)
if req.Reverse {
cmp = -cmp
}
if cmp < 0 {
return true
} else if cmp > 0 {
return false
}
return pairs[i].Error != nil
return cmp < 0
})
validPairs := pairs[:0]
var prevErr *kvrpcpb.KvPair
var prev *kvrpcpb.KvPair
for _, pair := range pairs {
if prevErr != nil && bytes.Equal(prevErr.Key, pair.Key) {
if prev != nil && bytes.Equal(prev.Key, pair.Key) {
continue
}
if pair.Error != nil {
prevErr = pair
}
validPairs = append(validPairs, pair)
if len(validPairs) >= int(limit) {
break
prev = pair
if pair.Error != nil || len(pair.Value) != 0 {
validPairs = append(validPairs, pair)
if len(validPairs) >= int(limit) {
break
}
}
}
return validPairs
Expand Down
Loading