diff --git a/br/pkg/gluetidb/BUILD.bazel b/br/pkg/gluetidb/BUILD.bazel index 5340729c1d548..eddbd41ee46d4 100644 --- a/br/pkg/gluetidb/BUILD.bazel +++ b/br/pkg/gluetidb/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "gluetidb", @@ -25,3 +25,21 @@ go_library( "@org_uber_go_zap//:zap", ], ) + +go_test( + name = "gluetidb_test", + timeout = "short", + srcs = ["glue_test.go"], + embed = [":gluetidb"], + flaky = True, + deps = [ + "//ddl", + "//kv", + "//meta", + "//parser/model", + "//sessionctx", + "//testkit", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + ], +) diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index abe239f7723dc..cc5ace52b16e5 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -218,19 +218,21 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model. // SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB. // The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation // TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all. -func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { +func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { var err error d := domain.GetDomain(gs.se).DDL() - if err = d.BatchCreateTableWithInfo(gs.se, schema, info, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) { - if len(info) == 1 { + + if err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) { + log.Info("entry too large, split batch create table", zap.Int("num table", len(infos))) + if len(infos) == 1 { return err } - mid := len(info) / 2 - err = gs.SplitBatchCreateTable(schema, info[:mid]) + mid := len(infos) / 2 + err = gs.SplitBatchCreateTable(schema, infos[:mid], cs...) if err != nil { return err } - err = gs.SplitBatchCreateTable(schema, info[mid:]) + err = gs.SplitBatchCreateTable(schema, infos[mid:], cs...) if err != nil { return err } @@ -268,7 +270,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo cloneTables = append(cloneTables, table) } gs.se.SetValue(sessionctx.QueryString, queryBuilder.String()) - if err := gs.SplitBatchCreateTable(dbName, cloneTables); err != nil { + if err := gs.SplitBatchCreateTable(dbName, cloneTables, cs...); err != nil { //It is possible to failure when TiDB does not support model.ActionCreateTables. //In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob, //we fall back to old way that creating table one by one diff --git a/br/pkg/gluetidb/glue_test.go b/br/pkg/gluetidb/glue_test.go new file mode 100644 index 0000000000000..e7c2f64dcfaa5 --- /dev/null +++ b/br/pkg/gluetidb/glue_test.go @@ -0,0 +1,208 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gluetidb + +import ( + "context" + "strconv" + "testing" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" +) + +// batch create table with table id reused +func TestSplitBatchCreateTableWithTableId(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_id_resued1") + tk.MustExec("drop table if exists table_id_resued2") + tk.MustExec("drop table if exists table_id_new") + + d := dom.DDL() + require.NotNil(t, d) + + infos1 := []*model.TableInfo{} + infos1 = append(infos1, &model.TableInfo{ + ID: 124, + Name: model.NewCIStr("table_id_resued1"), + }) + infos1 = append(infos1, &model.TableInfo{ + ID: 125, + Name: model.NewCIStr("table_id_resued2"), + }) + + se := &tidbSession{se: tk.Session()} + + // keep/reused table id verification + tk.Session().SetValue(sessionctx.QueryString, "skip") + err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos1, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return false + })) + require.NoError(t, err) + + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued1'").Check(testkit.Rows("124")) + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued2'").Check(testkit.Rows("125")) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) + + // allocate new table id verification + // query the global id + var id int64 + err = kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error { + m := meta.NewMeta(txn) + var err error + id, err = m.GenGlobalID() + return err + }) + + require.NoError(t, err) + + infos2 := []*model.TableInfo{} + infos2 = append(infos2, &model.TableInfo{ + ID: 124, + Name: model.NewCIStr("table_id_new"), + }) + + tk.Session().SetValue(sessionctx.QueryString, "skip") + err = se.SplitBatchCreateTable(model.NewCIStr("test"), infos2, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return true + })) + require.NoError(t, err) + + idGen, ok := tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_new'").Rows()[0][0].(string) + require.True(t, ok) + idGenNum, err := strconv.ParseInt(idGen, 10, 64) + require.NoError(t, err) + require.Greater(t, idGenNum, id) + + // a empty table info with len(info3) = 0 + infos3 := []*model.TableInfo{} + + err = se.SplitBatchCreateTable(model.NewCIStr("test"), infos3, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return false + })) + require.NoError(t, err) +} + +// batch create table with table id reused +func TestSplitBatchCreateTable(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_1") + tk.MustExec("drop table if exists table_2") + tk.MustExec("drop table if exists table_3") + + d := dom.DDL() + require.NotNil(t, d) + + infos := []*model.TableInfo{} + infos = append(infos, &model.TableInfo{ + ID: 1234, + Name: model.NewCIStr("tables_1"), + }) + infos = append(infos, &model.TableInfo{ + ID: 1235, + Name: model.NewCIStr("tables_2"), + }) + infos = append(infos, &model.TableInfo{ + ID: 1236, + Name: model.NewCIStr("tables_3"), + }) + + se := &tidbSession{se: tk.Session()} + + // keep/reused table id verification + tk.Session().SetValue(sessionctx.QueryString, "skip") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge", "return(1)")) + err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return false + })) + + require.NoError(t, err) + tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3")) + jobs := tk.MustQuery("admin show ddl jobs").Rows() + require.Greater(t, len(jobs), 3) + // check table_1 + job1 := jobs[0] + require.Equal(t, "test", job1[1]) + require.Equal(t, "tables_3", job1[2]) + require.Equal(t, "create tables", job1[3]) + require.Equal(t, "public", job1[4]) + + // check table_2 + job2 := jobs[1] + require.Equal(t, "test", job2[1]) + require.Equal(t, "tables_2", job2[2]) + require.Equal(t, "create tables", job2[3]) + require.Equal(t, "public", job2[4]) + + // check table_3 + job3 := jobs[2] + require.Equal(t, "test", job3[1]) + require.Equal(t, "tables_1", job3[2]) + require.Equal(t, "create tables", job3[3]) + require.Equal(t, "public", job3[4]) + + // check reused table id + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'").Check(testkit.Rows("1234")) + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'").Check(testkit.Rows("1235")) + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'").Check(testkit.Rows("1236")) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge")) +} + +// batch create table with table id reused +func TestSplitBatchCreateTableFailWithEntryTooLarge(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_1") + tk.MustExec("drop table if exists table_2") + tk.MustExec("drop table if exists table_3") + + d := dom.DDL() + require.NotNil(t, d) + + infos := []*model.TableInfo{} + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_1"), + }) + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_2"), + }) + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_3"), + }) + + se := &tidbSession{se: tk.Session()} + + tk.Session().SetValue(sessionctx.QueryString, "skip") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge", "return(0)")) + err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return true + })) + + require.True(t, kv.ErrEntryTooLarge.Equal(err)) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge")) +} diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 86db2daa270ec..67c04e1a302c9 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2556,6 +2556,12 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, infos []*model.TableInfo, cs ...CreateTableWithInfoConfigurier, ) error { + failpoint.Inject("RestoreBatchCreateTableEntryTooLarge", func(val failpoint.Value) { + injectBatchSize := val.(int) + if len(infos) > injectBatchSize { + failpoint.Return(kv.ErrEntryTooLarge) + } + }) c := GetCreateTableWithInfoConfig(cs) jobs := &model.Job{