Skip to content

Commit

Permalink
BatchCreateTableWith jobs: concat query to support cdc (#53515)
Browse files Browse the repository at this point in the history
close #53076
  • Loading branch information
GMHDBJD authored May 28, 2024
1 parent 4a51974 commit dd372a7
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 4 deletions.
24 changes: 23 additions & 1 deletion pkg/ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3010,8 +3010,29 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context,
return d.callHookOnChanged(jobs, err)
}

// BuildQueryStringFromJobs takes a slice of Jobs and concatenates their
// queries into a single query string.
// Each query is separated by a semicolon and a space.
// Trailing spaces are removed from each query, and a semicolon is appended
// if it's not already present.
func BuildQueryStringFromJobs(jobs []*model.Job) string {
var queryBuilder strings.Builder
for i, job := range jobs {
q := strings.TrimSpace(job.Query)
if !strings.HasSuffix(q, ";") {
q += ";"
}
queryBuilder.WriteString(q)

if i < len(jobs)-1 {
queryBuilder.WriteString(" ")
}
}
return queryBuilder.String()
}

// BatchCreateTableWithJobs combine CreateTableJobs to BatchCreateTableJob.
func (*ddl) BatchCreateTableWithJobs(jobs []*model.Job) (*model.Job, error) {
func BatchCreateTableWithJobs(jobs []*model.Job) (*model.Job, error) {
if len(jobs) == 0 {
return nil, errors.Trace(fmt.Errorf("expect non-empty jobs"))
}
Expand Down Expand Up @@ -3055,6 +3076,7 @@ func (*ddl) BatchCreateTableWithJobs(jobs []*model.Job) (*model.Job, error) {
combinedJob.Args = append(combinedJob.Args, args)
combinedJob.Args = append(combinedJob.Args, foreignKeyChecks)
combinedJob.InvolvingSchemaInfo = involvingSchemaInfo
combinedJob.Query = BuildQueryStringFromJobs(jobs)

return combinedJob, nil
}
Expand Down
63 changes: 63 additions & 0 deletions pkg/ddl/ddl_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,66 @@ func TestCreateDropCreateTable(t *testing.T) {
require.Less(t, create0TS, dropTS, "first create should finish before drop")
require.Less(t, dropTS, create1TS, "second create should finish after drop")
}

func TestBuildQueryStringFromJobs(t *testing.T) {
testCases := []struct {
name string
jobs []*model.Job
expected string
}{
{
name: "Empty jobs",
jobs: []*model.Job{},
expected: "",
},
{
name: "Single create table job",
jobs: []*model.Job{{Query: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255));"}},
expected: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255));",
},
{
name: "Multiple create table jobs with trailing semicolons",
jobs: []*model.Job{
{Query: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255));"},
{Query: "CREATE TABLE products (id INT PRIMARY KEY, description TEXT);"},
},
expected: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255)); CREATE TABLE products (id INT PRIMARY KEY, description TEXT);",
},
{
name: "Multiple create table jobs with and without trailing semicolons",
jobs: []*model.Job{
{Query: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255))"},
{Query: "CREATE TABLE products (id INT PRIMARY KEY, description TEXT);"},
{Query: " CREATE TABLE orders (id INT PRIMARY KEY, user_id INT, product_id INT) "},
},
expected: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255)); CREATE TABLE products (id INT PRIMARY KEY, description TEXT); CREATE TABLE orders (id INT PRIMARY KEY, user_id INT, product_id INT);",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := ddl.BuildQueryStringFromJobs(tc.jobs)
require.Equal(t, tc.expected, actual, "Query strings do not match")
})
}
}

func TestBatchCreateTableWithJobs(t *testing.T) {
job1 := &model.Job{
SchemaID: 1,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{&model.TableInfo{Name: model.CIStr{O: "t1", L: "t1"}}, false},
Query: "create table db1.t1 (c1 int, c2 int)",
}
job2 := &model.Job{
SchemaID: 1,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{&model.TableInfo{Name: model.CIStr{O: "t2", L: "t2"}}, &model.TableInfo{}},
Query: "create table db1.t2 (c1 int, c2 int);",
}
job, err := ddl.BatchCreateTableWithJobs([]*model.Job{job1, job2})
require.NoError(t, err)
require.Equal(t, "create table db1.t1 (c1 int, c2 int); create table db1.t2 (c1 int, c2 int);", job.Query)
}
6 changes: 3 additions & 3 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {
bdrRole = string(ast.BDRRoleNone)
)

if newTasks, err := d.combineBatchCreateTableJobs(tasks); err == nil {
if newTasks, err := combineBatchCreateTableJobs(tasks); err == nil {
tasks = newTasks
}

Expand Down Expand Up @@ -511,7 +511,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {

// combineBatchCreateTableJobs combine batch jobs to another batch jobs.
// currently it only support combine CreateTable to CreateTables.
func (d *ddl) combineBatchCreateTableJobs(tasks []*limitJobTask) ([]*limitJobTask, error) {
func combineBatchCreateTableJobs(tasks []*limitJobTask) ([]*limitJobTask, error) {
if len(tasks) <= 1 || !tasks[0].job.LocalMode {
return tasks, nil
}
Expand All @@ -529,7 +529,7 @@ func (d *ddl) combineBatchCreateTableJobs(tasks []*limitJobTask) ([]*limitJobTas
jobs = append(jobs, task.job)
}

job, err := d.BatchCreateTableWithJobs(jobs)
job, err := BatchCreateTableWithJobs(jobs)
if err != nil {
return tasks, err
}
Expand Down

0 comments on commit dd372a7

Please sign in to comment.