Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: let plan replayer record table tiflash replica #37336

Merged
merged 12 commits into from
Aug 24, 2022
7 changes: 7 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -140,12 +141,18 @@ func TestLoadStats(t *testing.T) {
}

func TestPlanReplayer(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount"))
}()
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx_a(a))")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("plan replayer dump explain select * from t where a=10")
tk.MustExec("plan replayer dump explain select /*+ read_from_storage(tiflash[t]) */ * from t")

tk.MustExec("create table t1 (a int)")
tk.MustExec("create table t2 (a int)")
Expand Down
167 changes: 119 additions & 48 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ import (
var _ Executor = &PlanReplayerSingleExec{}
var _ Executor = &PlanReplayerLoadExec{}

const (
configFile = "config.toml"
metaFile = "meta.txt"
variablesFile = "variables.toml"
sqlFile = "sqls.sql"
tiFlashReplicasFile = "table_tiflash_replica.txt"
sessionBindingFile = "session_bindings.sql"
globalBindingFile = "global_bindings.sql"
explainFile = "explain.txt"
)

// PlanReplayerSingleExec represents a plan replayer executor.
type PlanReplayerSingleExec struct {
baseExecutor
Expand Down Expand Up @@ -165,6 +176,7 @@ func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) err
| |-stats2.json
| |-....
|-config.toml
|-table_tiflash_replica.txt
|-variables.toml
|-bindings.sql
|-sqls.sql
Expand Down Expand Up @@ -232,6 +244,11 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
return "", err
}

// Dump tables tiflash replicas
if err = dumpTiFlashReplica(e.ctx, zw, pairs); err != nil {
return "", err
}

// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return "", err
Expand All @@ -243,7 +260,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
}

// Dump sql
sql, err := zw.Create("sqls.sql")
sql, err := zw.Create(sqlFile)
if err != nil {
return "", nil
}
Expand Down Expand Up @@ -271,7 +288,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
}

func dumpConfig(zw *zip.Writer) error {
cf, err := zw.Create("config.toml")
cf, err := zw.Create(configFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -282,7 +299,7 @@ func dumpConfig(zw *zip.Writer) error {
}

func dumpMeta(zw *zip.Writer) error {
mt, err := zw.Create("meta.txt")
mt, err := zw.Create(metaFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -293,6 +310,40 @@ func dumpMeta(zw *zip.Writer) error {
return nil
}

func dumpTiFlashReplica(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair]struct{}) error {
bf, err := zw.Create(tiFlashReplicasFile)
if err != nil {
return errors.AddStack(err)
}
is := domain.GetDomain(ctx).InfoSchema()
for pair := range pairs {
dbName := model.NewCIStr(pair.DBName)
tableName := model.NewCIStr(pair.TableName)
t, err := is.TableByName(dbName, tableName)
if err != nil {
logutil.BgLogger().Warn("failed to find table info", zap.Error(err),
zap.String("dbName", dbName.L), zap.String("tableName", tableName.L))
continue
}
if t.Meta().TiFlashReplica != nil && t.Meta().TiFlashReplica.Count > 0 {
sql := fmt.Sprintf("SELECT TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT FROM INFORMATION_SCHEMA.TIFLASH_REPLICA WHERE TABLE_SCHEMA ='%s' AND TABLE_NAME='%s' AND REPLICA_COUNT > 0",
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
dbName.L, tableName.L)
recordSet, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), sql)
if err != nil {
return err
}
sRows, err := resultSetToStringSlice(context.Background(), recordSet)
if err != nil {
return err
}
for _, row := range sRows {
fmt.Fprintf(bf, "%s\n", strings.Join(row, "\t"))
}
}
}
return nil
}

func dumpSchemas(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair]struct{}) error {
for pair := range pairs {
err := getShowCreateTable(pair, zw, ctx)
Expand Down Expand Up @@ -330,15 +381,15 @@ func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *domain.Doma

func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
varMap := make(map[string]string)
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show variables")
recordSet, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "show variables")
if err != nil {
return err
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0])
sRows, err := resultSetToStringSlice(context.Background(), recordSet)
if err != nil {
return err
}
vf, err := zw.Create("variables.toml")
vf, err := zw.Create(variablesFile)
if err != nil {
return errors.AddStack(err)
}
Expand All @@ -348,94 +399,74 @@ func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
if err := toml.NewEncoder(vf).Encode(varMap); err != nil {
return errors.AddStack(err)
}
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
}
}
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

