Skip to content

Commit

Permalink
Merge branch 'master' into br_keep_upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Aug 6, 2021
2 parents 389b4e1 + 4d9473a commit b32ce9b
Show file tree
Hide file tree
Showing 53 changed files with 1,109 additions and 435 deletions.
33 changes: 18 additions & 15 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (s *testSuite) TestGlobalBinding(c *C) {
c.Check(err, IsNil)
c.Check(chk.NumRows(), Equals, 0)

_, err = tk.Exec("delete from mysql.bind_info")
_, err = tk.Exec("delete from mysql.bind_info where source != 'builtin'")
c.Assert(err, IsNil)
}
}
Expand Down Expand Up @@ -1102,6 +1102,7 @@ func (s *testSuite) TestBaselineDBLowerCase(c *C) {
// default_db should have lower case.
c.Assert(rows[0][2], Equals, "spm")
tk.MustQuery("select original_sql, default_db, status from mysql.bind_info where original_sql = 'select * from `spm` . `t`'").Check(testkit.Rows(
"select * from `spm` . `t` SPM deleted",
"select * from `spm` . `t` spm using",
))
}
Expand Down Expand Up @@ -1512,9 +1513,9 @@ func (s *testSuite) TestReloadBindings(c *C) {
tk.MustExec("create global binding for select * from t using select * from t use index(idx)")
rows := tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 1)
rows = tk.MustQuery("select * from mysql.bind_info").Rows()
rows = tk.MustQuery("select * from mysql.bind_info where source != 'builtin'").Rows()
c.Assert(len(rows), Equals, 1)
tk.MustExec("truncate table mysql.bind_info")
tk.MustExec("delete from mysql.bind_info where source != 'builtin'")
c.Assert(s.domain.BindHandle().Update(false), IsNil)
rows = tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 1)
Expand Down Expand Up @@ -1595,7 +1596,7 @@ func (s *testSuite) TestOutdatedInfoSchema(c *C) {
tk.MustExec("create table t(a int, b int, index idx(a))")
tk.MustExec("create global binding for select * from t using select * from t use index(idx)")
c.Assert(s.domain.BindHandle().Update(false), IsNil)
tk.MustExec("truncate table mysql.bind_info")
s.cleanBindingEnv(tk)
tk.MustExec("create global binding for select * from t using select * from t use index(idx)")
}

Expand Down Expand Up @@ -2002,11 +2003,11 @@ func (s *testSuite) TestReCreateBind(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx(a))")

tk.MustQuery("select * from mysql.bind_info").Check(testkit.Rows())
tk.MustQuery("select * from mysql.bind_info where source != 'builtin'").Check(testkit.Rows())
tk.MustQuery("show global bindings").Check(testkit.Rows())

tk.MustExec("create global binding for select * from t using select * from t")
tk.MustQuery("select original_sql, status from mysql.bind_info").Check(testkit.Rows(
tk.MustQuery("select original_sql, status from mysql.bind_info where source != 'builtin';").Check(testkit.Rows(
"select * from `test` . `t` using",
))
rows := tk.MustQuery("show global bindings").Rows()
Expand All @@ -2015,13 +2016,15 @@ func (s *testSuite) TestReCreateBind(c *C) {
c.Assert(rows[0][3], Equals, "using")

tk.MustExec("create global binding for select * from t using select * from t")
tk.MustQuery("select original_sql, status from mysql.bind_info").Check(testkit.Rows(
"select * from `test` . `t` using",
))
rows = tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 1)
c.Assert(rows[0][0], Equals, "select * from `test` . `t`")
c.Assert(rows[0][3], Equals, "using")

rows = tk.MustQuery("select original_sql, status from mysql.bind_info where source != 'builtin';").Rows()
c.Assert(len(rows), Equals, 2)
c.Assert(rows[0][1], Equals, "deleted")
c.Assert(rows[1][1], Equals, "using")
}

func (s *testSuite) TestExplainShowBindSQL(c *C) {
Expand All @@ -2036,10 +2039,9 @@ func (s *testSuite) TestExplainShowBindSQL(c *C) {
"select * from `test` . `t` SELECT * FROM `test`.`t` USE INDEX (`a`)",
))

tk.MustExec("explain select * from t")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Using the bindSQL: SELECT * FROM `test`.`t` USE INDEX (`a`)"))
tk.MustExec("explain analyze select * from t")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Using the bindSQL: SELECT * FROM `test`.`t` USE INDEX (`a`)"))
tk.MustExec("explain format = 'verbose' select * from t")
tk.MustQuery("show warnings").Check(testkit.Rows("Note 1105 Using the bindSQL: SELECT * FROM `test`.`t` USE INDEX (`a`)"))
// explain analyze do not support verbose yet.
}

