diff --git a/CHANGELOG.md b/CHANGELOG.md
index 55621b37ecee2..a510f9dd5e47e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,58 @@
 # TiDB Changelog
 All notable changes to this project will be documented in this file. See also [Release Notes](https://github.com/pingcap/docs/blob/master/releases/rn.md), [TiKV Changelog](https://github.com/tikv/tikv/blob/master/CHANGELOG.md) and [PD Changelog](https://github.com/pingcap/pd/blob/master/CHANGELOG.md).
+## [3.0.0-rc.2] 2019-05-28
+### SQL Optimizer
+* Support Index Join in more scenarios
+[#10540](https://github.com/pingcap/tidb/pull/10540)
+* Support exporting historical statistics [#10291](https://github.com/pingcap/tidb/pull/10291)
+* Support the incremental `Analyze` operation on monotonically increasing index columns
+[#10355](https://github.com/pingcap/tidb/pull/10355)
+* Neglect the NULL value in the `Order By` clause [#10488](https://github.com/pingcap/tidb/pull/10488)
+* Fix the wrong schema information calculation of the `UnionAll` logical operator when simplifying the column information [#10384](https://github.com/pingcap/tidb/pull/10384)
+* Avoid modifying the original expression when pushing down the `Not` operator [#10363](https://github.com/pingcap/tidb/pull/10363/files)
+* Support the `dump`/`load` correlation of histograms [#10573](https://github.com/pingcap/tidb/pull/10573)
+### Execution Engine
+* Handle virtual columns with a unique index properly when fetching duplicate rows in `batchChecker` [#10370](https://github.com/pingcap/tidb/pull/10370)
+* Fix the scanning range calculation issue for the `CHAR` column [#10124](https://github.com/pingcap/tidb/pull/10124)
+* Fix the issue of `PointGet` incorrectly processing negative numbers [#10113](https://github.com/pingcap/tidb/pull/10113)
+* Merge `Window` functions with the same name to improve execution efficiency [#9866](https://github.com/pingcap/tidb/pull/9866)
+* Allow the `RANGE` frame in a `Window` function to contain no `OrderBy` clause [#10496](https://github.com/pingcap/tidb/pull/10496)
+
+### Server
+* Fix the issue that TiDB continuously creates a new connection to TiKV when a fault occurs in TiKV [#10301](https://github.com/pingcap/tidb/pull/10301)
+* Make `tidb_disable_txn_auto_retry` affect all retryable errors instead of only write conflict errors [#10339](https://github.com/pingcap/tidb/pull/10339)
+* Allow DDL statements without parameters to be executed using `prepare`/`execute` [#10144](https://github.com/pingcap/tidb/pull/10144)
+* Add the `tidb_back_off_weight` variable to control the backoff time [#10266](https://github.com/pingcap/tidb/pull/10266)
+* Prohibit TiDB retrying non-automatically committed transactions in default conditions by setting the default value of `tidb_disable_txn_auto_retry` to `on` [#10266](https://github.com/pingcap/tidb/pull/10266)
+* Fix the database privilege judgment of `role` in `RBAC` [#10261](https://github.com/pingcap/tidb/pull/10261)
+* Support the pessimistic transaction model (experimental) [#10297](https://github.com/pingcap/tidb/pull/10297)
+* Reduce the wait time for handling lock conflicts in some cases [#10006](https://github.com/pingcap/tidb/pull/10006)
+* Make the Region cache able to visit follower nodes when a fault occurs in the leader node [#10256](https://github.com/pingcap/tidb/pull/10256)
+* Add the `tidb_low_resolution_tso` variable to control the number of TSOs obtained in batches and reduce the times of transactions obtaining TSO to adapt for scenarios where data consistency is not so strictly required
+[#10428](https://github.com/pingcap/tidb/pull/10428)
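Not part of this patch: a minimal sketch of how a client could exercise two of the session variables listed in the Server section above, assuming a TiDB server reachable over the MySQL protocol through the go-sql-driver/mysql driver; the DSN is a placeholder.

```go
// Illustrative sketch only; not part of this diff.
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Placeholder DSN; adjust host, port, and schema for your deployment.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// With the new default of tidb_disable_txn_auto_retry = on, explicit
	// transactions are no longer retried automatically; a session can opt back in.
	if _, err := db.Exec("SET @@tidb_disable_txn_auto_retry = 0"); err != nil {
		log.Fatal(err)
	}
	// Trade strict read consistency for fewer TSO requests, as described in
	// the tidb_low_resolution_tso entry above.
	if _, err := db.Exec("SET @@tidb_low_resolution_tso = 1"); err != nil {
		log.Fatal(err)
	}
}
```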
+
+### DDL
+* Fix the uppercase issue of the charset name in the storage of the old version of TiDB
+[#10272](https://github.com/pingcap/tidb/pull/10272)
+* Support `preSplit` of table partition, which pre-allocates table Regions when creating a table to avoid write hotspots after the table is created
+[#10221](https://github.com/pingcap/tidb/pull/10221)
+* Fix the issue that TiDB incorrectly updates the version information in PD in some cases [#10324](https://github.com/pingcap/tidb/pull/10324)
+* Support modifying the charset and collation using the `ALTER DATABASE` statement
+[#10393](https://github.com/pingcap/tidb/pull/10393)
+* Support splitting Regions based on the index and range of the specified table to relieve hotspot issues
+[#10203](https://github.com/pingcap/tidb/pull/10203)
+* Prohibit modifying the precision of the decimal column using the `alter table` statement
+[#10433](https://github.com/pingcap/tidb/pull/10433)
+* Fix the restriction for expressions and functions in hash partition
+[#10273](https://github.com/pingcap/tidb/pull/10273)
+* Fix the issue that adding indexes in a table that contains partitions will in some cases cause TiDB panic
+[#10475](https://github.com/pingcap/tidb/pull/10475)
+* Validate table information before executing the DDL to avoid invalid table schemas
+[#10464](https://github.com/pingcap/tidb/pull/10464)
+* Enable hash partition by default; and enable range columns partition when there is only one column in the partition definition
+[#9936](https://github.com/pingcap/tidb/pull/9936)
+
+
 ## [3.0.0-rc.1] 2019-05-10
 ### SQL Optimizer
diff --git a/ddl/ddl.go b/ddl/ddl.go index e23c3168ae089..957dadf8af88f 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -479,7 +479,7 @@ func (d *ddl) close() { d.delRangeMgr.clear() } - logutil.Logger(ddlLogCtx).Info("[ddl] closing DDL", zap.String("ID", d.uuid), zap.Duration("takeTime", time.Since(startTime))) + logutil.Logger(ddlLogCtx).Info("[ddl] DDL closed", zap.String("ID", d.uuid), zap.Duration("take time", time.Since(startTime))) } // GetLease implements DDL.GetLease interface. diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index ff543a280cfdb..2151d9efebb78 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -103,9 +103,10 @@ func (w *worker) String() string { } func (w *worker) close() { + startTime := time.Now() close(w.quitCh) w.wg.Wait() - logutil.Logger(w.logCtx).Info("[ddl] close DDL worker") + logutil.Logger(w.logCtx).Info("[ddl] DDL worker closed", zap.Duration("take time", time.Since(startTime))) } // start is used for async online schema changing, it will try to become the owner firstly, @@ -637,7 +638,10 @@ func (w *worker) waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time return } } - logutil.Logger(w.logCtx).Info("[ddl] wait latest schema version changed", zap.Int64("ver", latestSchemaVersion), zap.Duration("takeTime", time.Since(timeStart)), zap.String("job", job.String())) + logutil.Logger(w.logCtx).Info("[ddl] wait latest schema version changed", + zap.Int64("ver", latestSchemaVersion), + zap.Duration("take time", time.Since(timeStart)), + zap.String("job", job.String())) } // waitSchemaSynced handles the following situation: diff --git a/domain/domain.go b/domain/domain.go index 8708162e6653d..8fb37b80b8945 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -535,6 +535,7 @@ func (do *Domain) mustReload() (exitLoop bool) { // Close closes the Domain and release its resource.
func (do *Domain) Close() { + startTime := time.Now() if do.ddl != nil { terror.Log(do.ddl.Stop()) } @@ -548,7 +549,7 @@ func (do *Domain) Close() { do.sysSessionPool.Close() do.slowQuery.Close() do.wg.Wait() - logutil.Logger(context.Background()).Info("domain closed") + logutil.Logger(context.Background()).Info("domain closed", zap.Duration("take time", time.Since(startTime))) } type ddlCallback struct { diff --git a/executor/aggregate.go b/executor/aggregate.go index fde12cb6bab8f..e8fa560797dab 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -789,6 +789,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error { // Close implements the Executor Close interface. func (e *StreamAggExec) Close() error { e.childResult = nil + e.groupChecker.reset() return e.baseExecutor.Close() } @@ -954,3 +955,12 @@ func (e *groupChecker) meetNewGroup(row chunk.Row) (bool, error) { } return !firstGroup, nil } + +func (e *groupChecker) reset() { + if e.curGroupKey != nil { + e.curGroupKey = e.curGroupKey[:0] + } + if e.tmpGroupKey != nil { + e.tmpGroupKey = e.tmpGroupKey[:0] + } +} diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 06826616d5564..98bc6d2e701d3 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -710,3 +710,14 @@ func (s *testSuite1) TestIssue10098(c *C) { tk.MustExec("insert into t values('1', '222'), ('12', '22')") tk.MustQuery("select group_concat(distinct a, b) from t").Check(testkit.Rows("1222,1222")) } + +func (s *testSuite1) TestIssue10608(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec(`drop table if exists t, s;`) + tk.MustExec("create table t(a int)") + tk.MustExec("create table s(a int, b int)") + tk.MustExec("insert into s values(100292, 508931), (120002, 508932)") + tk.MustExec("insert into t values(508931), (508932)") + tk.MustQuery("select (select group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-")) + +} diff --git a/executor/analyze.go b/executor/analyze.go index 1a9e6555e524d..de0176ed5e18c 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -702,7 +702,6 @@ func (e *AnalyzeFastExec) getNextSampleKey(bo *tikv.Backoffer, startKey kv.Key) func (e *AnalyzeFastExec) buildSampTask() (needRebuild bool, err error) { // Do get regions row count. bo := tikv.NewBackoffer(context.Background(), 500) - e.rowCount = 0 needRebuildForRoutine := make([]bool, e.concurrency) errs := make([]error, e.concurrency) sampTasksForRoutine := make([][]*AnalyzeFastTask, e.concurrency) @@ -734,6 +733,13 @@ func (e *AnalyzeFastExec) buildSampTask() (needRebuild bool, err error) { if err != nil { return false, err } + e.rowCount = 0 + for _, task := range e.sampTasks { + cnt := task.EndOffset - task.BeginOffset + task.BeginOffset = e.rowCount + task.EndOffset = e.rowCount + cnt + e.rowCount += cnt + } for { // Search for the region which contains the targetKey. 
loc, err := e.cache.LocateKey(bo, targetKey) @@ -949,7 +955,7 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e keys = append(keys, tablecodec.EncodeRowKeyWithHandle(tableID, randKey)) } - var kvMap map[string][]byte + kvMap := make(map[string][]byte, len(keys)) for _, key := range keys { var iter kv.Iterator iter, *err = snapshot.Iter(key, endKey) diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 4bb818cbfa63a..038bfe35829ec 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -237,9 +237,6 @@ func (s *testSuite1) TestFastAnalyze(c *C) { tk.MustExec("create table t(a int primary key, b int, index index_b(b))") tk.MustExec("set @@session.tidb_enable_fast_analyze=1") tk.MustExec("set @@session.tidb_build_stats_concurrency=1") - for i := 0; i < 3000; i++ { - tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i)) - } tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) c.Assert(err, IsNil) tid := tblInfo.Meta().ID @@ -248,6 +245,9 @@ func (s *testSuite1) TestFastAnalyze(c *C) { splitKeys := generateTableSplitKeyForInt(tid, []int{600, 1200, 1800, 2400}) manipulateCluster(cluster, splitKeys) + for i := 0; i < 3000; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i)) + } tk.MustExec("analyze table t with 5 buckets") is := executor.GetInfoSchema(tk.Se.(sessionctx.Context)) @@ -255,49 +255,25 @@ func (s *testSuite1) TestFastAnalyze(c *C) { c.Assert(err, IsNil) tableInfo := table.Meta() tbl := dom.StatsHandle().GetTableStats(tableInfo) - sTbl := fmt.Sprintln(tbl) - matched := false - if sTbl == "Table:39 Count:3000\n"+ + c.Assert(tbl.String(), Equals, "Table:39 Count:3000\n"+ "column:1 ndv:3000 totColSize:0\n"+ - "num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+ - "num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+ - "num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+ - "num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+ - "num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+ + "num: 603 lower_bound: 0 upper_bound: 658 repeats: 1\n"+ + "num: 603 lower_bound: 663 upper_bound: 1248 repeats: 1\n"+ + "num: 603 lower_bound: 1250 upper_bound: 1823 repeats: 1\n"+ + "num: 603 lower_bound: 1830 upper_bound: 2379 repeats: 1\n"+ + "num: 588 lower_bound: 2380 upper_bound: 2998 repeats: 1\n"+ "column:2 ndv:3000 totColSize:0\n"+ - "num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+ - "num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+ - "num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+ - "num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+ - "num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+ + "num: 603 lower_bound: 0 upper_bound: 658 repeats: 1\n"+ + "num: 603 lower_bound: 663 upper_bound: 1248 repeats: 1\n"+ + "num: 603 lower_bound: 1250 upper_bound: 1823 repeats: 1\n"+ + "num: 603 lower_bound: 1830 upper_bound: 2379 repeats: 1\n"+ + "num: 588 lower_bound: 2380 upper_bound: 2998 repeats: 1\n"+ "index:1 ndv:3000\n"+ - "num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+ - "num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+ - "num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+ - "num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+ - "num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n" || - sTbl == "Table:39 Count:3000\n"+ - "column:2 ndv:3000 totColSize:0\n"+ - "num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+ - "num: 603 lower_bound: 621 upper_bound: 
1205 repeats: 1\n"+ - "num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+ - "num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+ - "num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+ - "column:1 ndv:3000 totColSize:0\n"+ - "num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+ - "num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+ - "num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+ - "num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+ - "num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+ - "index:1 ndv:3000\n"+ - "num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+ - "num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+ - "num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+ - "num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+ - "num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n" { - matched = true - } - c.Assert(matched, Equals, true) + "num: 603 lower_bound: 0 upper_bound: 658 repeats: 1\n"+ + "num: 603 lower_bound: 663 upper_bound: 1248 repeats: 1\n"+ + "num: 603 lower_bound: 1250 upper_bound: 1823 repeats: 1\n"+ + "num: 603 lower_bound: 1830 upper_bound: 2379 repeats: 1\n"+ + "num: 588 lower_bound: 2380 upper_bound: 2998 repeats: 1") } func (s *testSuite1) TestAnalyzeIncremental(c *C) { @@ -415,7 +391,7 @@ func (s *testFastAnalyze) TestFastAnalyzeRetryRowCount(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int primary key, b int, index index_b(b))") + tk.MustExec("create table t(a int primary key)") tk.MustExec("set @@session.tidb_enable_fast_analyze=1") tk.MustExec("set @@session.tidb_build_stats_concurrency=1") tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) @@ -425,12 +401,14 @@ func (s *testFastAnalyze) TestFastAnalyzeRetryRowCount(c *C) { splitKeys := generateTableSplitKeyForInt(tid, []int{6, 12, 18, 24, 30}) regionIDs := manipulateCluster(s.cluster, splitKeys) for i := 0; i < 30; i++ { - tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i)) + tk.MustExec(fmt.Sprintf("insert into t values (%d)", i)) } s.cli.setFailRegion(regionIDs[4]) tk.MustExec("analyze table t") // 4 regions will be sampled, and it will retry the last failed region. c.Assert(s.cli.mu.count, Equals, int64(5)) + row := tk.MustQuery(`show stats_meta where db_name = "test" and table_name = "t"`).Rows()[0] + c.Assert(row[5], Equals, "30") } func (s *testSuite1) TestFailedAnalyzeRequest(c *C) { diff --git a/executor/builder.go b/executor/builder.go index e04a51245b824..2b5de81ee15e7 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -550,7 +550,12 @@ func (b *executorBuilder) buildShow(v *plannercore.Show) Executor { is: b.is, } if e.Tp == ast.ShowGrants && e.User == nil { - e.User = e.ctx.GetSessionVars().User + // The input is a "show grants" statement, fulfill the user and roles field. + // Note: "show grants" result are different from "show grants for current_user", + // The former determine privileges with roles, while the later doesn't. + vars := e.ctx.GetSessionVars() + e.User = vars.User + e.Roles = vars.ActiveRoles } if e.Tp == ast.ShowMasterStatus { // show master status need start ts. 
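Not part of this patch: the `buildShow` comment above notes that `SHOW GRANTS` now reflects the user's active roles while `SHOW GRANTS FOR CURRENT_USER` does not. As a reading aid, here is a condensed testkit-style restatement of that behavior; it mirrors the `TestIssue10549` case added to `executor/show_test.go` later in this diff and assumes that file's suite fixtures and imports.

```go
// Sketch only; mirrors TestIssue10549 below and assumes the testSuite2
// fixtures and imports used by executor/show_test.go.
func (s *testSuite2) TestShowGrantsActiveRolesSketch(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("CREATE DATABASE newdb;")
	tk.MustExec("CREATE ROLE 'app_developer';")
	tk.MustExec("GRANT ALL ON newdb.* TO 'app_developer';")
	tk.MustExec("CREATE USER 'dev';")
	tk.MustExec("GRANT 'app_developer' TO 'dev';")
	tk.MustExec("SET DEFAULT ROLE app_developer TO 'dev';")
	c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "dev", Hostname: "localhost",
		AuthUsername: "dev", AuthHostname: "localhost"}, nil, nil), IsTrue)

	// SHOW GRANTS includes privileges obtained through the active role ...
	tk.MustQuery("SHOW GRANTS;").Check(testkit.Rows(
		"GRANT USAGE ON *.* TO 'dev'@'%'",
		"GRANT ALL PRIVILEGES ON newdb.* TO 'dev'@'%'",
		"GRANT 'app_developer'@'%' TO 'dev'@'%'"))
	// ... while SHOW GRANTS FOR CURRENT_USER does not expand role-derived privileges.
	tk.MustQuery("SHOW GRANTS FOR CURRENT_USER").Check(testkit.Rows(
		"GRANT USAGE ON *.* TO 'dev'@'%'",
		"GRANT 'app_developer'@'%' TO 'dev'@'%'"))
}
```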
diff --git a/executor/executor.go b/executor/executor.go index 16bf5746049dd..bcc8465c38af5 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1380,6 +1380,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.NotFillCache = !opts.SQLCache } sc.PadCharToFullLength = ctx.GetSessionVars().SQLMode.HasPadCharToFullLengthMode() + case *ast.ExplainStmt: + sc.InExplainStmt = true case *ast.ShowStmt: sc.IgnoreTruncate = true sc.IgnoreZeroInDate = true diff --git a/executor/executor_test.go b/executor/executor_test.go index e9d920478a0e1..84bc61af04429 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/planner" plannercore "github.com/pingcap/tidb/planner/core" @@ -226,9 +227,9 @@ func (s *testSuite) TestAdmin(c *C) { result.Check(testkit.Rows()) result = tk.MustQuery(`admin show ddl job queries 1, 2, 3, 4`) result.Check(testkit.Rows()) - historyJob, err := admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs) - result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJob[0].ID)) - result.Check(testkit.Rows(historyJob[0].Query)) + historyJobs, err = admin.GetHistoryDDLJobs(txn, admin.DefNumHistoryJobs) + result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJobs[0].ID)) + result.Check(testkit.Rows(historyJobs[0].Query)) c.Assert(err, IsNil) // check table test @@ -282,6 +283,22 @@ func (s *testSuite) TestAdmin(c *C) { tk.MustExec("ALTER TABLE t1 ADD COLUMN c4 bit(10) default 127;") tk.MustExec("ALTER TABLE t1 ADD INDEX idx3 (c4);") tk.MustExec("admin check table t1;") + + // Test for reverse scan get history ddl jobs when ddl history jobs queue has multiple regions. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + historyJobs, err = admin.GetHistoryDDLJobs(txn, 20) + c.Assert(err, IsNil) + + // Split region for history ddl job queues. + m := meta.NewMeta(txn) + startKey := meta.DDLJobHistoryKey(m, 0) + endKey := meta.DDLJobHistoryKey(m, historyJobs[0].ID) + s.cluster.SplitKeys(s.mvccStore, startKey, endKey, int(historyJobs[0].ID/5)) + + historyJobs2, err := admin.GetHistoryDDLJobs(txn, 20) + c.Assert(err, IsNil) + c.Assert(historyJobs, DeepEquals, historyJobs2) } func (s *testSuite) fillData(tk *testkit.TestKit, table string) { diff --git a/executor/explain_test.go b/executor/explain_test.go new file mode 100644 index 0000000000000..f1596be8e2b0c --- /dev/null +++ b/executor/explain_test.go @@ -0,0 +1,62 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + . 
"github.com/pingcap/check" + "github.com/pingcap/parser/auth" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/util/testkit" +) + +func (s *testSuite1) TestExplainPriviliges(c *C) { + se, err := session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + c.Assert(se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue) + tk := testkit.NewTestKit(c, s.store) + tk.Se = se + + tk.MustExec("create database explaindatabase") + tk.MustExec("use explaindatabase") + tk.MustExec("create table t (id int)") + tk.MustExec("create view v as select * from t") + tk.MustExec(`create user 'explain'@'%'`) + tk.MustExec(`flush privileges`) + + tk1 := testkit.NewTestKit(c, s.store) + se, err = session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + c.Assert(se.Auth(&auth.UserIdentity{Username: "explain", Hostname: "%"}, nil, nil), IsTrue) + tk1.Se = se + + tk.MustExec(`grant select on explaindatabase.v to 'explain'@'%'`) + tk.MustExec(`flush privileges`) + tk1.MustQuery("show databases").Check(testkit.Rows("INFORMATION_SCHEMA", "explaindatabase")) + + tk1.MustExec("use explaindatabase") + tk1.MustQuery("select * from v") + err = tk1.ExecToErr("explain select * from v") + c.Assert(err.Error(), Equals, plannercore.ErrViewNoExplain.Error()) + + tk.MustExec(`grant show view on explaindatabase.v to 'explain'@'%'`) + tk.MustExec(`flush privileges`) + tk1.MustQuery("explain select * from v") + + tk.MustExec(`revoke select on explaindatabase.v from 'explain'@'%'`) + tk.MustExec(`flush privileges`) + + err = tk1.ExecToErr("explain select * from v") + c.Assert(err.Error(), Equals, plannercore.ErrTableaccessDenied.GenWithStackByArgs("SELECT", "explain", "%", "v").Error()) +} diff --git a/executor/show.go b/executor/show.go index e6e01a2fbcbfc..f25dcd603fdbe 100644 --- a/executor/show.go +++ b/executor/show.go @@ -784,9 +784,6 @@ func (e *ShowExec) fetchShowCreateTable() error { fmt.Fprintf(&buf, " COMPRESSION='%s'", tb.Meta().Compression) } - // add partition info here. - appendPartitionInfo(tb.Meta().Partition, &buf) - if hasAutoIncID { autoIncID, err := tb.Allocator(e.ctx).NextGlobalAutoID(tb.Meta().ID) if err != nil { @@ -809,6 +806,9 @@ func (e *ShowExec) fetchShowCreateTable() error { if len(tb.Meta().Comment) > 0 { fmt.Fprintf(&buf, " COMMENT='%s'", format.OutputFormat(tb.Meta().Comment)) } + // add partition info here. 
+ appendPartitionInfo(tb.Meta().Partition, &buf) + e.appendRow([]interface{}{tb.Meta().Name.O, buf.String()}) return nil } diff --git a/executor/show_test.go b/executor/show_test.go index 0e3ce9a481991..7b6dfe3e82afc 100644 --- a/executor/show_test.go +++ b/executor/show_test.go @@ -157,6 +157,21 @@ func (s *testSuite2) TestIssue3641(c *C) { c.Assert(err.Error(), Equals, plannercore.ErrNoDB.Error()) } +func (s *testSuite2) TestIssue10549(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("CREATE DATABASE newdb;") + tk.MustExec("CREATE ROLE 'app_developer';") + tk.MustExec("GRANT ALL ON newdb.* TO 'app_developer';") + tk.MustExec("CREATE USER 'dev';") + tk.MustExec("GRANT 'app_developer' TO 'dev';") + tk.MustExec("SET DEFAULT ROLE app_developer TO 'dev';") + + c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "dev", Hostname: "localhost", AuthUsername: "dev", AuthHostname: "localhost"}, nil, nil), IsTrue) + tk.MustQuery("SHOW DATABASES;").Check(testkit.Rows("INFORMATION_SCHEMA", "newdb")) + tk.MustQuery("SHOW GRANTS;").Check(testkit.Rows("GRANT USAGE ON *.* TO 'dev'@'%'", "GRANT ALL PRIVILEGES ON newdb.* TO 'dev'@'%'", "GRANT 'app_developer'@'%' TO 'dev'@'%'")) + tk.MustQuery("SHOW GRANTS FOR CURRENT_USER").Check(testkit.Rows("GRANT USAGE ON *.* TO 'dev'@'%'", "GRANT 'app_developer'@'%' TO 'dev'@'%'")) +} + // TestShow2 is moved from session_test func (s *testSuite2) TestShow2(c *C) { tk := testkit.NewTestKit(c, s.store) @@ -455,6 +470,61 @@ func (s *testSuite2) TestShowCreateTable(c *C) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */", )) tk.MustExec("drop table t") + + tk.MustExec("CREATE TABLE `log` (" + + "`LOG_ID` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT," + + "`ROUND_ID` bigint(20) UNSIGNED NOT NULL," + + "`USER_ID` int(10) UNSIGNED NOT NULL," + + "`USER_IP` int(10) UNSIGNED DEFAULT NULL," + + "`END_TIME` datetime NOT NULL," + + "`USER_TYPE` int(11) DEFAULT NULL," + + "`APP_ID` int(11) DEFAULT NULL," + + "PRIMARY KEY (`LOG_ID`,`END_TIME`)," + + "KEY `IDX_EndTime` (`END_TIME`)," + + "KEY `IDX_RoundId` (`ROUND_ID`)," + + "KEY `IDX_UserId_EndTime` (`USER_ID`,`END_TIME`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=505488 " + + "PARTITION BY RANGE ( month(`end_time`) ) (" + + "PARTITION p1 VALUES LESS THAN (2)," + + "PARTITION p2 VALUES LESS THAN (3)," + + "PARTITION p3 VALUES LESS THAN (4)," + + "PARTITION p4 VALUES LESS THAN (5)," + + "PARTITION p5 VALUES LESS THAN (6)," + + "PARTITION p6 VALUES LESS THAN (7)," + + "PARTITION p7 VALUES LESS THAN (8)," + + "PARTITION p8 VALUES LESS THAN (9)," + + "PARTITION p9 VALUES LESS THAN (10)," + + "PARTITION p10 VALUES LESS THAN (11)," + + "PARTITION p11 VALUES LESS THAN (12)," + + "PARTITION p12 VALUES LESS THAN (MAXVALUE))") + tk.MustQuery("show create table log").Check(testutil.RowsWithSep("|", + "log CREATE TABLE `log` (\n"+ + " `LOG_ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT,\n"+ + " `ROUND_ID` bigint(20) unsigned NOT NULL,\n"+ + " `USER_ID` int(10) unsigned NOT NULL,\n"+ + " `USER_IP` int(10) unsigned DEFAULT NULL,\n"+ + " `END_TIME` datetime NOT NULL,\n"+ + " `USER_TYPE` int(11) DEFAULT NULL,\n"+ + " `APP_ID` int(11) DEFAULT NULL,\n"+ + " PRIMARY KEY (`LOG_ID`,`END_TIME`),\n"+ + " KEY `IDX_EndTime` (`END_TIME`),\n"+ + " KEY `IDX_RoundId` (`ROUND_ID`),\n"+ + " KEY `IDX_UserId_EndTime` (`USER_ID`,`END_TIME`)\n"+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=505488\n"+ + "PARTITION BY RANGE ( 
month(`end_time`) ) (\n"+ + " PARTITION p1 VALUES LESS THAN (2),\n"+ + " PARTITION p2 VALUES LESS THAN (3),\n"+ + " PARTITION p3 VALUES LESS THAN (4),\n"+ + " PARTITION p4 VALUES LESS THAN (5),\n"+ + " PARTITION p5 VALUES LESS THAN (6),\n"+ + " PARTITION p6 VALUES LESS THAN (7),\n"+ + " PARTITION p7 VALUES LESS THAN (8),\n"+ + " PARTITION p8 VALUES LESS THAN (9),\n"+ + " PARTITION p9 VALUES LESS THAN (10),\n"+ + " PARTITION p10 VALUES LESS THAN (11),\n"+ + " PARTITION p11 VALUES LESS THAN (12),\n"+ + " PARTITION p12 VALUES LESS THAN (MAXVALUE)\n"+ + ")")) } func (s *testSuite2) TestShowEscape(c *C) { diff --git a/executor/simple.go b/executor/simple.go index 4ee51342d3838..87d76ba8bdf0b 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -221,17 +221,20 @@ func (e *SimpleExec) setDefaultRoleAll(s *ast.SetDefaultRoleStmt) error { return nil } -func (e *SimpleExec) executeSetDefaultRole(s *ast.SetDefaultRoleStmt) error { +func (e *SimpleExec) executeSetDefaultRole(s *ast.SetDefaultRoleStmt) (err error) { switch s.SetRoleOpt { case ast.SetRoleAll: - return e.setDefaultRoleAll(s) + err = e.setDefaultRoleAll(s) case ast.SetRoleNone: - return e.setDefaultRoleNone(s) + err = e.setDefaultRoleNone(s) case ast.SetRoleRegular: - return e.setDefaultRoleRegular(s) + err = e.setDefaultRoleRegular(s) } - err := domain.GetDomain(e.ctx).PrivilegeHandle().Update(e.ctx.(sessionctx.Context)) - return err + if err != nil { + return + } + domain.GetDomain(e.ctx).NotifyUpdatePrivilege(e.ctx) + return } func (e *SimpleExec) setRoleRegular(s *ast.SetRoleStmt) error { diff --git a/meta/meta.go b/meta/meta.go index 09cc9c3a89a92..e5eead8b8bbd6 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -135,6 +135,11 @@ func (m *Meta) tableKey(tableID int64) []byte { return []byte(fmt.Sprintf("%s:%d", mTablePrefix, tableID)) } +// DDLJobHistoryKey is only used for testing. +func DDLJobHistoryKey(m *Meta, jobID int64) []byte { + return m.txn.EncodeHashDataKey(mDDLJobHistoryKey, m.jobIDKey(jobID)) +} + // GenAutoTableIDKeyValue generates meta key by dbID, tableID and corresponding value by autoID. func (m *Meta) GenAutoTableIDKeyValue(dbID, tableID, autoID int64) (key, value []byte) { dbKey := m.dbKey(dbID) @@ -637,10 +642,23 @@ func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) { if err != nil { return nil, errors.Trace(err) } - jobs := make([]*model.Job, 0, len(pairs)) - for _, pair := range pairs { + return decodeAndSortJob(pairs) +} + +// GetLastNHistoryDDLJobs gets latest N history ddl jobs. +func (m *Meta) GetLastNHistoryDDLJobs(num int) ([]*model.Job, error) { + pairs, err := m.txn.HGetLastN(mDDLJobHistoryKey, num) + if err != nil { + return nil, errors.Trace(err) + } + return decodeAndSortJob(pairs) +} + +func decodeAndSortJob(jobPairs []structure.HashPair) ([]*model.Job, error) { + jobs := make([]*model.Job, 0, len(jobPairs)) + for _, pair := range jobPairs { job := &model.Job{} - err = job.Decode(pair.Value) + err := job.Decode(pair.Value) if err != nil { return nil, errors.Trace(err) } diff --git a/meta/meta_test.go b/meta/meta_test.go index 5e719680f5e7a..2d0ef05f6605b 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -237,6 +237,10 @@ func (s *testSuite) TestMeta(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) + + // Test for DDLJobHistoryKey. 
+ key = meta.DDLJobHistoryKey(t, 888) + c.Assert(key, DeepEquals, []byte{0x6d, 0x44, 0x44, 0x4c, 0x4a, 0x6f, 0x62, 0x48, 0x69, 0xff, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x0, 0x0, 0x0, 0xfc, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x68, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x78, 0xff, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf7}) } func (s *testSuite) TestSnapshot(c *C) { @@ -356,6 +360,13 @@ func (s *testSuite) TestDDL(c *C) { lastID = job.ID } + // Test for get last N history ddl jobs. + historyJobs, err := t.GetLastNHistoryDDLJobs(2) + c.Assert(err, IsNil) + c.Assert(len(historyJobs), Equals, 2) + c.Assert(historyJobs[0].ID == 123, IsTrue) + c.Assert(historyJobs[1].ID == 1234, IsTrue) + // Test GetAllDDLJobsInQueue. err = t.EnQueueDDLJob(job) c.Assert(err, IsNil) diff --git a/owner/manager.go b/owner/manager.go index 5f738d11a9328..bd21159c526f1 100644 --- a/owner/manager.go +++ b/owner/manager.go @@ -245,6 +245,7 @@ func (m *ownerManager) campaignLoop(ctx context.Context, etcdSession *concurrenc return } case <-ctx.Done(): + logutil.Logger(logCtx).Info("break campaign loop, context is done") m.revokeSession(logPrefix, etcdSession.Lease()) return default: @@ -288,7 +289,7 @@ func (m *ownerManager) revokeSession(logPrefix string, leaseID clientv3.LeaseID) time.Duration(ManagerSessionTTL)*time.Second) _, err := m.etcdCli.Revoke(cancelCtx, leaseID) cancel() - logutil.Logger(m.logCtx).Info("break campaign loop, revoke err", zap.Error(err)) + logutil.Logger(m.logCtx).Info("revoke session", zap.Error(err)) } // GetOwnerID implements Manager.GetOwnerID interface. diff --git a/planner/core/errors.go b/planner/core/errors.go index 9facfedc6d240..72d75a9a5f47a 100644 --- a/planner/core/errors.go +++ b/planner/core/errors.go @@ -66,6 +66,7 @@ const ( codeDBaccessDenied = mysql.ErrDBaccessDenied codeTableaccessDenied = mysql.ErrTableaccessDenied codeSpecificAccessDenied = mysql.ErrSpecificAccessDenied + codeViewNoExplain = mysql.ErrViewNoExplain codeWindowFrameStartIllegal = mysql.ErrWindowFrameStartIllegal codeWindowFrameEndIllegal = mysql.ErrWindowFrameEndIllegal codeWindowFrameIllegal = mysql.ErrWindowFrameIllegal @@ -128,6 +129,7 @@ var ( ErrDBaccessDenied = terror.ClassOptimizer.New(mysql.ErrDBaccessDenied, mysql.MySQLErrName[mysql.ErrDBaccessDenied]) ErrTableaccessDenied = terror.ClassOptimizer.New(mysql.ErrTableaccessDenied, mysql.MySQLErrName[mysql.ErrTableaccessDenied]) ErrSpecificAccessDenied = terror.ClassOptimizer.New(mysql.ErrSpecificAccessDenied, mysql.MySQLErrName[mysql.ErrSpecificAccessDenied]) + ErrViewNoExplain = terror.ClassOptimizer.New(mysql.ErrViewNoExplain, mysql.MySQLErrName[mysql.ErrViewNoExplain]) ErrWindowFrameStartIllegal = terror.ClassOptimizer.New(codeWindowFrameStartIllegal, mysql.MySQLErrName[mysql.ErrWindowFrameStartIllegal]) ErrWindowFrameEndIllegal = terror.ClassOptimizer.New(codeWindowFrameEndIllegal, mysql.MySQLErrName[mysql.ErrWindowFrameEndIllegal]) ErrWindowFrameIllegal = terror.ClassOptimizer.New(codeWindowFrameIllegal, mysql.MySQLErrName[mysql.ErrWindowFrameIllegal]) @@ -183,6 +185,7 @@ func init() { codeDBaccessDenied: mysql.ErrDBaccessDenied, codeTableaccessDenied: mysql.ErrTableaccessDenied, codeSpecificAccessDenied: mysql.ErrSpecificAccessDenied, + codeViewNoExplain: mysql.ErrViewNoExplain, codeWindowFrameStartIllegal: mysql.ErrWindowFrameStartIllegal, codeWindowFrameEndIllegal: mysql.ErrWindowFrameEndIllegal, codeWindowFrameIllegal: mysql.ErrWindowFrameIllegal, diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 
6f19834b8b9b5..66dff13966e35 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2178,7 +2178,7 @@ func (b *PlanBuilder) buildDataSource(tn *ast.TableName) (LogicalPlan, error) { tableInfo := tbl.Meta() var authErr error if b.ctx.GetSessionVars().User != nil { - authErr = ErrTableaccessDenied.GenWithStackByArgs("SELECT", b.ctx.GetSessionVars().User.Hostname, b.ctx.GetSessionVars().User.Username, tableInfo.Name.L) + authErr = ErrTableaccessDenied.GenWithStackByArgs("SELECT", b.ctx.GetSessionVars().User.Username, b.ctx.GetSessionVars().User.Hostname, tableInfo.Name.L) } b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SelectPriv, dbName.L, tableInfo.Name.L, "", authErr) @@ -2324,6 +2324,10 @@ func (b *PlanBuilder) BuildDataSourceFromView(dbName model.CIStr, tableInfo *mod } b.visitInfo = append(originalVisitInfo, b.visitInfo...) + if b.ctx.GetSessionVars().StmtCtx.InExplainStmt { + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.ShowViewPriv, dbName.L, tableInfo.Name.L, "", ErrViewNoExplain) + } + projSchema := expression.NewSchema(make([]*expression.Column, 0, len(tableInfo.View.Cols))...) projExprs := make([]expression.Expression, 0, len(tableInfo.View.Cols)) for i := range tableInfo.View.Cols { diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index be8b3843f59ba..d0e8d36bdc6eb 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -754,6 +754,10 @@ func (b *PlanBuilder) buildCheckIndexSchema(tn *ast.TableName, indexName string) func getColsInfo(tn *ast.TableName) (indicesInfo []*model.IndexInfo, colsInfo []*model.ColumnInfo, pkCol *model.ColumnInfo) { tbl := tn.TableInfo for _, col := range tbl.Columns { + // The virtual column will not store any data in TiKV, so it should be ignored when collect statistics + if col.IsGenerated() && !col.GeneratedStored { + continue + } if tbl.PKIsHandle && mysql.HasPriKeyFlag(col.Flag) { pkCol = col } else { diff --git a/privilege/privileges/privileges_test.go b/privilege/privileges/privileges_test.go index 82b80cbc8c752..b75dbeeac589c 100644 --- a/privilege/privileges/privileges_test.go +++ b/privilege/privileges/privileges_test.go @@ -162,6 +162,26 @@ func (s *testPrivilegeSuite) TestCheckTablePrivilege(c *C) { c.Assert(pc2.RequestVerification(activeRoles, "test", "test", "", mysql.IndexPriv), IsTrue) } +func (s *testPrivilegeSuite) TestCheckViewPrivilege(c *C) { + rootSe := newSession(c, s.store, s.dbName) + mustExec(c, rootSe, `CREATE USER 'vuser'@'localhost';`) + mustExec(c, rootSe, `CREATE VIEW v AS SELECT * FROM test;`) + + se := newSession(c, s.store, s.dbName) + activeRoles := make([]*auth.RoleIdentity, 0) + c.Assert(se.Auth(&auth.UserIdentity{Username: "vuser", Hostname: "localhost"}, nil, nil), IsTrue) + pc := privilege.GetPrivilegeManager(se) + c.Assert(pc.RequestVerification(activeRoles, "test", "v", "", mysql.SelectPriv), IsFalse) + + mustExec(c, rootSe, `GRANT SELECT ON test.v TO 'vuser'@'localhost';`) + c.Assert(pc.RequestVerification(activeRoles, "test", "v", "", mysql.SelectPriv), IsTrue) + c.Assert(pc.RequestVerification(activeRoles, "test", "v", "", mysql.ShowViewPriv), IsFalse) + + mustExec(c, rootSe, `GRANT SHOW VIEW ON test.v TO 'vuser'@'localhost';`) + c.Assert(pc.RequestVerification(activeRoles, "test", "v", "", mysql.SelectPriv), IsTrue) + c.Assert(pc.RequestVerification(activeRoles, "test", "v", "", mysql.ShowViewPriv), IsTrue) +} + func (s *testPrivilegeSuite) TestCheckPrivilegeWithRoles(c *C) { rootSe := newSession(c, s.store, s.dbName) 
mustExec(c, rootSe, `CREATE USER 'test_role'@'localhost';`) @@ -474,7 +494,6 @@ func (s *testPrivilegeSuite) TestUseDB(c *C) { mustExec(c, se, `CREATE USER 'dev'@'localhost'`) mustExec(c, se, `GRANT 'app_developer' TO 'dev'@'localhost'`) mustExec(c, se, `SET DEFAULT ROLE 'app_developer' TO 'dev'@'localhost'`) - mustExec(c, se, `FLUSH PRIVILEGES`) c.Assert(se.Auth(&auth.UserIdentity{Username: "dev", Hostname: "localhost", AuthUsername: "dev", AuthHostname: "localhost"}, nil, nil), IsTrue) _, err = se.Execute(context.Background(), "use app_db") c.Assert(err, IsNil) @@ -556,7 +575,7 @@ func (s *testPrivilegeSuite) TestAnalyzeTable(c *C) { c.Assert(err.Error(), Equals, "[planner:1142]INSERT command denied to user 'anobody'@'%' for table 't1'") _, err = se.Execute(context.Background(), "select * from t1") - c.Assert(err.Error(), Equals, "[planner:1142]SELECT command denied to user 'localhost'@'anobody' for table 't1'") + c.Assert(err.Error(), Equals, "[planner:1142]SELECT command denied to user 'anobody'@'localhost' for table 't1'") // try again after SELECT privilege granted c.Assert(se.Auth(&auth.UserIdentity{Username: "asuper", Hostname: "localhost", AuthUsername: "asuper", AuthHostname: "%"}, nil, nil), IsTrue) diff --git a/session/bootstrap.go b/session/bootstrap.go index b6e8917994542..8a6aba45a2036 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -263,6 +263,7 @@ const ( // bootstrap initiates system DB for a store. func bootstrap(s Session) { + startTime := time.Now() dom := domain.GetDomain(s) for { b, err := checkBootstrapped(s) @@ -273,6 +274,8 @@ func bootstrap(s Session) { // For rolling upgrade, we can't do upgrade only in the owner. if b { upgrade(s) + logutil.Logger(context.Background()).Info("upgrade successful in bootstrap", + zap.Duration("take time", time.Since(startTime))) return } // To reduce conflict when multiple TiDB-server start at the same time. @@ -280,6 +283,8 @@ func bootstrap(s Session) { if dom.DDL().OwnerManager().IsOwner() { doDDLWorks(s) doDMLWorks(s) + logutil.Logger(context.Background()).Info("bootstrap successful", + zap.Duration("take time", time.Since(startTime))) return } time.Sleep(200 * time.Millisecond) @@ -522,18 +527,20 @@ func upgrade(s Session) { _, err = s.Execute(context.Background(), "COMMIT") if err != nil { - time.Sleep(1 * time.Second) + sleepTime := 1 * time.Second + logutil.Logger(context.Background()).Info("update bootstrap ver failed", + zap.Error(err), zap.Duration("sleeping time", sleepTime)) + time.Sleep(sleepTime) // Check if TiDB is already upgraded. v, err1 := getBootstrapVersion(s) if err1 != nil { - logutil.Logger(context.Background()).Fatal("upgrade error", - zap.Error(err1)) + logutil.Logger(context.Background()).Fatal("upgrade failed", zap.Error(err1)) } if v >= currentBootstrapVersion { // It is already bootstrapped/upgraded by a higher version TiDB server. return } - logutil.Logger(context.Background()).Fatal("[Upgrade] upgrade error", + logutil.Logger(context.Background()).Fatal("[Upgrade] upgrade failed", zap.Int64("from", ver), zap.Int("to", currentBootstrapVersion), zap.Error(err)) @@ -912,16 +919,18 @@ func doDMLWorks(s Session) { writeSystemTZ(s) _, err := s.Execute(context.Background(), "COMMIT") if err != nil { - time.Sleep(1 * time.Second) + sleepTime := 1 * time.Second + logutil.Logger(context.Background()).Info("doDMLWorks failed", zap.Error(err), zap.Duration("sleeping time", sleepTime)) + time.Sleep(sleepTime) // Check if TiDB is already bootstrapped. 
b, err1 := checkBootstrapped(s) if err1 != nil { - logutil.Logger(context.Background()).Fatal("doDMLWorks error", zap.Error(err1)) + logutil.Logger(context.Background()).Fatal("doDMLWorks failed", zap.Error(err1)) } if b { return } - logutil.Logger(context.Background()).Fatal("doDMLWorks error", zap.Error(err)) + logutil.Logger(context.Background()).Fatal("doDMLWorks failed", zap.Error(err)) } } diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index d22d65f3ff4a3..e5492cb80b7dc 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -54,6 +54,7 @@ type StatementContext struct { InDeleteStmt bool InSelectStmt bool InLoadDataStmt bool + InExplainStmt bool IgnoreTruncate bool IgnoreZeroInDate bool DupKeyAsWarning bool diff --git a/statistics/cmsketch.go b/statistics/cmsketch.go index 747337c1d50ee..727049675b960 100644 --- a/statistics/cmsketch.go +++ b/statistics/cmsketch.go @@ -145,7 +145,7 @@ func (c *CMSketch) calculateDefaultVal(helper *topNHelper, estimateNDV, scaleRat c.defaultValue = 1 } else { estimateRemainingCount := rowCount - (helper.sampleSize-uint64(helper.onlyOnceItems))*scaleRatio - c.defaultValue = estimateRemainingCount / (estimateNDV - uint64(sampleNDV) + helper.onlyOnceItems) + c.defaultValue = estimateRemainingCount / mathutil.MaxUint64(1, estimateNDV-uint64(sampleNDV)+helper.onlyOnceItems) } } diff --git a/statistics/table.go b/statistics/table.go index 9fc6964020fe3..c95979e81dbde 100644 --- a/statistics/table.go +++ b/statistics/table.go @@ -16,6 +16,7 @@ package statistics import ( "fmt" "math" + "sort" "strings" "sync" @@ -98,12 +99,22 @@ func (t *Table) Copy() *Table { func (t *Table) String() string { strs := make([]string, 0, len(t.Columns)+1) strs = append(strs, fmt.Sprintf("Table:%d Count:%d", t.PhysicalID, t.Count)) + cols := make([]*Column, 0, len(t.Columns)) for _, col := range t.Columns { - strs = append(strs, col.String()) + cols = append(cols, col) } - for _, col := range t.Indices { + sort.Slice(cols, func(i, j int) bool { return cols[i].ID < cols[j].ID }) + for _, col := range cols { strs = append(strs, col.String()) } + idxs := make([]*Index, 0, len(t.Indices)) + for _, idx := range t.Indices { + idxs = append(idxs, idx) + } + sort.Slice(idxs, func(i, j int) bool { return idxs[i].ID < idxs[j].ID }) + for _, idx := range idxs { + strs = append(strs, idx.String()) + } return strings.Join(strs, "\n") } diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go index 014e61042f5c8..7e6c0270ebc4f 100644 --- a/store/mockstore/mocktikv/cluster.go +++ b/store/mockstore/mocktikv/cluster.go @@ -22,6 +22,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/tablecodec" ) @@ -340,6 +341,12 @@ func (c *Cluster) SplitIndex(mvccStore MVCCStore, tableID, indexID int64, count c.splitRange(mvccStore, NewMvccKey(indexStart), NewMvccKey(indexEnd), count) } +// SplitKeys evenly splits the start, end key into "count" regions. +// Only works for single store. 
+func (c *Cluster) SplitKeys(mvccStore MVCCStore, start, end kv.Key, count int) { + c.splitRange(mvccStore, NewMvccKey(start), NewMvccKey(end), count) +} + func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) { c.Lock() defer c.Unlock() diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index eb1afed758b3b..23d65f7267a85 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io" + "math" "strconv" "time" @@ -245,14 +246,32 @@ func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse { } func (h *rpcHandler) handleKvScan(req *kvrpcpb.ScanRequest) *kvrpcpb.ScanResponse { - if !h.checkKeyInRegion(req.GetStartKey()) { - panic("KvScan: startKey not in region") - } - endKey := h.endKey - if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(req.EndKey, endKey) < 0) { - endKey = req.EndKey + endKey := MvccKey(h.endKey).Raw() + var pairs []Pair + if !req.Reverse { + if !h.checkKeyInRegion(req.GetStartKey()) { + panic("KvScan: startKey not in region") + } + if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.EndKey), h.endKey) < 0) { + endKey = req.EndKey + } + pairs = h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel) + } else { + // TiKV use range [end_key, start_key) for reverse scan. + // Should use the req.EndKey to check in region. + if !h.checkKeyInRegion(req.GetEndKey()) { + panic("KvScan: startKey not in region") + } + + // TiKV use range [end_key, start_key) for reverse scan. + // So the req.StartKey actually is the end_key. + if len(req.StartKey) > 0 && (len(endKey) == 0 || bytes.Compare(NewMvccKey(req.StartKey), h.endKey) < 0) { + endKey = req.StartKey + } + + pairs = h.mvccStore.ReverseScan(req.EndKey, endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel) } - pairs := h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel) + return &kvrpcpb.ScanResponse{ Pairs: convertToPbPairs(pairs), } @@ -856,8 +875,8 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R // DebugGetRegionProperties is for fast analyze in mock tikv. 
case tikvrpc.CmdDebugGetRegionProperties: r := req.DebugGetRegionProperties - region, _ := c.Cluster.GetRegionByID(r.RegionId) - scanResp := handler.handleKvScan(&kvrpcpb.ScanRequest{StartKey: region.StartKey, EndKey: region.EndKey}) + region, _ := c.Cluster.GetRegion(r.RegionId) + scanResp := handler.handleKvScan(&kvrpcpb.ScanRequest{StartKey: MvccKey(region.StartKey).Raw(), EndKey: MvccKey(region.EndKey).Raw(), Version: math.MaxUint64, Limit: math.MaxUint32}) resp.DebugGetRegionProperties = &debugpb.GetRegionPropertiesResponse{ Props: []*debugpb.Property{{ Name: "mvcc.num_rows", diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index c34d5eb65643a..be374e3fc5145 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -110,12 +110,12 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs in } select { case <-time.After(time.Duration(realSleep) * time.Millisecond): + attempts++ + lastSleep = sleep + return lastSleep case <-ctx.Done(): + return 0 } - - attempts++ - lastSleep = sleep - return lastSleep } } diff --git a/store/tikv/client.go b/store/tikv/client.go index 09a34912463bf..eed1a7ed7a060 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -336,6 +336,10 @@ type batchCommandsEntry struct { err error } +func (b *batchCommandsEntry) isCanceled() bool { + return atomic.LoadInt32(&b.canceled) == 1 +} + const idleTimeout = 3 * time.Minute // fetchAllPendingRequests fetches all pending requests from the channel. @@ -476,6 +480,10 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { bestBatchWaitSize += 1 } + length = removeCanceledRequests(&entries, &requests) + if length == 0 { + continue // All requests are canceled. + } maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length)) for i := 0; i < length; i++ { requestID := uint64(i) + maxBatchID - uint64(length) @@ -506,6 +514,23 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { } } +// removeCanceledRequests removes canceled requests before sending. +func removeCanceledRequests( + entries *[]*batchCommandsEntry, + requests *[]*tikvpb.BatchCommandsRequest_Request) int { + validEntries := (*entries)[:0] + validRequets := (*requests)[:0] + for _, e := range *entries { + if !e.isCanceled() { + validEntries = append(validEntries, e) + validRequets = append(validRequets, e.req) + } + } + *entries = validEntries + *requests = validRequets + return len(*entries) +} + // rpcClient is RPC client struct. // TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV. // Since we use shared client connection to communicate to the same TiKV, it's possible diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index b673e9b674358..e52498726bef1 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -17,6 +17,7 @@ import ( "testing" . 
"github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/config" ) @@ -52,3 +53,27 @@ func (s *testClientSuite) TestConn(c *C) { c.Assert(err, NotNil) c.Assert(conn3, IsNil) } + +func (s *testClientSuite) TestRemoveCanceledRequests(c *C) { + req := new(tikvpb.BatchCommandsRequest_Request) + entries := []*batchCommandsEntry{ + {canceled: 1, req: req}, + {canceled: 0, req: req}, + {canceled: 1, req: req}, + {canceled: 1, req: req}, + {canceled: 0, req: req}, + } + entryPtr := &entries[0] + requests := make([]*tikvpb.BatchCommandsRequest_Request, len(entries)) + for i := range entries { + requests[i] = entries[i].req + } + length := removeCanceledRequests(&entries, &requests) + c.Assert(length, Equals, 2) + for _, e := range entries { + c.Assert(e.isCanceled(), IsFalse) + } + c.Assert(len(requests), Equals, 2) + newEntryPtr := &entries[0] + c.Assert(entryPtr, Equals, newEntryPtr) +} diff --git a/store/tikv/range_task.go b/store/tikv/range_task.go index 1fac00a3a2588..dcf3d72069219 100644 --- a/store/tikv/range_task.go +++ b/store/tikv/range_task.go @@ -129,8 +129,6 @@ func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey []byte, endKe key := startKey for { select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) case <-statLogTicker.C: logutil.Logger(ctx).Info("range task in progress", zap.String("name", s.name), @@ -168,7 +166,12 @@ func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey []byte, endKe } pushTaskStartTime := time.Now() - taskCh <- task + + select { + case taskCh <- task: + case <-ctx.Done(): + break + } metrics.TiKVRangeTaskPushDuration.WithLabelValues(s.name).Observe(time.Since(pushTaskStartTime).Seconds()) if isLast { @@ -247,8 +250,6 @@ func (w *rangeTaskWorker) run(ctx context.Context, cancel context.CancelFunc) { } completedRegions, err := w.handler(ctx, *r) - atomic.AddInt32(w.completedRegions, int32(completedRegions)) - if err != nil { logutil.Logger(ctx).Info("canceling range task because of error", zap.String("name", w.name), @@ -259,5 +260,6 @@ func (w *rangeTaskWorker) run(ctx context.Context, cancel context.CancelFunc) { cancel() break } + atomic.AddInt32(w.completedRegions, int32(completedRegions)) } } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 64005740fbc8a..c27c15c977a19 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -38,13 +38,14 @@ const ( ) var ( - tikvRegionCacheCounterWithDropRegionFromCacheOK = metrics.TiKVRegionCacheCounter.WithLabelValues("drop_region_from_cache", "ok") - tikvRegionCacheCounterWithGetRegionByIDOK = metrics.TiKVRegionCacheCounter.WithLabelValues("get_region_by_id", "ok") - tikvRegionCacheCounterWithGetRegionByIDError = metrics.TiKVRegionCacheCounter.WithLabelValues("get_region_by_id", "err") - tikvRegionCacheCounterWithGetRegionOK = metrics.TiKVRegionCacheCounter.WithLabelValues("get_region", "ok") - tikvRegionCacheCounterWithGetRegionError = metrics.TiKVRegionCacheCounter.WithLabelValues("get_region", "err") - tikvRegionCacheCounterWithGetStoreOK = metrics.TiKVRegionCacheCounter.WithLabelValues("get_store", "ok") - tikvRegionCacheCounterWithGetStoreError = metrics.TiKVRegionCacheCounter.WithLabelValues("get_store", "err") + tikvRegionCacheCounterWithInvalidateRegionFromCacheOK = metrics.TiKVRegionCacheCounter.WithLabelValues("invalidate_region_from_cache", "ok") + tikvRegionCacheCounterWithSendFail = metrics.TiKVRegionCacheCounter.WithLabelValues("send_fail", "ok") + tikvRegionCacheCounterWithGetRegionByIDOK = 
metrics.TiKVRegionCacheCounter.WithLabelValues("get_region_by_id", "ok") + tikvRegionCacheCounterWithGetRegionByIDError = metrics.TiKVRegionCacheCounter.WithLabelValues("get_region_by_id", "err") + tikvRegionCacheCounterWithGetRegionOK = metrics.TiKVRegionCacheCounter.WithLabelValues("get_region", "ok") + tikvRegionCacheCounterWithGetRegionError = metrics.TiKVRegionCacheCounter.WithLabelValues("get_region", "err") + tikvRegionCacheCounterWithGetStoreOK = metrics.TiKVRegionCacheCounter.WithLabelValues("get_store", "ok") + tikvRegionCacheCounterWithGetStoreError = metrics.TiKVRegionCacheCounter.WithLabelValues("get_store", "err") ) const ( @@ -121,6 +122,7 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool { // invalidate invalidates a region, next time it will got null result. func (r *Region) invalidate() { + tikvRegionCacheCounterWithInvalidateRegionFromCacheOK.Inc() atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime) } @@ -359,13 +361,18 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool) } // OnSendFail handles send request fail logic. -func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload bool) { +func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload bool, err error) { + tikvRegionCacheCounterWithSendFail.Inc() r := c.getCachedRegionWithRLock(ctx.Region) if r != nil { c.switchNextPeer(r, ctx.PeerIdx) if scheduleReload { r.scheduleReload() } + logutil.Logger(bo.ctx).Info("switch region peer to next due to send request fail", + zap.Stringer("current", ctx), + zap.Bool("needReload", scheduleReload), + zap.Error(err)) } } @@ -457,7 +464,6 @@ func (c *RegionCache) InvalidateCachedRegion(id RegionVerID) { if cachedRegion == nil { return } - tikvRegionCacheCounterWithDropRegionFromCacheOK.Inc() cachedRegion.invalidate() } @@ -473,14 +479,23 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c if leaderStoreID == 0 { c.switchNextPeer(r, currentPeerIdx) + logutil.Logger(context.Background()).Info("switch region peer to next due to NotLeader with NULL leader", + zap.Int("currIdx", currentPeerIdx), + zap.Uint64("regionID", regionID.GetID())) return } if !c.switchToPeer(r, leaderStoreID) { - logutil.Logger(context.Background()).Debug("regionCache: cannot find peer when updating leader", + logutil.Logger(context.Background()).Info("invalidate region cache due to cannot find peer when updating leader", zap.Uint64("regionID", regionID.GetID()), + zap.Int("currIdx", currentPeerIdx), zap.Uint64("leaderStoreID", leaderStoreID)) r.invalidate() + } else { + logutil.Logger(context.Background()).Info("switch region leader to specific leader due to kv return NotLeader", + zap.Uint64("regionID", regionID.GetID()), + zap.Int("currIdx", currentPeerIdx), + zap.Uint64("leaderStoreID", leaderStoreID)) } } @@ -712,7 +727,6 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr if needInvalidateOld { cachedRegion, ok := c.mu.regions[ctx.Region] if ok { - tikvRegionCacheCounterWithDropRegionFromCacheOK.Inc() cachedRegion.invalidate() } } diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index dc78d01413eb9..ed3a28e03e1b7 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -239,7 +239,7 @@ func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) { c.Assert(len(ctx.Meta.Peers), Equals, 3) // send fail leader switch to 2 - s.cache.OnSendFail(s.bo, ctx, false) + s.cache.OnSendFail(s.bo, ctx, 
false, nil) ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) @@ -267,7 +267,7 @@ func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) { c.Assert(len(ctx.Meta.Peers), Equals, 3) // send fail leader switch to 2 - s.cache.OnSendFail(s.bo, ctx, false) + s.cache.OnSendFail(s.bo, ctx, false, nil) ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) @@ -303,13 +303,13 @@ func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) { c.Assert(len(ctx.Meta.Peers), Equals, 3) // send fail leader switch to 2 - s.cache.OnSendFail(s.bo, ctx, false) + s.cache.OnSendFail(s.bo, ctx, false, nil) ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) // send 2 fail leader switch to 3 - s.cache.OnSendFail(s.bo, ctx, false) + s.cache.OnSendFail(s.bo, ctx, false, nil) ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, peer3) diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 78fde3854f853..c1a82fb344a44 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -172,7 +172,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err } } - s.regionCache.OnSendFail(bo, ctx, s.needReloadRegion(ctx)) + s.regionCache.OnSendFail(bo, ctx, s.needReloadRegion(ctx), err) // Retry on send request failure when it's not canceled. // When a store is not available, the leader of related region should be elected quickly. diff --git a/store/tikv/scan.go b/store/tikv/scan.go index dd56a1f7ed5a4..4ce5979270c02 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -35,9 +35,13 @@ type Scanner struct { nextStartKey []byte endKey []byte eof bool + + // Use for reverse scan. + reverse bool + nextEndKey []byte } -func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) { +func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) { // It must be > 1. Otherwise scanner won't skipFirst. 
     if batchSize <= 1 {
         batchSize = scanBatchSize
@@ -48,6 +52,8 @@ func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSiz
         valid:        true,
         nextStartKey: startKey,
         endKey:       endKey,
+        reverse:      reverse,
+        nextEndKey:   endKey,
     }
     err := scanner.Next()
     if kv.IsErrNotFound(err) {
@@ -83,6 +89,7 @@ func (s *Scanner) Next() error {
     if !s.valid {
         return errors.New("scanner iterator is invalid")
     }
+    var err error
     for {
         s.idx++
         if s.idx >= len(s.cache) {
@@ -90,7 +97,7 @@ func (s *Scanner) Next() error {
                 s.Close()
                 return nil
             }
-            err := s.getData(bo)
+            err = s.getData(bo)
             if err != nil {
                 s.Close()
                 return errors.Trace(err)
@@ -101,7 +108,8 @@ func (s *Scanner) Next() error {
         }
         current := s.cache[s.idx]
-        if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
+        if (!s.reverse && (len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0)) ||
+            (s.reverse && len(s.nextStartKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.nextStartKey)) < 0) {
             s.eof = true
             s.Close()
             return nil
@@ -147,18 +155,34 @@ func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
 func (s *Scanner) getData(bo *Backoffer) error {
     logutil.Logger(context.Background()).Debug("txn getData",
         zap.Binary("nextStartKey", s.nextStartKey),
+        zap.Binary("nextEndKey", s.nextEndKey),
+        zap.Bool("reverse", s.reverse),
         zap.Uint64("txnStartTS", s.startTS()))
     sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client)
-
+    var reqEndKey, reqStartKey []byte
+    var loc *KeyLocation
+    var err error
     for {
-        loc, err := s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
+        if !s.reverse {
+            loc, err = s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
+        } else {
+            loc, err = s.snapshot.store.regionCache.LocateEndKey(bo, s.nextEndKey)
+        }
         if err != nil {
             return errors.Trace(err)
         }
-        reqEndKey := s.endKey
-        if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
-            reqEndKey = loc.EndKey
+        if !s.reverse {
+            reqEndKey = s.endKey
+            if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
+                reqEndKey = loc.EndKey
+            }
+        } else {
+            reqStartKey = s.nextStartKey
+            if len(reqStartKey) == 0 ||
+                (len(loc.StartKey) > 0 && bytes.Compare(loc.StartKey, reqStartKey) > 0) {
+                reqStartKey = loc.StartKey
+            }
         }
         req := &tikvrpc.Request{
@@ -175,6 +199,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
                 NotFillCache: s.snapshot.notFillCache,
             },
         }
+        if s.reverse {
+            req.Scan.StartKey = s.nextEndKey
+            req.Scan.EndKey = reqStartKey
+            req.Scan.Reverse = true
+        }
         resp, err := sender.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
         if err != nil {
             return errors.Trace(err)
@@ -218,8 +247,13 @@ func (s *Scanner) getData(bo *Backoffer) error {
         if len(kvPairs) < s.batchSize {
             // No more data in current Region. Next getData() starts
             // from current Region's endKey.
-            s.nextStartKey = loc.EndKey
-            if len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0) {
+            if !s.reverse {
+                s.nextStartKey = loc.EndKey
+            } else {
+                s.nextEndKey = reqStartKey
+            }
+            if (!s.reverse && (len(loc.EndKey) == 0 || (len(s.endKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.endKey)) >= 0))) ||
+                (s.reverse && (len(loc.StartKey) == 0 || (len(s.nextStartKey) > 0 && kv.Key(s.nextStartKey).Cmp(kv.Key(s.nextEndKey)) >= 0))) {
                 // Current Region is the last one.
                 s.eof = true
             }
@@ -230,7 +264,11 @@ func (s *Scanner) getData(bo *Backoffer) error {
         // may get an empty response if the Region in fact does not have
         // more data.
         lastKey := kvPairs[len(kvPairs)-1].GetKey()
-        s.nextStartKey = kv.Key(lastKey).Next()
+        if !s.reverse {
+            s.nextStartKey = kv.Key(lastKey).Next()
+        } else {
+            s.nextEndKey = kv.Key(lastKey)
+        }
         return nil
     }
 }
diff --git a/store/tikv/scan_mock_test.go b/store/tikv/scan_mock_test.go
index 4cf09c50c6abb..204bcc95783d9 100644
--- a/store/tikv/scan_mock_test.go
+++ b/store/tikv/scan_mock_test.go
@@ -42,7 +42,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
     txn, err = store.Begin()
     c.Assert(err, IsNil)
     snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
-    scanner, err := newScanner(snapshot, []byte("a"), nil, 10)
+    scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false)
     c.Assert(err, IsNil)
     for ch := byte('a'); ch <= byte('z'); ch++ {
         c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
@@ -50,7 +50,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
     }
     c.Assert(scanner.Valid(), IsFalse)
-    scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10)
+    scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, false)
     c.Assert(err, IsNil)
     for ch := byte('a'); ch <= byte('h'); ch++ {
         c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
@@ -58,3 +58,36 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
     }
     c.Assert(scanner.Valid(), IsFalse)
 }
+
+func (s *testScanMockSuite) TestReverseScan(c *C) {
+    store := NewTestStore(c).(*tikvStore)
+    defer store.Close()
+
+    txn, err := store.Begin()
+    c.Assert(err, IsNil)
+    for ch := byte('a'); ch <= byte('z'); ch++ {
+        err = txn.Set([]byte{ch}, []byte{ch})
+        c.Assert(err, IsNil)
+    }
+    err = txn.Commit(context.Background())
+    c.Assert(err, IsNil)
+
+    txn, err = store.Begin()
+    c.Assert(err, IsNil)
+    snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
+    scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true)
+    c.Assert(err, IsNil)
+    for ch := byte('y'); ch >= byte('a'); ch-- {
+        c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
+        c.Assert(scanner.Next(), IsNil)
+    }
+    c.Assert(scanner.Valid(), IsFalse)
+
+    scanner, err = newScanner(snapshot, []byte("a"), []byte("i"), 10, true)
+    c.Assert(err, IsNil)
+    for ch := byte('h'); ch >= byte('a'); ch-- {
+        c.Assert(string([]byte{ch}), Equals, string([]byte(scanner.Key())))
+        c.Assert(scanner.Next(), IsNil)
+    }
+    c.Assert(scanner.Valid(), IsFalse)
+}
diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go
index a73b866255850..08df06751ac66 100644
--- a/store/tikv/snapshot.go
+++ b/store/tikv/snapshot.go
@@ -295,13 +295,14 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
 // Iter return a list of key-value pair after `k`.
 func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
-    scanner, err := newScanner(s, k, upperBound, scanBatchSize)
+    scanner, err := newScanner(s, k, upperBound, scanBatchSize, false)
     return scanner, errors.Trace(err)
 }
 
 // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
 func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
-    return nil, kv.ErrNotImplemented
+    scanner, err := newScanner(s, nil, k, scanBatchSize, true)
+    return scanner, errors.Trace(err)
 }
 
 func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go
index b9c8a6648b0dc..4af6830605e97 100644
--- a/store/tikv/split_region.go
+++ b/store/tikv/split_region.go
@@ -115,7 +115,7 @@ func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error {
         if logFreq%10 == 0 {
             logutil.Logger(context.Background()).Info("wait scatter region",
                 zap.Uint64("regionID", regionID),
-                zap.String("desc", string(resp.Desc)),
+                zap.String("reverse", string(resp.Desc)),
                 zap.String("status", pdpb.OperatorStatus_name[int32(resp.Status)]))
         }
         logFreq++
diff --git a/structure/hash.go b/structure/hash.go
index 3249884a0b6fc..ddad8d69d0344 100644
--- a/structure/hash.go
+++ b/structure/hash.go
@@ -216,6 +216,23 @@ func (t *TxStructure) HGetAll(key []byte) ([]HashPair, error) {
     return res, errors.Trace(err)
 }
 
+// HGetLastN gets latest N fields and values in hash.
+func (t *TxStructure) HGetLastN(key []byte, num int) ([]HashPair, error) {
+    res := make([]HashPair, 0, num)
+    err := t.iterReverseHash(key, func(field []byte, value []byte) (bool, error) {
+        pair := HashPair{
+            Field: append([]byte{}, field...),
+            Value: append([]byte{}, value...),
+        }
+        res = append(res, pair)
+        if len(res) >= num {
+            return false, nil
+        }
+        return true, nil
+    })
+    return res, errors.Trace(err)
+}
+
 // HClear removes the hash value of the key.
 func (t *TxStructure) HClear(key []byte) error {
     metaKey := t.encodeHashMetaKey(key)
@@ -268,6 +285,37 @@ func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error)
     return nil
 }
 
+func (t *TxStructure) iterReverseHash(key []byte, fn func(k []byte, v []byte) (bool, error)) error {
+    dataPrefix := t.hashDataKeyPrefix(key)
+    it, err := t.reader.IterReverse(dataPrefix.PrefixNext())
+    if err != nil {
+        return errors.Trace(err)
+    }
+
+    var field []byte
+    for it.Valid() {
+        if !it.Key().HasPrefix(dataPrefix) {
+            break
+        }
+
+        _, field, err = t.decodeHashDataKey(it.Key())
+        if err != nil {
+            return errors.Trace(err)
+        }
+
+        more, err := fn(field, it.Value())
+        if !more || err != nil {
+            return errors.Trace(err)
+        }
+
+        err = it.Next()
+        if err != nil {
+            return errors.Trace(err)
+        }
+    }
+    return nil
+}
+
 func (t *TxStructure) loadHashMeta(metaKey []byte) (hashMeta, error) {
     v, err := t.reader.Get(metaKey)
     if kv.ErrNotExist.Equal(err) {
diff --git a/structure/structure_test.go b/structure/structure_test.go
index 5ecab9f75c3a9..e6e55fcf5dfef 100644
--- a/structure/structure_test.go
+++ b/structure/structure_test.go
@@ -244,6 +244,17 @@ func (s *testTxStructureSuite) TestHash(c *C) {
         {Field: []byte("1"), Value: []byte("1")},
         {Field: []byte("2"), Value: []byte("2")}})
+    res, err = tx.HGetLastN(key, 1)
+    c.Assert(err, IsNil)
+    c.Assert(res, DeepEquals, []structure.HashPair{
+        {Field: []byte("2"), Value: []byte("2")}})
+
+    res, err = tx.HGetLastN(key, 2)
+    c.Assert(err, IsNil)
+    c.Assert(res, DeepEquals, []structure.HashPair{
+        {Field: []byte("2"), Value: []byte("2")},
+        {Field: []byte("1"), Value: []byte("1")}})
+
     err = tx.HDel(key, []byte("1"))
     c.Assert(err, IsNil)
diff --git a/structure/type.go b/structure/type.go
index 89759269871c9..7096d70e86984 100644
--- a/structure/type.go
+++ b/structure/type.go
@@ -63,6 +63,11 @@ func (t *TxStructure) encodeHashDataKey(key []byte, field []byte) kv.Key {
     return codec.EncodeBytes(ek, field)
 }
 
+// EncodeHashDataKey exports for tests.
+func (t *TxStructure) EncodeHashDataKey(key []byte, field []byte) kv.Key {
+    return t.encodeHashDataKey(key, field)
+}
+
 func (t *TxStructure) decodeHashDataKey(ek kv.Key) ([]byte, []byte, error) {
     var (
         key []byte
diff --git a/util/admin/admin.go b/util/admin/admin.go
index 1d29b768da353..facf632b6b828 100644
--- a/util/admin/admin.go
+++ b/util/admin/admin.go
@@ -227,7 +227,7 @@ const DefNumHistoryJobs = 10
 // The maximum count of history jobs is num.
 func GetHistoryDDLJobs(txn kv.Transaction, maxNumJobs int) ([]*model.Job, error) {
     t := meta.NewMeta(txn)
-    jobs, err := t.GetAllHistoryDDLJobs()
+    jobs, err := t.GetLastNHistoryDDLJobs(maxNumJobs)
     if err != nil {
         return nil, errors.Trace(err)
     }
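
Usage note (illustrative, not part of the patch): the reverse scan wired up above is reached through the `IterReverse` method declared on the `kv.Retriever` interface, which `tikvSnapshot` now implements instead of returning `kv.ErrNotImplemented`. The sketch below is a minimal caller under the assumption that a `kv.Retriever` (a transaction or snapshot) has been obtained elsewhere; the package and helper names are made up for the example.

```go
package example

import (
	"fmt"

	"github.com/pingcap/tidb/kv"
)

// dumpReverse walks the keys strictly less than upperBound in descending
// order, printing each key/value pair until the iterator is exhausted.
// It relies only on the kv.Retriever and kv.Iterator interfaces.
func dumpReverse(r kv.Retriever, upperBound kv.Key) error {
	it, err := r.IterReverse(upperBound)
	if err != nil {
		return err
	}
	defer it.Close()
	for it.Valid() {
		fmt.Printf("%q => %q\n", it.Key(), it.Value())
		if err := it.Next(); err != nil {
			return err
		}
	}
	return nil
}
```

Internally, as the new `TestReverseScan` case shows, the same path is exercised by constructing `newScanner` with `reverse` set to `true` and the end key acting as the exclusive upper bound.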