Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement IterReverse for tikvSnapshot and use desc scan to get latest N ddl history jobs. #10152

Merged
merged 31 commits into from
May 31, 2019
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1ffab4d
add reverse scan for mocktikv
crazycs520 Mar 13, 2019
f05174b
Merge branch 'master' of https://github.com/pingcap/tidb into reverse…
crazycs520 Apr 15, 2019
3b26e64
todo: fix tikv reverse
crazycs520 Apr 15, 2019
971c2fa
fix mockTiKV handle rpc reverse scan bug
crazycs520 Apr 15, 2019
eefc6ce
refactor code and remove desc_scan.go file
crazycs520 Apr 15, 2019
3b7cda0
merge desc_scan and scan function
crazycs520 Apr 15, 2019
d52ba8e
refine code
crazycs520 Apr 15, 2019
26e8806
remove blank line
crazycs520 Apr 15, 2019
1369842
Merge branch 'master' into reverse-kv-scan
crazycs520 Apr 15, 2019
80672d8
add comment
crazycs520 Apr 15, 2019
3c4a6b5
Merge branch 'reverse-kv-scan' of https://github.com/crazycs520/tidb …
crazycs520 Apr 15, 2019
a3583dd
Merge branch 'master' of https://github.com/pingcap/tidb into reverse…
crazycs520 May 24, 2019
7a54a84
use LocateEndKey to located the region end with the endKey
crazycs520 May 24, 2019
d2dcfe4
address comment and add test
crazycs520 May 27, 2019
1b992df
Merge branch 'master' into reverse-kv-scan
crazycs520 May 27, 2019
341617e
test for history ddl job with multiple region
crazycs520 May 27, 2019
b7b62d4
fix test
crazycs520 May 28, 2019
ce3d64b
refine comment
crazycs520 May 28, 2019
ff0cee9
rename test
crazycs520 May 28, 2019
04f4c1a
Merge branch 'master' into reverse-kv-scan
crazycs520 May 28, 2019
87482e1
Update meta/meta.go
crazycs520 May 28, 2019
aef7e7c
add test in meta_test
crazycs520 May 28, 2019
363de0e
fix test
crazycs520 May 28, 2019
d4a3305
fix test
crazycs520 May 30, 2019
20ca3f5
Merge branch 'master' of https://github.com/pingcap/tidb into reverse…
crazycs520 May 31, 2019
dd1653b
refine code:
crazycs520 May 31, 2019
b5aaaff
refine test
crazycs520 May 31, 2019
be97fe1
address comment
crazycs520 May 31, 2019
b2554c1
refine code
crazycs520 May 31, 2019
b2de4c3
Merge branch 'master' into reverse-kv-scan
crazycs520 May 31, 2019
dc22826
Merge branch 'master' into reverse-kv-scan
crazycs520 May 31, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -226,9 +227,9 @@ func (s *testSuite) TestAdmin(c *C) {
result.Check(testkit.Rows())
result = tk.MustQuery(`admin show ddl job queries 1, 2, 3, 4`)
result.Check(testkit.Rows())
historyJob, err := admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs)
result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJob[0].ID))
result.Check(testkit.Rows(historyJob[0].Query))
historyJobs, err = admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs)
result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJobs[0].ID))
result.Check(testkit.Rows(historyJobs[0].Query))
c.Assert(err, IsNil)

// check table test
Expand Down Expand Up @@ -282,6 +283,22 @@ func (s *testSuite) TestAdmin(c *C) {
tk.MustExec("ALTER TABLE t1 ADD COLUMN c4 bit(10) default 127;")
tk.MustExec("ALTER TABLE t1 ADD INDEX idx3 (c4);")
tk.MustExec("admin check table t1;")

// Test for reverse scan get history ddl jobs when ddl history jobs queue has multiple regions.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
historyJobs, err = admin.GetHistoryDDLJobs(txn, 20)
c.Assert(err, IsNil)

// Split history ddl job queues.
m := meta.NewMeta(txn)
startKey := meta.DDLJobHistoryKey(m, 0)
endKey := meta.DDLJobHistoryKey(m, historyJobs[0].ID)
s.cluster.SplitKeys(s.mvccStore, startKey, endKey, int(historyJobs[0].ID/2))

historyJobs2, err := admin.GetHistoryDDLJobs(txn, 20)
c.Assert(err, IsNil)
c.Assert(historyJobs, DeepEquals, historyJobs2)
}

func (s *testSuite) fillData(tk *testkit.TestKit, table string) {
Expand Down
24 changes: 21 additions & 3 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (m *Meta) tableKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mTablePrefix, tableID))
}

// DDLJobHistoryKey is only used for testing.
func DDLJobHistoryKey(m *Meta, jobID int64) []byte {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
return m.txn.EncodeHashDataKey(mDDLJobHistoryKey, m.jobIDKey(jobID))
}