func dumpSessionBindings(ctx sessionctx.Context, zw *zip.Writer) error {
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show bindings")
recordSet, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "show bindings")
if err != nil {
return err
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0])
sRows, err := resultSetToStringSlice(context.Background(), recordSet)
if err != nil {
return err
}
bf, err := zw.Create("session_bindings.sql")
bf, err := zw.Create(sessionBindingFile)
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(bf, "%s\n", strings.Join(row, "\t"))
}
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
}
}
return nil
}

func dumpGlobalBindings(ctx sessionctx.Context, zw *zip.Writer) error {
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show global bindings")
recordSet, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), "show global bindings")
if err != nil {
return err
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0])
sRows, err := resultSetToStringSlice(context.Background(), recordSet)
if err != nil {
return err
}
bf, err := zw.Create("global_bindings.sql")
bf, err := zw.Create(globalBindingFile)
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(bf, "%s\n", strings.Join(row, "\t"))
}
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
}
}
return nil
}

func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, sql string, isAnalyze bool) error {
var recordSets []sqlexec.RecordSet
var recordSet sqlexec.RecordSet
var err error
if isAnalyze {
// Explain analyze
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain analyze %s", sql))
recordSet, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), fmt.Sprintf("explain analyze %s", sql))
if err != nil {
return err
}
} else {
// Explain
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain %s", sql))
recordSet, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), fmt.Sprintf("explain %s", sql))
if err != nil {
return err
}
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0])
sRows, err := resultSetToStringSlice(context.Background(), recordSet)
if err != nil {
return err
}
fw, err := zw.Create("explain.txt")
fw, err := zw.Create(explainFile)
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t"))
}
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -480,11 +511,11 @@ func getStatsForTable(do *domain.Domain, pair tableNamePair) (*handle.JSONTable,
}

func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Context) error {
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("show create table `%v`.`%v`", pair.DBName, pair.TableName))
recordSet, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.Background(), fmt.Sprintf("show create table `%v`.`%v`", pair.DBName, pair.TableName))
if err != nil {
return err
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0])
sRows, err := resultSetToStringSlice(context.Background(), recordSet)
if err != nil {
return err
}
Expand All @@ -508,11 +539,6 @@ func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Conte
}
fmt.Fprintf(fw, "create database if not exists `%v`; use `%v`;", pair.DBName, pair.DBName)
fmt.Fprintf(fw, "%s", sRows[0][1])
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -604,9 +630,48 @@ func (e *PlanReplayerLoadExec) Next(ctx context.Context, req *chunk.Chunk) error
return nil
}

func loadSetTiFlashReplica(ctx sessionctx.Context, z *zip.Reader) error {
for _, zipFile := range z.File {
if strings.Compare(zipFile.Name, tiFlashReplicasFile) == 0 {
//varMap := make(map[string]string)
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
v, err := zipFile.Open()
if err != nil {
return errors.AddStack(err)
}
//nolint: errcheck,all_revive
defer v.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(v)
if err != nil {
return errors.AddStack(err)
}
rows := strings.Split(buf.String(), "\n")
for _, row := range rows {
if len(row) < 1 {
continue
}
r := strings.Split(row, "\t")
if len(r) < 3 {
logutil.BgLogger().Debug("plan replayer: skip error",
zap.Error(errors.New("setting tiflash replicas failed")))
continue
}
dbName := r[0]
tableName := r[1]
c := context.Background()
// Though we record tiflash replica in txt, we only set 1 tiflash replica as it's enough for reproduce the plan
sql := fmt.Sprintf("alter table %s.%s set tiflash replica 1", dbName, tableName)
_, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(c, sql)
logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
}
}
}
return nil
}

func loadVariables(ctx sessionctx.Context, z *zip.Reader) error {
for _, zipFile := range z.File {
if strings.Compare(zipFile.Name, "variables.toml") == 0 {
if strings.Compare(zipFile.Name, variablesFile) == 0 {
varMap := make(map[string]string)
v, err := zipFile.Open()
if err != nil {
Expand Down Expand Up @@ -658,15 +723,15 @@ func createSchemaAndItems(ctx sessionctx.Context, f *zip.File) error {
}
c := context.Background()
// create database if not exists
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[0])
_, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(c, sqls[0])
logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
// use database
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[1])
_, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(c, sqls[1])
if err != nil {
return err
}
// create table or view
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[2])
_, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(c, sqls[2])
if err != nil {
return err
}
Expand Down Expand Up @@ -722,6 +787,12 @@ func (e *PlanReplayerLoadInfo) Update(data []byte) error {
}
}

// set tiflash replica if exists
err = loadSetTiFlashReplica(e.Ctx, z)
if err != nil {
return err
}

// build view next
for _, zipFile := range z.File {
path := strings.Split(zipFile.Name, "/")
Expand Down