Skip to content

Commit

Permalink
executor: let plan replayer support view (#37224)
Browse files Browse the repository at this point in the history
close #37163
  • Loading branch information
Yisaer authored Aug 23, 2022
1 parent 78b32f3 commit 45588a1
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 22 deletions.
4 changes: 4 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ func TestPlanReplayer(t *testing.T) {

tk.MustExec("create table t1 (a int)")
tk.MustExec("create table t2 (a int)")
tk.MustExec("create definer=`root`@`127.0.0.1` view v1 as select * from t1")
tk.MustExec("create definer=`root`@`127.0.0.1` view v2 as select * from v1")
tk.MustExec("plan replayer dump explain with tmp as (select a from t1 group by t1.a) select * from tmp, t2 where t2.a=tmp.a;")
tk.MustExec("plan replayer dump explain select * from t1 where t1.a > (with cte1 as (select 1) select count(1) from cte1);")
tk.MustExec("plan replayer dump explain select * from v1")
tk.MustExec("plan replayer dump explain select * from v2")
}

func TestShow(t *testing.T) {
Expand Down
125 changes: 103 additions & 22 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -59,12 +60,17 @@ type PlanReplayerSingleExec struct {
type tableNamePair struct {
DBName string
TableName string
IsView bool
}

type tableNameExtractor struct {
curDB string
ctx context.Context
executor sqlexec.RestrictedSQLExecutor
is infoschema.InfoSchema
curDB model.CIStr
names map[tableNamePair]struct{}
cteNames map[string]struct{}
err error
}

func (tne *tableNameExtractor) Enter(in ast.Node) (ast.Node, bool) {
Expand All @@ -75,10 +81,18 @@ func (tne *tableNameExtractor) Enter(in ast.Node) (ast.Node, bool) {
}

func (tne *tableNameExtractor) Leave(in ast.Node) (ast.Node, bool) {
if tne.err != nil {
return in, true
}
if t, ok := in.(*ast.TableName); ok {
tp := tableNamePair{DBName: t.Schema.L, TableName: t.Name.L}
isView, err := tne.handleIsView(t)
if err != nil {
tne.err = err
return in, true
}
tp := tableNamePair{DBName: t.Schema.L, TableName: t.Name.L, IsView: isView}
if tp.DBName == "" {
tp.DBName = tne.curDB
tp.DBName = tne.curDB.L
}
if _, ok := tne.names[tp]; !ok {
tne.names[tp] = struct{}{}
Expand All @@ -93,6 +107,29 @@ func (tne *tableNameExtractor) Leave(in ast.Node) (ast.Node, bool) {
return in, true
}

func (tne *tableNameExtractor) handleIsView(t *ast.TableName) (bool, error) {
schema := t.Schema
if schema.L == "" {
schema = tne.curDB
}
table := t.Name
isView := tne.is.TableIsView(schema, table)
if !isView {
return false, nil
}
viewTbl, err := tne.is.TableByName(schema, table)
if err != nil {
return false, err
}
sql := viewTbl.Meta().View.SelectStmt
node, err := tne.executor.ParseWithParams(tne.ctx, sql)
if err != nil {
return false, err
}
node.Accept(tne)
return true, nil
}

// Next implements the Executor Next interface.
func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
Expand All @@ -102,7 +139,7 @@ func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) err
if e.ExecStmt == nil {
return errors.New("plan replayer: sql is empty")
}
res, err := e.dumpSingle(domain.GetPlanReplayerDirName())
res, err := e.dumpSingle(ctx, domain.GetPlanReplayerDirName())
if err != nil {
return err
}
Expand All @@ -115,7 +152,14 @@ func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) err
// The files will be organized into the following format:
/*
|-meta.txt
|-schema.sql
|-schema
| |-db1.table1.schema.txt
| |-db2.table2.schema.txt
| |-....
|-view
| |-db1.view1.view.txt
| |-db2.view2.view.txt
| |-....
|-stats
| |-stats1.json
| |-stats2.json
Expand All @@ -127,7 +171,7 @@ func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) err
|_explain
|-explain.txt
*/
func (e *PlanReplayerSingleExec) dumpSingle(path string) (fileName string, err error) {
func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (fileName string, err error) {
// Create path
err = os.MkdirAll(path, os.ModePerm)
if err != nil {
Expand Down Expand Up @@ -178,12 +222,12 @@ func (e *PlanReplayerSingleExec) dumpSingle(path string) (fileName string, err e
do := domain.GetDomain(e.ctx)

// Retrieve all tables
pairs, err := extractTableNames(e.ExecStmt, dbName.L)
pairs, err := e.extractTableNames(ctx, e.ExecStmt, dbName)
if err != nil {
return "", errors.AddStack(fmt.Errorf("plan replayer: invalid SQL text, err: %v", err))
}

// Dump Schema
// Dump Schema and View
if err = dumpSchemas(e.ctx, zw, pairs); err != nil {
return "", err
}
Expand Down Expand Up @@ -261,6 +305,9 @@ func dumpSchemas(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair

func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *domain.Domain) error {
for pair := range pairs {
if pair.IsView {
continue
}
jsonTbl, err := getStatsForTable(do, pair)
if err != nil {
return err
Expand Down Expand Up @@ -392,16 +439,27 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, sql string, isAnalyze b
return nil
}

func extractTableNames(ExecStmt ast.StmtNode, curDB string) (map[tableNamePair]struct{}, error) {
func (e *PlanReplayerSingleExec) extractTableNames(ctx context.Context,
ExecStmt ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
tableExtractor := &tableNameExtractor{
ctx: ctx,
executor: e.ctx.(sqlexec.RestrictedSQLExecutor),
is: domain.GetDomain(e.ctx).InfoSchema(),
curDB: curDB,
names: make(map[tableNamePair]struct{}),
cteNames: make(map[string]struct{}),
}
ExecStmt.Accept(tableExtractor)
if tableExtractor.err != nil {
return nil, tableExtractor.err
}
r := make(map[tableNamePair]struct{})
// remove cte in table names
for tablePair := range tableExtractor.names {
if tablePair.IsView {
r[tablePair] = struct{}{}
continue
}
// remove cte in table names
_, ok := tableExtractor.cteNames[tablePair.TableName]
if !ok {
r[tablePair] = struct{}{}
Expand Down Expand Up @@ -430,14 +488,25 @@ func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Conte
if err != nil {
return err
}
fw, err := zw.Create(fmt.Sprintf("schema/%v.%v.schema.txt", pair.DBName, pair.TableName))
if err != nil {
return errors.AddStack(err)
}
if len(sRows) == 0 || len(sRows[0]) != 2 {
return fmt.Errorf("plan replayer: get create table %v.%v failed", pair.DBName, pair.TableName)
var fw io.Writer
if pair.IsView {
fw, err = zw.Create(fmt.Sprintf("view/%v.%v.view.txt", pair.DBName, pair.TableName))
if err != nil {
return errors.AddStack(err)
}
if len(sRows) == 0 || len(sRows[0]) != 4 {
return fmt.Errorf("plan replayer: get create view %v.%v failed", pair.DBName, pair.TableName)
}
} else {
fw, err = zw.Create(fmt.Sprintf("schema/%v.%v.schema.txt", pair.DBName, pair.TableName))
if err != nil {
return errors.AddStack(err)
}
if len(sRows) == 0 || len(sRows[0]) != 2 {
return fmt.Errorf("plan replayer: get create table %v.%v failed", pair.DBName, pair.TableName)
}
}
fmt.Fprintf(fw, "create database `%v`; use `%v`;", pair.DBName, pair.DBName)
fmt.Fprintf(fw, "create database if not exists `%v`; use `%v`;", pair.DBName, pair.DBName)
fmt.Fprintf(fw, "%s", sRows[0][1])
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
Expand Down Expand Up @@ -570,7 +639,8 @@ func loadVariables(ctx sessionctx.Context, z *zip.Reader) error {
return nil
}

func createSchemaAndTables(ctx sessionctx.Context, f *zip.File) error {
// createSchemaAndItems creates schema and tables or views
func createSchemaAndItems(ctx sessionctx.Context, f *zip.File) error {
r, err := f.Open()
if err != nil {
return errors.AddStack(err)
Expand All @@ -587,15 +657,15 @@ func createSchemaAndTables(ctx sessionctx.Context, f *zip.File) error {
return errors.New("plan replayer: create schema and tables failed")
}
c := context.Background()
// create database
// create database if not exists
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[0])
logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
// use database
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[1])
if err != nil {
return err
}
// create table
// create table or view
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[2])
if err != nil {
return err
Expand Down Expand Up @@ -641,11 +711,22 @@ func (e *PlanReplayerLoadInfo) Update(data []byte) error {
return err
}

// build schema and table
// build schema and table first
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
if len(path) == 2 && strings.Compare(path[0], "schema") == 0 {
err = createSchemaAndTables(e.Ctx, zipFile)
err = createSchemaAndItems(e.Ctx, zipFile)
if err != nil {
return err
}
}
}

// build view next
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
if len(path) == 2 && strings.Compare(path[0], "view") == 0 {
err = createSchemaAndItems(e.Ctx, zipFile)
if err != nil {
return err
}
Expand Down

0 comments on commit 45588a1

Please sign in to comment.