Skip to content

Commit

Permalink
Merge branch 'master' into reorganize-partition-design
Browse files Browse the repository at this point in the history
  • Loading branch information
mjonss authored Dec 28, 2022
2 parents e8bf79a + f9af75f commit 52b7291
Show file tree
Hide file tree
Showing 156 changed files with 5,755 additions and 1,659 deletions.
20 changes: 14 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2923,8 +2923,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=",
version = "v0.0.0-20221130022225-6c56ac56fe5f",
sum = "h1:v0Z0nC0knwWHn3e9br8EMNfLBB14QDULn142UGjiTMQ=",
version = "v0.0.0-20221213093948-9ccc6beaf0aa",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3523,12 +3523,20 @@ def go_deps():
sum = "h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=",
version = "v0.0.0-20181126055449-889f96f722a2",
)
go_repository(
name = "com_github_tiancaiamao_gp",
build_file_proto_mode = "disable",
importpath = "github.com/tiancaiamao/gp",
sum = "h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=",
version = "v0.0.0-20221221095600-1a473d1f9b4b",
)

go_repository(
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=",
version = "v2.0.3",
sum = "h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=",
version = "v2.0.4-0.20221226080148-018c59dbd837",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down Expand Up @@ -4432,8 +4440,8 @@ def go_deps():
name = "org_golang_x_oauth2",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/oauth2",
sum = "h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU=",
version = "v0.2.0",
sum = "h1:6l90koy8/LaBLmLu8jpHeHexzMwEita0zFfYlggy2F8=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_sync",
Expand Down
9 changes: 9 additions & 0 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,15 @@ func (h *BindHandle) SetBindRecordStatus(originalSQL string, binding *Binding, n
return
}

// SetBindRecordStatusByDigest set a BindRecord's status to the storage and bind cache.
func (h *BindHandle) SetBindRecordStatusByDigest(newStatus, sqlDigest string) (ok bool, err error) {
oldRecord, err := h.GetBindRecordBySQLDigest(sqlDigest)
if err != nil {
return false, err
}
return h.SetBindRecordStatus(oldRecord.OriginalSQL, nil, newStatus)
}

// GCBindRecord physically removes the deleted bind records in mysql.bind_info.
func (h *BindHandle) GCBindRecord() (err error) {
h.bindInfo.Lock()
Expand Down
28 changes: 25 additions & 3 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,32 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
}

// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB.
// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation
// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all.
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var err error
d := domain.GetDomain(gs.se).DDL()
if err = d.BatchCreateTableWithInfo(gs.se, schema, info, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) {
if len(info) == 1 {
return err
}
mid := len(info) / 2
err = gs.SplitBatchCreateTable(schema, info[:mid])
if err != nil {
return err
}
err = gs.SplitBatchCreateTable(schema, info[mid:])
if err != nil {
return err
}
return nil
}
return err
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
var dbName model.CIStr

// Disable foreign key check when batch create tables.
Expand Down Expand Up @@ -233,8 +256,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, append(cs, ddl.OnExistIgnore)...)
if err != nil {
if err := gs.SplitBatchCreateTable(dbName, cloneTables); err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
//we fall back to old way that creating table one by one
Expand Down
48 changes: 6 additions & 42 deletions br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) {
"tidb_opt_write_row_id": "1",
// always set auto-commit to ON
"autocommit": "1",
// alway set transaction mode to optimistic
// always set transaction mode to optimistic
"tidb_txn_mode": "optimistic",
// disable foreign key checks
"foreign_key_checks": "0",
}