func (s *testSuite) TestDMLIndexHintBind(c *C) {
Expand Down Expand Up @@ -2097,8 +2099,9 @@ func (s *testSuite) TestConcurrentCapture(c *C) {
tk.MustExec("select * from t")
tk.MustExec("select * from t")
tk.MustExec("admin capture bindings")
tk.MustQuery("select original_sql, source from mysql.bind_info where source != 'builtin'").Check(testkit.Rows(
"select * from `test` . `t` capture",
tk.MustQuery("select original_sql, source, status from mysql.bind_info where source != 'builtin'").Check(testkit.Rows(
"select * from `test` . `t` manual deleted",
"select * from `test` . `t` capture using",
))
}

Expand Down
20 changes: 14 additions & 6 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {

exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
FROM mysql.bind_info WHERE update_time > %? ORDER BY update_time`, updateTime)
FROM mysql.bind_info WHERE update_time > %? ORDER BY update_time, create_time`, updateTime)
if err != nil {
return err
}
Expand Down Expand Up @@ -218,14 +218,16 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
if err = h.lockBindInfoTable(); err != nil {
return err
}
// Binding recreation should physically delete previous bindings.
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE original_sql = %?`, record.OriginalSQL)

now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)

updateTs := now.String()
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %?`,
deleted, updateTs, record.OriginalSQL, updateTs)
if err != nil {
return err
}

now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)

for i := range record.Bindings {
record.Bindings[i].CreateTime = now
record.Bindings[i].UpdateTime = now
Expand Down Expand Up @@ -697,7 +699,13 @@ func getHintsForSQL(sctx sessionctx.Context, sql string) (string, error) {
rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), fmt.Sprintf("EXPLAIN FORMAT='hint' %s", sql))
sctx.GetSessionVars().UsePlanBaselines = origVals
if rs != nil {
defer terror.Call(rs.Close)
defer func() {
// Audit log is collected in Close(), set InRestrictedSQL to avoid 'create sql binding' been recorded as 'explain'.
origin := sctx.GetSessionVars().InRestrictedSQL
sctx.GetSessionVars().InRestrictedSQL = true
terror.Call(rs.Close)
sctx.GetSessionVars().InRestrictedSQL = origin
}()
}
if err != nil {
return "", err
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ var deprecatedConfig = map[string]struct{}{
"tikv-client.copr-cache.enable": {},
"alter-primary-key": {}, // use NONCLUSTERED keyword instead
"enable-streaming": {},
"allow-expression-index": {},
}

func isAllDeprecatedConfigItems(items []string) bool {
Expand Down
4 changes: 2 additions & 2 deletions ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2175,8 +2175,8 @@ func (s *testColumnTypeChangeSuite) TestCastDateToTimestampInReorgAttribute(c *C
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)

tk.MustExec("alter table t modify column a TIMESTAMP NULL DEFAULT '2021-04-28 03:35:11' FIRST")
c.Assert(checkErr1.Error(), Equals, "[types:1292]Incorrect datetime value: '3977-02-22 00:00:00'")
c.Assert(checkErr2.Error(), Equals, "[types:1292]Incorrect datetime value: '3977-02-22 00:00:00'")
c.Assert(checkErr1.Error(), Equals, "[types:1292]Incorrect timestamp value: '3977-02-22'")
c.Assert(checkErr2.Error(), Equals, "[types:1292]Incorrect timestamp value: '3977-02-22'")
tk.MustExec("drop table if exists t")
}

Expand Down
25 changes: 20 additions & 5 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,9 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2, IsNil)
_, err := s.se.Execute(context.Background(), "select * from t")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql, sql, f)
}
Expand All @@ -1064,8 +1065,9 @@ func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColum
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:8200]Unsupported modify column: oldCol is a dependent column 'a' for generated column")
_, err := s.se.Execute(context.Background(), "select * from t")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}
Expand All @@ -1076,8 +1078,9 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumnAndAddPK(c *C) {
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:8200]Unsupported modify column: this column has primary key flag")
_, err := s.se.Execute(context.Background(), "select * from t")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}
Expand Down Expand Up @@ -1361,12 +1364,24 @@ func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 s
wg.Add(2)
go func() {
defer wg.Done()
_, err1 = se.Execute(context.Background(), sql1)
var rss []sqlexec.RecordSet
rss, err1 = se.Execute(context.Background(), sql1)
if err1 == nil && len(rss) > 0 {
for _, rs := range rss {
c.Assert(rs.Close(), IsNil)
}
}
}()
go func() {
defer wg.Done()
<-ch
_, err2 = se1.Execute(context.Background(), sql2)
var rss []sqlexec.RecordSet
rss, err2 = se1.Execute(context.Background(), sql2)
if err2 == nil && len(rss) > 0 {
for _, rs := range rss {
c.Assert(rs.Close(), IsNil)
}
}
}()

wg.Wait()
Expand Down
13 changes: 7 additions & 6 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,12 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
ctx = opentracing.ContextWithSpan(ctx, span1)
}

var err error
defer func() {
terror.Log(e.Close())
a.logAudit()
}()

// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
switch e.(type) {
Expand All @@ -581,12 +587,6 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
}
}

var err error
defer func() {
terror.Log(e.Close())
a.logAudit()
}()