// GenAutoTableIDKeyValue generates meta key by dbID, tableID and corresponding value by autoID.
func (m *Meta) GenAutoTableIDKeyValue(dbID, tableID, autoID int64) (key, value []byte) {
dbKey := m.dbKey(dbID)
Expand Down Expand Up @@ -637,10 +642,23 @@ func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) {
if err != nil {
return nil, errors.Trace(err)
}
jobs := make([]*model.Job, 0, len(pairs))
for _, pair := range pairs {
return decodeAndSortJob(pairs)
}

// GetLastNHistoryDDLJobs gets latest N history ddl jobs.
func (m *Meta) GetLastNHistoryDDLJobs(num int) ([]*model.Job, error) {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
pairs, err := m.txn.HGetLastN(mDDLJobHistoryKey, num)
if err != nil {
return nil, errors.Trace(err)
}
return decodeAndSortJob(pairs)
}

func decodeAndSortJob(jobPairs []structure.HashPair) ([]*model.Job, error) {
jobs := make([]*model.Job, 0, len(jobPairs))
for _, pair := range jobPairs {
job := &model.Job{}
err = job.Decode(pair.Value)
err := job.Decode(pair.Value)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
11 changes: 11 additions & 0 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (s *testSuite) TestMeta(c *C) {

err = txn.Commit(context.Background())
c.Assert(err, IsNil)

// Test for DDLJobHistoryKey.
key = meta.DDLJobHistoryKey(t, 888)
c.Assert(key, DeepEquals, []byte{0x6d, 0x44, 0x44, 0x4c, 0x4a, 0x6f, 0x62, 0x48, 0x69, 0xff, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x0, 0x0, 0x0, 0xfc, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x68, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x78, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7})
}

func (s *testSuite) TestSnapshot(c *C) {
Expand Down Expand Up @@ -356,6 +360,13 @@ func (s *testSuite) TestDDL(c *C) {
lastID = job.ID
}

// Test for get last N history ddl jobs.
historyJobs, err := t.GetLastNHistoryDDLJobs(2)
c.Assert(err, IsNil)
c.Assert(len(historyJobs), Equals, 2)
c.Assert(historyJobs[0].ID == 123, IsTrue)
c.Assert(historyJobs[1].ID == 1234, IsTrue)

// Test GetAllDDLJobsInQueue.
err = t.EnQueueDDLJob(job)
c.Assert(err, IsNil)
Expand Down
7 changes: 7 additions & 0 deletions store/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
)

Expand Down Expand Up @@ -340,6 +341,12 @@ func (c *Cluster) SplitIndex(mvccStore MVCCStore, tableID, indexID int64, count
c.splitRange(mvccStore, NewMvccKey(indexStart), NewMvccKey(indexEnd), count)
}

// SplitKeys evenly splits the start, end key into count regions.
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
// Only works for single store.
func (c *Cluster) SplitKeys(mvccStore MVCCStore, start, end kv.Key, count int) {
c.splitRange(mvccStore, NewMvccKey(start), NewMvccKey(end), count)
}

func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) {
c.Lock()
defer c.Unlock()
Expand Down
28 changes: 22 additions & 6 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,30 @@ func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse {
}

func (h *rpcHandler) handleKvScan(req *kvrpcpb.ScanRequest) *kvrpcpb.ScanResponse {
if !h.checkKeyInRegion(req.GetStartKey()) {
panic("KvScan: startKey not in region")
}
endKey := h.endKey
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(req.EndKey, endKey) < 0) {
endKey = req.EndKey
var pairs []Pair
if !req.Reverse {
if !h.checkKeyInRegion(req.GetStartKey()) {
panic("KvScan: startKey not in region")
}
if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(req.EndKey, endKey) < 0) {
endKey = req.EndKey
}
pairs = h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
} else {
// TiKV use range [end_key, start_key) for reverse scan.
// Should use the req.EndKey to check in region.
if !h.checkKeyInRegion(req.GetEndKey()) {
panic("KvScan: startKey not in region")
}
// TiKV use range [end_key, start_key) for reverse scan.
// So the req.StartKey actually is the end_key.
if len(req.GetStartKey()) > 0 && (len(endKey) == 0 || bytes.Compare(req.GetStartKey(), endKey) < 0) {
endKey = req.GetStartKey()
}
pairs = h.mvccStore.ReverseScan(req.EndKey, endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
}
pairs := h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)

return &kvrpcpb.ScanResponse{
Pairs: convertToPbPairs(pairs),
}
Expand Down
60 changes: 49 additions & 11 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ type Scanner struct {
nextStartKey []byte
endKey []byte
eof bool

// Use for reverse scan.
reverse bool
nextEndKey []byte
}

func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) {
func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) {
// It must be > 1. Otherwise scanner won't skipFirst.
if batchSize <= 1 {
batchSize = scanBatchSize
Expand All @@ -48,6 +52,8 @@ func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSiz
valid: true,
nextStartKey: startKey,
endKey: endKey,
reverse: reverse,
nextEndKey: endKey,
}
err := scanner.Next()
if kv.IsErrNotFound(err) {
Expand Down Expand Up @@ -83,14 +89,15 @@ func (s *Scanner) Next() error {
if !s.valid {
return errors.New("scanner iterator is invalid")
}
var err error
for {
s.idx++
if s.idx >= len(s.cache) {
if s.eof {
s.Close()
return nil
}
err := s.getData(bo)
err = s.getData(bo)
if err != nil {
s.Close()
return errors.Trace(err)
Expand All @@ -101,7 +108,8 @@ func (s *Scanner) Next() error {
}

current := s.cache[s.idx]
if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
if (!s.reverse && (len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0)) ||
(s.reverse && len(s.nextStartKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.nextStartKey)) < 0) {
s.eof = true
s.Close()
return nil
Expand Down Expand Up @@ -147,18 +155,34 @@ func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
func (s *Scanner) getData(bo *Backoffer) error {
logutil.Logger(context.Background()).Debug("txn getData",
zap.Binary("nextStartKey", s.nextStartKey),
zap.Binary("nextEndKey", s.nextEndKey),
zap.Bool("reverse", s.reverse),
zap.Uint64("txnStartTS", s.startTS()))
sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client)

var reqEndKey, reqStartKey []byte
var loc *KeyLocation
var err error
for {
loc, err := s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
if !s.reverse {
loc, err = s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
} else {
loc, err = s.snapshot.store.regionCache.LocateEndKey(bo, s.nextEndKey)
}
if err != nil {
return errors.Trace(err)
}

reqEndKey := s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
if !s.reverse {
reqEndKey = s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
}
} else {
reqStartKey = s.nextStartKey
if len(reqStartKey) == 0 ||
(len(loc.StartKey) > 0 && bytes.Compare(loc.StartKey, reqStartKey) > 0) {
reqStartKey = loc.StartKey
}
}

req := &tikvrpc.Request{
Expand All @@ -175,6 +199,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
NotFillCache: s.snapshot.notFillCache,
},
}
if s.reverse {
req.Scan.StartKey = s.nextEndKey
req.Scan.EndKey = reqStartKey
req.Scan.Reverse = true
}
resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -218,8 +247,13 @@ func (s *Scanner) getData(bo *Backoffer) error {
if len(kvPairs) < s.batchSize {
// No more data in current Region. Next getData() starts
// from current Region's endKey.
s.nextStartKey = loc.EndKey
if len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0) {
if !s.reverse {
s.nextStartKey = loc.EndKey
} else {
s.nextEndKey = reqStartKey
}
if (!s.reverse && (len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0))) ||
(s.reverse && (len(loc.StartKey) == 0 || (len(s.nextStartKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.nextEndKey)) >= 0))) {
// Current Region is the last one.
s.eof = true
}
Expand All @@ -230,7 +264,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
// may get an empty response if the Region in fact does not have
// more data.
lastKey := kvPairs[len(kvPairs)-1].GetKey()
s.nextStartKey = kv.Key(lastKey).Next()
if !s.reverse {
s.nextStartKey = kv.Key(lastKey).Next()
} else {
s.nextEndKey = kv.Key(lastKey)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
}
37 changes: 35 additions & 2 deletions store/tikv/scan_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,52 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
scanner, err := newScanner(snapshot, []byte("a"), nil, 10)
scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)

scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10)
scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, false)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('h'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)
}

func (s *testScanMockSuite) TestReverseScan(c *C) {
store := NewTestStore(c).(*tikvStore)
defer store.Close()

txn, err := store.Begin()
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
err = txn.Set([]byte{ch}, []byte{ch})
c.Assert(err, IsNil)
}
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true)
c.Assert(err, IsNil)
for ch := byte('y'); ch >= byte('a'); ch-- {
c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)

scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, true)
c.Assert(err, IsNil)
for ch := byte('h'); ch >= byte('a'); ch-- {
c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Valid(), IsFalse)
}
5 changes: 3 additions & 2 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,14 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {

// Iter return a list of key-value pair after `k`.
func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
scanner, err := newScanner(s, k, upperBound, scanBatchSize)
scanner, err := newScanner(s, k, upperBound, scanBatchSize, false)
return scanner, errors.Trace(err)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
return nil, kv.ErrNotImplemented
scanner, err := newScanner(s, nil, k, scanBatchSize, true)
return scanner, errors.Trace(err)
}

func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error {
if logFreq%10 == 0 {
logutil.Logger(context.Background()).Info("wait scatter region",
zap.Uint64("regionID", regionID),
zap.String("desc", string(resp.Desc)),
zap.String("reverse", string(resp.Desc)),
zap.String("status", pdpb.OperatorStatus_name[int32(resp.Status)]))
}
logFreq++
Expand Down
Loading