diff --git a/executor/executor_test.go b/executor/executor_test.go index 04099b2508455..8b1ff4fecdca2 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/planner" plannercore "github.com/pingcap/tidb/planner/core" @@ -226,9 +227,9 @@ func (s *testSuite) TestAdmin(c *C) { result.Check(testkit.Rows()) result = tk.MustQuery(`admin show ddl job queries 1, 2, 3, 4`) result.Check(testkit.Rows()) - historyJob, err := admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs) - result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJob[0].ID)) - result.Check(testkit.Rows(historyJob[0].Query)) + historyJobs, err = admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs) + result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJobs[0].ID)) + result.Check(testkit.Rows(historyJobs[0].Query)) c.Assert(err, IsNil) // check table test @@ -282,6 +283,22 @@ func (s *testSuite) TestAdmin(c *C) { tk.MustExec("ALTER TABLE t1 ADD COLUMN c4 bit(10) default 127;") tk.MustExec("ALTER TABLE t1 ADD INDEX idx3 (c4);") tk.MustExec("admin check table t1;") + + // Test for reverse scan get history ddl jobs when ddl history jobs queue has multiple regions. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + historyJobs, err = admin.GetHistoryDDLJobs(txn, 20) + c.Assert(err, IsNil) + + // Split region for history ddl job queues. + m := meta.NewMeta(txn) + startKey := meta.DDLJobHistoryKey(m, 0) + endKey := meta.DDLJobHistoryKey(m, historyJobs[0].ID) + s.cluster.SplitKeys(s.mvccStore, startKey, endKey, int(historyJobs[0].ID/5)) + + historyJobs2, err := admin.GetHistoryDDLJobs(txn, 20) + c.Assert(err, IsNil) + c.Assert(historyJobs, DeepEquals, historyJobs2) } func (s *testSuite) fillData(tk *testkit.TestKit, table string) { diff --git a/meta/meta.go b/meta/meta.go index 09cc9c3a89a92..e5eead8b8bbd6 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -135,6 +135,11 @@ func (m *Meta) tableKey(tableID int64) []byte { return []byte(fmt.Sprintf("%s:%d", mTablePrefix, tableID)) } +// DDLJobHistoryKey is only used for testing. +func DDLJobHistoryKey(m *Meta, jobID int64) []byte { + return m.txn.EncodeHashDataKey(mDDLJobHistoryKey, m.jobIDKey(jobID)) +} + // GenAutoTableIDKeyValue generates meta key by dbID, tableID and corresponding value by autoID. func (m *Meta) GenAutoTableIDKeyValue(dbID, tableID, autoID int64) (key, value []byte) { dbKey := m.dbKey(dbID) @@ -637,10 +642,23 @@ func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) { if err != nil { return nil, errors.Trace(err) } - jobs := make([]*model.Job, 0, len(pairs)) - for _, pair := range pairs { + return decodeAndSortJob(pairs) +} + +// GetLastNHistoryDDLJobs gets latest N history ddl jobs. +func (m *Meta) GetLastNHistoryDDLJobs(num int) ([]*model.Job, error) { + pairs, err := m.txn.HGetLastN(mDDLJobHistoryKey, num) + if err != nil { + return nil, errors.Trace(err) + } + return decodeAndSortJob(pairs) +} + +func decodeAndSortJob(jobPairs []structure.HashPair) ([]*model.Job, error) { + jobs := make([]*model.Job, 0, len(jobPairs)) + for _, pair := range jobPairs { job := &model.Job{} - err = job.Decode(pair.Value) + err := job.Decode(pair.Value) if err != nil { return nil, errors.Trace(err) } diff --git a/meta/meta_test.go b/meta/meta_test.go index 5e719680f5e7a..2d0ef05f6605b 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -237,6 +237,10 @@ func (s *testSuite) TestMeta(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) + + // Test for DDLJobHistoryKey. + key = meta.DDLJobHistoryKey(t, 888) + c.Assert(key, DeepEquals, []byte{0x6d, 0x44, 0x44, 0x4c, 0x4a, 0x6f, 0x62, 0x48, 0x69, 0xff, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x0, 0x0, 0x0, 0xfc, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x68, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x78, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7}) } func (s *testSuite) TestSnapshot(c *C) { @@ -356,6 +360,13 @@ func (s *testSuite) TestDDL(c *C) { lastID = job.ID } + // Test for get last N history ddl jobs. + historyJobs, err := t.GetLastNHistoryDDLJobs(2) + c.Assert(err, IsNil) + c.Assert(len(historyJobs), Equals, 2) + c.Assert(historyJobs[0].ID == 123, IsTrue) + c.Assert(historyJobs[1].ID == 1234, IsTrue) + // Test GetAllDDLJobsInQueue. err = t.EnQueueDDLJob(job) c.Assert(err, IsNil) diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go index 014e61042f5c8..7e6c0270ebc4f 100644 --- a/store/mockstore/mocktikv/cluster.go +++ b/store/mockstore/mocktikv/cluster.go @@ -22,6 +22,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/tablecodec" ) @@ -340,6 +341,12 @@ func (c *Cluster) SplitIndex(mvccStore MVCCStore, tableID, indexID int64, count c.splitRange(mvccStore, NewMvccKey(indexStart), NewMvccKey(indexEnd), count) } +// SplitKeys evenly splits the start, end key into "count" regions. +// Only works for single store. +func (c *Cluster) SplitKeys(mvccStore MVCCStore, start, end kv.Key, count int) { + c.splitRange(mvccStore, NewMvccKey(start), NewMvccKey(end), count) +} + func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) { c.Lock() defer c.Unlock() diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index c7cdeee95c33d..3cf3aa3e062db 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -245,14 +245,32 @@ func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse { } func (h *rpcHandler) handleKvScan(req *kvrpcpb.ScanRequest) *kvrpcpb.ScanResponse { - if !h.checkKeyInRegion(req.GetStartKey()) { - panic("KvScan: startKey not in region") - } endKey := MvccKey(h.endKey).Raw() - if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) { - endKey = req.EndKey + var pairs []Pair + if !req.Reverse { + if !h.checkKeyInRegion(req.GetStartKey()) { + panic("KvScan: startKey not in region") + } + if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) { + endKey = req.EndKey + } + pairs = h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel) + } else { + // TiKV use range [end_key, start_key) for reverse scan. + // Should use the req.EndKey to check in region. + if !h.checkKeyInRegion(req.GetEndKey()) { + panic("KvScan: startKey not in region") + } + + // TiKV use range [end_key, start_key) for reverse scan. + // So the req.StartKey actually is the end_key. + if len(req.StartKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.StartKey), h.endKey) < 0) { + endKey = req.StartKey + } + + pairs = h.mvccStore.ReverseScan(req.EndKey, endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel) } - pairs := h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel) + return &kvrpcpb.ScanResponse{ Pairs: convertToPbPairs(pairs), } diff --git a/store/tikv/scan.go b/store/tikv/scan.go index dd56a1f7ed5a4..4ce5979270c02 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -35,9 +35,13 @@ type Scanner struct { nextStartKey []byte endKey []byte eof bool + + // Use for reverse scan. + reverse bool + nextEndKey []byte } -func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) { +func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) { // It must be > 1. Otherwise scanner won't skipFirst. if batchSize <= 1 { batchSize = scanBatchSize @@ -48,6 +52,8 @@ func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSiz valid: true, nextStartKey: startKey, endKey: endKey, + reverse: reverse, + nextEndKey: endKey, } err := scanner.Next() if kv.IsErrNotFound(err) { @@ -83,6 +89,7 @@ func (s *Scanner) Next() error { if !s.valid { return errors.New("scanner iterator is invalid") } + var err error for { s.idx++ if s.idx >= len(s.cache) { @@ -90,7 +97,7 @@ func (s *Scanner) Next() error { s.Close() return nil } - err := s.getData(bo) + err = s.getData(bo) if err != nil { s.Close() return errors.Trace(err) @@ -101,7 +108,8 @@ func (s *Scanner) Next() error { } current := s.cache[s.idx] - if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 { + if (!s.reverse && (len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0)) || + (s.reverse && len(s.nextStartKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.nextStartKey)) < 0) { s.eof = true s.Close() return nil @@ -147,18 +155,34 @@ func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error { func (s *Scanner) getData(bo *Backoffer) error { logutil.Logger(context.Background()).Debug("txn getData", zap.Binary("nextStartKey", s.nextStartKey), + zap.Binary("nextEndKey", s.nextEndKey), + zap.Bool("reverse", s.reverse), zap.Uint64("txnStartTS", s.startTS())) sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client) - + var reqEndKey, reqStartKey []byte + var loc *KeyLocation + var err error for { - loc, err := s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey) + if !s.reverse { + loc, err = s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey) + } else { + loc, err = s.snapshot.store.regionCache.LocateEndKey(bo, s.nextEndKey) + } if err != nil { return errors.Trace(err) } - reqEndKey := s.endKey - if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 { - reqEndKey = loc.EndKey + if !s.reverse { + reqEndKey = s.endKey + if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 { + reqEndKey = loc.EndKey + } + } else { + reqStartKey = s.nextStartKey + if len(reqStartKey) == 0 || + (len(loc.StartKey) > 0 && bytes.Compare(loc.StartKey, reqStartKey) > 0) { + reqStartKey = loc.StartKey + } } req := &tikvrpc.Request{ @@ -175,6 +199,11 @@ func (s *Scanner) getData(bo *Backoffer) error { NotFillCache: s.snapshot.notFillCache, }, } + if s.reverse { + req.Scan.StartKey = s.nextEndKey + req.Scan.EndKey = reqStartKey + req.Scan.Reverse = true + } resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium) if err != nil { return errors.Trace(err) @@ -218,8 +247,13 @@ func (s *Scanner) getData(bo *Backoffer) error { if len(kvPairs) < s.batchSize { // No more data in current Region. Next getData() starts // from current Region's endKey. - s.nextStartKey = loc.EndKey - if len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0) { + if !s.reverse { + s.nextStartKey = loc.EndKey + } else { + s.nextEndKey = reqStartKey + } + if (!s.reverse && (len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0))) || + (s.reverse && (len(loc.StartKey) == 0 || (len(s.nextStartKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.nextEndKey)) >= 0))) { // Current Region is the last one. s.eof = true } @@ -230,7 +264,11 @@ func (s *Scanner) getData(bo *Backoffer) error { // may get an empty response if the Region in fact does not have // more data. lastKey := kvPairs[len(kvPairs)-1].GetKey() - s.nextStartKey = kv.Key(lastKey).Next() + if !s.reverse { + s.nextStartKey = kv.Key(lastKey).Next() + } else { + s.nextEndKey = kv.Key(lastKey) + } return nil } } diff --git a/store/tikv/scan_mock_test.go b/store/tikv/scan_mock_test.go index 4cf09c50c6abb..204bcc95783d9 100644 --- a/store/tikv/scan_mock_test.go +++ b/store/tikv/scan_mock_test.go @@ -42,7 +42,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { txn, err = store.Begin() c.Assert(err, IsNil) snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}) - scanner, err := newScanner(snapshot, []byte("a"), nil, 10) + scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false) c.Assert(err, IsNil) for ch := byte('a'); ch <= byte('z'); ch++ { c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key())) @@ -50,7 +50,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { } c.Assert(scanner.Valid(), IsFalse) - scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10) + scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, false) c.Assert(err, IsNil) for ch := byte('a'); ch <= byte('h'); ch++ { c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key())) @@ -58,3 +58,36 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { } c.Assert(scanner.Valid(), IsFalse) } + +func (s *testScanMockSuite) TestReverseScan(c *C) { + store := NewTestStore(c).(*tikvStore) + defer store.Close() + + txn, err := store.Begin() + c.Assert(err, IsNil) + for ch := byte('a'); ch <= byte('z'); ch++ { + err = txn.Set([]byte{ch}, []byte{ch}) + c.Assert(err, IsNil) + } + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + + txn, err = store.Begin() + c.Assert(err, IsNil) + snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}) + scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true) + c.Assert(err, IsNil) + for ch := byte('y'); ch >= byte('a'); ch-- { + c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key()))) + c.Assert(scanner.Next(), IsNil) + } + c.Assert(scanner.Valid(), IsFalse) + + scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, true) + c.Assert(err, IsNil) + for ch := byte('h'); ch >= byte('a'); ch-- { + c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key()))) + c.Assert(scanner.Next(), IsNil) + } + c.Assert(scanner.Valid(), IsFalse) +} diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index a73b866255850..08df06751ac66 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -295,13 +295,14 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { // Iter return a list of key-value pair after `k`. func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) { - scanner, err := newScanner(s, k, upperBound, scanBatchSize) + scanner, err := newScanner(s, k, upperBound, scanBatchSize, false) return scanner, errors.Trace(err) } // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) { - return nil, kv.ErrNotImplemented + scanner, err := newScanner(s, nil, k, scanBatchSize, true) + return scanner, errors.Trace(err) } func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) { diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index b9c8a6648b0dc..4af6830605e97 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -115,7 +115,7 @@ func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error { if logFreq%10 == 0 { logutil.Logger(context.Background()).Info("wait scatter region", zap.Uint64("regionID", regionID), - zap.String("desc", string(resp.Desc)), + zap.String("reverse", string(resp.Desc)), zap.String("status", pdpb.OperatorStatus_name[int32(resp.Status)])) } logFreq++ diff --git a/structure/hash.go b/structure/hash.go index 3249884a0b6fc..ddad8d69d0344 100644 --- a/structure/hash.go +++ b/structure/hash.go @@ -216,6 +216,23 @@ func (t *TxStructure) HGetAll(key []byte) ([]HashPair, error) { return res, errors.Trace(err) } +// HGetLastN gets latest N fields and values in hash. +func (t *TxStructure) HGetLastN(key []byte, num int) ([]HashPair, error) { + res := make([]HashPair, 0, num) + err := t.iterReverseHash(key, func(field []byte, value []byte) (bool, error) { + pair := HashPair{ + Field: append([]byte{}, field...), + Value: append([]byte{}, value...), + } + res = append(res, pair) + if len(res) >= num { + return false, nil + } + return true, nil + }) + return res, errors.Trace(err) +} + // HClear removes the hash value of the key. func (t *TxStructure) HClear(key []byte) error { metaKey := t.encodeHashMetaKey(key) @@ -268,6 +285,37 @@ func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) return nil } +func (t *TxStructure) iterReverseHash(key []byte, fn func(k []byte, v []byte) (bool, error)) error { + dataPrefix := t.hashDataKeyPrefix(key) + it, err := t.reader.IterReverse(dataPrefix.PrefixNext()) + if err != nil { + return errors.Trace(err) + } + + var field []byte + for it.Valid() { + if !it.Key().HasPrefix(dataPrefix) { + break + } + + _, field, err = t.decodeHashDataKey(it.Key()) + if err != nil { + return errors.Trace(err) + } + + more, err := fn(field, it.Value()) + if !more || err != nil { + return errors.Trace(err) + } + + err = it.Next() + if err != nil { + return errors.Trace(err) + } + } + return nil +} + func (t *TxStructure) loadHashMeta(metaKey []byte) (hashMeta, error) { v, err := t.reader.Get(metaKey) if kv.ErrNotExist.Equal(err) { diff --git a/structure/structure_test.go b/structure/structure_test.go index 5ecab9f75c3a9..e6e55fcf5dfef 100644 --- a/structure/structure_test.go +++ b/structure/structure_test.go @@ -244,6 +244,17 @@ func (s *testTxStructureSuite) TestHash(c *C) { {Field: []byte("1"), Value: []byte("1")}, {Field: []byte("2"), Value: []byte("2")}}) + res, err = tx.HGetLastN(key, 1) + c.Assert(err, IsNil) + c.Assert(res, DeepEquals, []structure.HashPair{ + {Field: []byte("2"), Value: []byte("2")}}) + + res, err = tx.HGetLastN(key, 2) + c.Assert(err, IsNil) + c.Assert(res, DeepEquals, []structure.HashPair{ + {Field: []byte("2"), Value: []byte("2")}, + {Field: []byte("1"), Value: []byte("1")}}) + err = tx.HDel(key, []byte("1")) c.Assert(err, IsNil) diff --git a/structure/type.go b/structure/type.go index 89759269871c9..7096d70e86984 100644 --- a/structure/type.go +++ b/structure/type.go @@ -63,6 +63,11 @@ func (t *TxStructure) encodeHashDataKey(key []byte, field []byte) kv.Key { return codec.EncodeBytes(ek, field) } +// EncodeHashDataKey exports for tests. +func (t *TxStructure) EncodeHashDataKey(key []byte, field []byte) kv.Key { + return t.encodeHashDataKey(key, field) +} + func (t *TxStructure) decodeHashDataKey(ek kv.Key) ([]byte, []byte, error) { var ( key []byte diff --git a/util/admin/admin.go b/util/admin/admin.go index 1d29b768da353..facf632b6b828 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -227,7 +227,7 @@ const DefNumHistoryJobs = 10 // The maximum count of history jobs is num. func GetHistoryDDLJobs(txn kv.Transaction, maxNumJobs int) ([]*model.Job, error) { t := meta.NewMeta(txn) - jobs, err := t.GetAllHistoryDDLJobs() + jobs, err := t.GetLastNHistoryDDLJobs(maxNumJobs) if err != nil { return nil, errors.Trace(err) }