err = Next(ctx, e, newFirstChunk(e))
if err != nil {
return nil, err
Expand Down Expand Up @@ -829,6 +829,7 @@ func (a *ExecStmt) logAudit() {
if sessVars.InRestrictedSQL {
return
}

err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
audit := plugin.DeclareAuditManifest(p.Manifest)
if audit.OnGeneralEvent != nil {
Expand Down
36 changes: 28 additions & 8 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -298,14 +299,17 @@ func (e *HashAggExec) Close() error {

// Open implements the Executor Open interface.
func (e *HashAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
failpoint.Inject("mockHashAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.New("mock HashAggExec.baseExecutor.Open returned error"))
}
})

if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
// If panic here, the children executor should be closed because they are open.
defer closeBaseExecutor(&e.baseExecutor)
e.prepared = false

e.memTracker = memory.NewTracker(e.id, -1)
Expand Down Expand Up @@ -344,6 +348,15 @@ func (e *HashAggExec) initForUnparallelExec() {
}
}

func closeBaseExecutor(b *baseExecutor) {
if r := recover(); r != nil {
// Release the resource, but throw the panic again and let the top level handle it.
terror.Log(b.Close())
logutil.BgLogger().Warn("panic in Open(), close base executor and throw exception again")
panic(r)
}
}

func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
sessionVars := e.ctx.GetSessionVars()
finalConcurrency := sessionVars.HashAggFinalConcurrency()
Expand Down Expand Up @@ -1218,14 +1231,18 @@ type StreamAggExec struct {

// Open implements the Executor Open interface.
func (e *StreamAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
failpoint.Inject("mockStreamAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.New("mock StreamAggExec.baseExecutor.Open returned error"))
}
})

if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
// If panic in Open, the children executor should be closed because they are open.
defer closeBaseExecutor(&e.baseExecutor)

e.childResult = newFirstChunk(e.children[0])
e.executed = false
e.isChildReturnEmpty = true
Expand Down Expand Up @@ -1886,10 +1903,13 @@ type AggSpillDiskAction struct {

// Action set HashAggExec spill mode.
func (a *AggSpillDiskAction) Action(t *memory.Tracker) {
if atomic.LoadUint32(&a.e.inSpillMode) == 0 && a.spillTimes < maxSpillTimes {
// Guarantee that processed data is at least 20% of the threshold, to avoid spilling too frequently.
if atomic.LoadUint32(&a.e.inSpillMode) == 0 && a.spillTimes < maxSpillTimes && a.e.memTracker.BytesConsumed() >= t.GetBytesLimit()/5 {
a.spillTimes++
logutil.BgLogger().Info("memory exceeds quota, set aggregate mode to spill-mode",
zap.Uint32("spillTimes", a.spillTimes))
zap.Uint32("spillTimes", a.spillTimes),
zap.Int64("consumed", t.BytesConsumed()),
zap.Int64("quota", t.GetBytesLimit()))
atomic.StoreUint32(&a.e.inSpillMode, 1)
return
}
Expand Down
13 changes: 12 additions & 1 deletion executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@ func (s *testSerialSuite) TestAggInDisk(c *C) {
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t(a int)")
sql := "insert into t values (0)"
for i := 1; i <= 300; i++ {
for i := 1; i <= 200; i++ {
sql += fmt.Sprintf(",(%v)", i)
}
sql += ";"
Expand All @@ -1488,4 +1488,15 @@ func (s *testSerialSuite) TestAggInDisk(c *C) {
strings.Contains(disk, "Bytes"), IsTrue)
}
}

// Add code cover
// Test spill chunk. Add a line to avoid tmp spill chunk is always full.
tk.MustExec("insert into t values(0)")
tk.MustQuery("select sum(tt.b) from ( select /*+ HASH_AGG() */ avg(t1.a) as b from t t1 join t t2 group by t1.a, t2.a) as tt").Check(
testkit.Rows("4040100.0000"))
// Test no groupby and no data.
tk.MustExec("drop table t;")
tk.MustExec("create table t(c int, c1 int);")
tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t;").Check(testkit.Rows("0"))
tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t group by c1;").Check(testkit.Rows())
}
7 changes: 7 additions & 0 deletions executor/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ func (e *SQLBindExec) dropSQLBind() error {
}

func (e *SQLBindExec) createSQLBind() error {
// For audit log, SQLBindExec execute "explain" statement internally, save and recover stmtctx
// is necessary to avoid 'create binding' been recorded as 'explain'.
saveStmtCtx := e.ctx.GetSessionVars().StmtCtx
defer func() {
e.ctx.GetSessionVars().StmtCtx = saveStmtCtx
}()

bindInfo := bindinfo.Binding{
BindSQL: e.bindSQL,
Charset: e.charset,
Expand Down
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1950,7 +1950,8 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
if b.err != nil {
return nil
}
b.err = plannercore.CheckUpdateList(assignFlag, v)
// should use the new tblID2table, since the update's schema may have been changed in Execstmt.
b.err = plannercore.CheckUpdateList(assignFlag, v, tblID2table)
if b.err != nil {
return nil
}
Expand Down
Loading

0 comments on commit b32ce9b

Please sign in to comment.