From dd372a7a27651625fd88d2f605b1af2be25636cd Mon Sep 17 00:00:00 2001 From: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com> Date: Tue, 28 May 2024 22:31:21 +0800 Subject: [PATCH] BatchCreateTableWith jobs: concat query to support cdc (#53515) close pingcap/tidb#53076 --- pkg/ddl/ddl_api.go | 24 +++++++++++++++- pkg/ddl/ddl_api_test.go | 63 +++++++++++++++++++++++++++++++++++++++++ pkg/ddl/ddl_worker.go | 6 ++-- 3 files changed, 89 insertions(+), 4 deletions(-) diff --git a/pkg/ddl/ddl_api.go b/pkg/ddl/ddl_api.go index f1dd371e31ec2..e8b208079f63a 100644 --- a/pkg/ddl/ddl_api.go +++ b/pkg/ddl/ddl_api.go @@ -3010,8 +3010,29 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, return d.callHookOnChanged(jobs, err) } +// BuildQueryStringFromJobs takes a slice of Jobs and concatenates their +// queries into a single query string. +// Each query is separated by a semicolon and a space. +// Trailing spaces are removed from each query, and a semicolon is appended +// if it's not already present. +func BuildQueryStringFromJobs(jobs []*model.Job) string { + var queryBuilder strings.Builder + for i, job := range jobs { + q := strings.TrimSpace(job.Query) + if !strings.HasSuffix(q, ";") { + q += ";" + } + queryBuilder.WriteString(q) + + if i < len(jobs)-1 { + queryBuilder.WriteString(" ") + } + } + return queryBuilder.String() +} + // BatchCreateTableWithJobs combine CreateTableJobs to BatchCreateTableJob. -func (*ddl) BatchCreateTableWithJobs(jobs []*model.Job) (*model.Job, error) { +func BatchCreateTableWithJobs(jobs []*model.Job) (*model.Job, error) { if len(jobs) == 0 { return nil, errors.Trace(fmt.Errorf("expect non-empty jobs")) } @@ -3055,6 +3076,7 @@ func (*ddl) BatchCreateTableWithJobs(jobs []*model.Job) (*model.Job, error) { combinedJob.Args = append(combinedJob.Args, args) combinedJob.Args = append(combinedJob.Args, foreignKeyChecks) combinedJob.InvolvingSchemaInfo = involvingSchemaInfo + combinedJob.Query = BuildQueryStringFromJobs(jobs) return combinedJob, nil } diff --git a/pkg/ddl/ddl_api_test.go b/pkg/ddl/ddl_api_test.go index e849b43b3ff17..be1d6c029a8a3 100644 --- a/pkg/ddl/ddl_api_test.go +++ b/pkg/ddl/ddl_api_test.go @@ -213,3 +213,66 @@ func TestCreateDropCreateTable(t *testing.T) { require.Less(t, create0TS, dropTS, "first create should finish before drop") require.Less(t, dropTS, create1TS, "second create should finish after drop") } + +func TestBuildQueryStringFromJobs(t *testing.T) { + testCases := []struct { + name string + jobs []*model.Job + expected string + }{ + { + name: "Empty jobs", + jobs: []*model.Job{}, + expected: "", + }, + { + name: "Single create table job", + jobs: []*model.Job{{Query: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255));"}}, + expected: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255));", + }, + { + name: "Multiple create table jobs with trailing semicolons", + jobs: []*model.Job{ + {Query: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255));"}, + {Query: "CREATE TABLE products (id INT PRIMARY KEY, description TEXT);"}, + }, + expected: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255)); CREATE TABLE products (id INT PRIMARY KEY, description TEXT);", + }, + { + name: "Multiple create table jobs with and without trailing semicolons", + jobs: []*model.Job{ + {Query: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255))"}, + {Query: "CREATE TABLE products (id INT PRIMARY KEY, description TEXT);"}, + {Query: " CREATE TABLE orders (id INT PRIMARY KEY, user_id INT, product_id INT) "}, + }, + expected: "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255)); CREATE TABLE products (id INT PRIMARY KEY, description TEXT); CREATE TABLE orders (id INT PRIMARY KEY, user_id INT, product_id INT);", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual := ddl.BuildQueryStringFromJobs(tc.jobs) + require.Equal(t, tc.expected, actual, "Query strings do not match") + }) + } +} + +func TestBatchCreateTableWithJobs(t *testing.T) { + job1 := &model.Job{ + SchemaID: 1, + Type: model.ActionCreateTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []any{&model.TableInfo{Name: model.CIStr{O: "t1", L: "t1"}}, false}, + Query: "create table db1.t1 (c1 int, c2 int)", + } + job2 := &model.Job{ + SchemaID: 1, + Type: model.ActionCreateTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []any{&model.TableInfo{Name: model.CIStr{O: "t2", L: "t2"}}, &model.TableInfo{}}, + Query: "create table db1.t2 (c1 int, c2 int);", + } + job, err := ddl.BatchCreateTableWithJobs([]*model.Job{job1, job2}) + require.NoError(t, err) + require.Equal(t, "create table db1.t1 (c1 int, c2 int); create table db1.t2 (c1 int, c2 int);", job.Query) +} diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 7e99b60896914..c3f2f1feea1b9 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -419,7 +419,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error { bdrRole = string(ast.BDRRoleNone) ) - if newTasks, err := d.combineBatchCreateTableJobs(tasks); err == nil { + if newTasks, err := combineBatchCreateTableJobs(tasks); err == nil { tasks = newTasks } @@ -511,7 +511,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error { // combineBatchCreateTableJobs combine batch jobs to another batch jobs. // currently it only support combine CreateTable to CreateTables. -func (d *ddl) combineBatchCreateTableJobs(tasks []*limitJobTask) ([]*limitJobTask, error) { +func combineBatchCreateTableJobs(tasks []*limitJobTask) ([]*limitJobTask, error) { if len(tasks) <= 1 || !tasks[0].job.LocalMode { return tasks, nil } @@ -529,7 +529,7 @@ func (d *ddl) combineBatchCreateTableJobs(tasks []*limitJobTask) ([]*limitJobTas jobs = append(jobs, task.job) } - job, err := d.BatchCreateTableWithJobs(jobs) + job, err := BatchCreateTableWithJobs(jobs) if err != nil { return tasks, err }