if dsn.Vars != nil {
Expand Down Expand Up @@ -143,47 +145,6 @@ func (timgr *TiDBManager) Close() {
timgr.db.Close()
}

func InitSchema(ctx context.Context, g glue.Glue, database string, tablesSchema map[string]string) error {
logger := log.FromContext(ctx).With(zap.String("db", database))
sqlExecutor := g.GetSQLExecutor()

var createDatabase strings.Builder
createDatabase.WriteString("CREATE DATABASE IF NOT EXISTS ")
common.WriteMySQLIdentifier(&createDatabase, database)
err := sqlExecutor.ExecuteWithLog(ctx, createDatabase.String(), "create database", logger)
if err != nil {
return errors.Trace(err)
}

task := logger.Begin(zap.InfoLevel, "create tables")
var sqlCreateStmts []string
loopCreate:
for tbl, sqlCreateTable := range tablesSchema {
task.Debug("create table", zap.String("schema", sqlCreateTable))

sqlCreateStmts, err = createIfNotExistsStmt(g.GetParser(), sqlCreateTable, database, tbl)
if err != nil {
break
}

// TODO: maybe we should put these createStems into a transaction
for _, s := range sqlCreateStmts {
err = sqlExecutor.ExecuteWithLog(
ctx,
s,
"create table",
logger.With(zap.String("table", common.UniqueTable(database, tbl))),
)
if err != nil {
break loopCreate
}
}
}
task.End(zap.ErrorLevel, err)

return errors.Trace(err)
}

func createIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName string) ([]string, error) {
stmts, _, err := p.ParseSQL(createTable)
if err != nil {
Expand All @@ -199,6 +160,9 @@ func createIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName string
case *ast.CreateDatabaseStmt:
node.Name = model.NewCIStr(dbName)
node.IfNotExists = true
case *ast.DropDatabaseStmt:
node.Name = model.NewCIStr(dbName)
node.IfExists = true
case *ast.CreateTableStmt:
node.Table.Schema = model.NewCIStr(dbName)
node.Table.Name = model.NewCIStr(tblName)
Expand Down
91 changes: 0 additions & 91 deletions br/pkg/lightning/restore/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,97 +165,6 @@ func TestCreateTableIfNotExistsStmt(t *testing.T) {
`, "m"))
}

func TestInitSchema(t *testing.T) {
s := newTiDBSuite(t)
ctx := context.Background()

s.mockDB.
ExpectExec("CREATE DATABASE IF NOT EXISTS `db`").
WillReturnResult(sqlmock.NewResult(1, 1))
s.mockDB.
ExpectExec("\\QCREATE TABLE IF NOT EXISTS `db`.`t1` (`a` INT PRIMARY KEY,`b` VARCHAR(200));\\E").
WillReturnResult(sqlmock.NewResult(2, 1))
s.mockDB.
ExpectExec("\\QSET @@SESSION.`FOREIGN_KEY_CHECKS`=0;\\E").
WillReturnResult(sqlmock.NewResult(0, 0))
s.mockDB.
ExpectExec("\\QCREATE TABLE IF NOT EXISTS `db`.`t2` (`xx` TEXT) AUTO_INCREMENT = 11203;\\E").
WillReturnResult(sqlmock.NewResult(2, 1))
s.mockDB.
ExpectClose()

s.mockDB.MatchExpectationsInOrder(false) // maps are unordered.
err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
"t1": "create table t1 (a int primary key, b varchar(200));",
"t2": "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;CREATE TABLE `db`.`t2` (xx TEXT) AUTO_INCREMENT=11203;",
})
s.mockDB.MatchExpectationsInOrder(true)
require.NoError(t, err)
}

func TestInitSchemaSyntaxError(t *testing.T) {
s := newTiDBSuite(t)
ctx := context.Background()

s.mockDB.
ExpectExec("CREATE DATABASE IF NOT EXISTS `db`").
WillReturnResult(sqlmock.NewResult(1, 1))
s.mockDB.
ExpectClose()

err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
"t1": "create table `t1` with invalid syntax;",
})
require.Error(t, err)
}

func TestInitSchemaErrorLost(t *testing.T) {
s := newTiDBSuite(t)
ctx := context.Background()

s.mockDB.
ExpectExec("CREATE DATABASE IF NOT EXISTS `db`").
WillReturnResult(sqlmock.NewResult(1, 1))

s.mockDB.
ExpectExec("CREATE TABLE IF NOT EXISTS.*").
WillReturnError(&mysql.MySQLError{
Number: tmysql.ErrTooBigFieldlength,
Message: "Column length too big",
})

s.mockDB.
ExpectClose()

err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
"t1": "create table `t1` (a int);",
"t2": "create table t2 (a int primary key, b varchar(200));",
})
require.Regexp(t, ".*Column length too big.*", err.Error())
}

func TestInitSchemaUnsupportedSchemaError(t *testing.T) {
s := newTiDBSuite(t)
ctx := context.Background()

s.mockDB.
ExpectExec("CREATE DATABASE IF NOT EXISTS `db`").
WillReturnResult(sqlmock.NewResult(1, 1))
s.mockDB.
ExpectExec("CREATE TABLE IF NOT EXISTS `db`.`t1`.*").
WillReturnError(&mysql.MySQLError{
Number: tmysql.ErrTooBigFieldlength,
Message: "Column length too big",
})
s.mockDB.
ExpectClose()

err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
"t1": "create table `t1` (a VARCHAR(999999999));",
})
require.Regexp(t, ".*Column length too big.*", err.Error())
}

func TestDropTable(t *testing.T) {
s := newTiDBSuite(t)
ctx := context.Background()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (db *DB) tableIDAllocFilter() ddl.AllocTableIDIf {
if db.preallocedIDs == nil {
return true
}
prealloced := db.preallocedIDs.Prealloced(ti.ID)
prealloced := db.preallocedIDs.PreallocedFor(ti)
if prealloced {
log.Info("reusing table ID", zap.Stringer("table", ti.Name))
}
Expand Down
5 changes: 4 additions & 1 deletion br/pkg/restore/prealloc_table_id/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ go_library(
srcs = ["alloc.go"],
importpath = "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id",
visibility = ["//visibility:public"],
deps = ["//br/pkg/metautil"],
deps = [
"//br/pkg/metautil",
"//parser/model",
],
)

go_test(
Expand Down
23 changes: 23 additions & 0 deletions br/pkg/restore/prealloc_table_id/alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"

"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/parser/model"
)

const (
Expand Down Expand Up @@ -48,6 +49,14 @@ func New(tables []*metautil.Table) *PreallocIDs {
if t.Info.ID > max && t.Info.ID < insaneTableIDThreshold {
max = t.Info.ID
}

if t.Info.Partition != nil && t.Info.Partition.Definitions != nil {
for _, part := range t.Info.Partition.Definitions {
if part.ID > max && part.ID < insaneTableIDThreshold {
max = part.ID
}
}
}
}
return &PreallocIDs{
end: max + 1,
Expand Down Expand Up @@ -86,3 +95,17 @@ func (p *PreallocIDs) Alloc(m Allocator) error {
func (p *PreallocIDs) Prealloced(tid int64) bool {
return p.allocedFrom <= tid && tid < p.end
}

func (p *PreallocIDs) PreallocedFor(ti *model.TableInfo) bool {
if !p.Prealloced(ti.ID) {
return false
}
if ti.Partition != nil && ti.Partition.Definitions != nil {
for _, part := range ti.Partition.Definitions {
if !p.Prealloced(part.ID) {
return false
}
}
}
return true
}
Loading

0 comments on commit 52b7291

Please sign in to comment.