From acb0b86d905a53af7d8b01db5a4f27ce0a34890e Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 3 Jun 2024 18:26:54 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #53720 Signed-off-by: ti-chi-bot --- pkg/ddl/BUILD.bazel | 5 +++ pkg/ddl/ddl.go | 9 +++--- pkg/ddl/ddl_api_test.go | 41 ++++++++++++++++++++++++ pkg/ddl/job_table.go | 6 ++++ pkg/ddl/placement_policy.go | 4 +-- pkg/ddl/schema.go | 2 +- pkg/ddl/table.go | 63 ++++++++++++++++++++++++++++++++----- pkg/ddl/table_test.go | 3 +- 8 files changed, 116 insertions(+), 17 deletions(-) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 79a316d794fbb..0ca18ce55587b 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -155,7 +155,11 @@ go_library( "//pkg/util/sqlexec", "//pkg/util/sqlkiller", "//pkg/util/stringutil", +<<<<<<< HEAD "//pkg/util/syncutil", +======= + "//pkg/util/tiflash", +>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)) "//pkg/util/timeutil", "//pkg/util/topsql", "//pkg/util/topsql/state", @@ -328,6 +332,7 @@ go_test( "@com_github_tikv_client_go_v2//util", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", + "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index a34ef9aac5690..069bef60e0243 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -64,7 +64,11 @@ import ( "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/gcutil" +<<<<<<< HEAD "github.com/pingcap/tidb/pkg/util/syncutil" +======= + "github.com/pingcap/tidb/pkg/util/generic" +>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)) "github.com/tikv/client-go/v2/tikvrpc" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" @@ -383,11 +387,6 @@ type ddlCtx struct { // reorgCtx is used for reorganization. reorgCtx reorgContexts - // backfillCtx is used for backfill workers. - backfillCtx struct { - syncutil.RWMutex - jobCtxMap map[int64]*JobContext - } jobCtx struct { sync.RWMutex diff --git a/pkg/ddl/ddl_api_test.go b/pkg/ddl/ddl_api_test.go index e849b43b3ff17..a24b8e54fc931 100644 --- a/pkg/ddl/ddl_api_test.go +++ b/pkg/ddl/ddl_api_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestGetDDLJobs(t *testing.T) { @@ -151,6 +152,46 @@ func enQueueDDLJobs(t *testing.T, sess sessiontypes.Session, txn kv.Transaction, } } +func TestCreateViewConcurrently(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("create table t (a int);") + tk.MustExec("create view v as select * from t;") + var ( + counterErr error + counter int + ) + failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/onDDLCreateView", func(job *model.Job) { + counter++ + if counter > 1 { + counterErr = fmt.Errorf("create view job should not run concurrently") + return + } + }) + failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) { + if job.Type == model.ActionCreateView { + counter-- + } + }) + var eg errgroup.Group + for i := 0; i < 5; i++ { + eg.Go(func() error { + newTk := testkit.NewTestKit(t, store) + _, err := newTk.Exec("use test") + if err != nil { + return err + } + _, err = newTk.Exec("create or replace view v as select * from t;") + return err + }) + } + err := eg.Wait() + require.NoError(t, err) + require.NoError(t, counterErr) +} + func TestCreateDropCreateTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index aa2ebc3d516d0..5467ca04c316b 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -421,8 +421,14 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) { d.wg.Run(func() { metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc() defer func() { +<<<<<<< HEAD d.runningJobs.remove(job) asyncNotify(d.ddlJobCh) +======= + failpoint.InjectCall("afterDelivery2Worker", job) + s.runningJobs.remove(job) + asyncNotify(s.ddlJobNotifyCh) +>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)) metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec() }() ownerID := d.ownerManager.ID() diff --git a/pkg/ddl/placement_policy.go b/pkg/ddl/placement_policy.go index 4ab6495754f12..44e01b3fecd13 100644 --- a/pkg/ddl/placement_policy.go +++ b/pkg/ddl/placement_policy.go @@ -119,7 +119,7 @@ func getPlacementPolicyByName(d *ddlCtx, t *meta.Meta, policyName model.CIStr) ( } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { // Use cached policy. policy, ok := is.PolicyByName(policyName) if ok { @@ -320,7 +320,7 @@ func checkPlacementPolicyNotInUse(d *ddlCtx, t *meta.Meta, policy *model.PolicyI return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { err = CheckPlacementPolicyNotInUseFromInfoSchema(is, policy) } else { err = CheckPlacementPolicyNotInUseFromMeta(t, policy) diff --git a/pkg/ddl/schema.go b/pkg/ddl/schema.go index 3be32a14a26de..412169b08e11f 100644 --- a/pkg/ddl/schema.go +++ b/pkg/ddl/schema.go @@ -76,7 +76,7 @@ func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo) } return checkSchemaNotExistsFromStore(t, schemaID, dbInfo) diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index d602117acdb48..1aa11c2f25731 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -300,14 +300,19 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) schemaID := job.SchemaID tbInfo := &model.TableInfo{} var orReplace bool - var oldTbInfoID int64 - if err := job.DecodeArgs(tbInfo, &orReplace, &oldTbInfoID); err != nil { + var _placeholder int64 // oldTblInfoID + if err := job.DecodeArgs(tbInfo, &orReplace, &_placeholder); err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } tbInfo.State = model.StateNone - err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L) + + oldTableID, err := findTableIDByName(d, t, schemaID, tbInfo.Name.L) + if infoschema.ErrTableNotExists.Equal(err) { + err = nil + } + failpoint.InjectCall("onDDLCreateView", job) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) { job.State = model.JobStateCancelled @@ -329,13 +334,13 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) // none -> public tbInfo.State = model.StatePublic tbInfo.UpdateTS = t.StartTS - if oldTbInfoID > 0 && orReplace { - err = t.DropTableOrView(schemaID, job.SchemaName, oldTbInfoID, tbInfo.Name.L) + if oldTableID > 0 && orReplace { + err = t.DropTableOrView(schemaID, job.SchemaName, oldTableID, tbInfo.Name.L) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - err = t.GetAutoIDAccessors(schemaID, oldTbInfoID).Del() + err = t.GetAutoIDAccessors(schemaID, oldTableID).Del() if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -1505,7 +1510,7 @@ func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName stri return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { return checkTableNotExistsFromInfoSchema(is, schemaID, tableName) } @@ -1519,7 +1524,7 @@ func checkTableNotExistsByName(d *ddlCtx, t *meta.Meta, schemaID int64, schemaNa return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { return checkTableNotExistsFromInfoSchema(is, schemaID, tableName) } return t.CheckTableNameNotExists(t.TableNameKey(schemaName, tableName)) @@ -1593,6 +1598,48 @@ func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string return nil } +func findTableIDByName(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) (int64, error) { + // Try to use memory schema info to check first. + currVer, err := t.GetSchemaVersion() + if err != nil { + return 0, err + } + is := d.infoCache.GetLatest() + if is != nil && is.SchemaMetaVersion() == currVer { + return findTableIDFromInfoSchema(is, schemaID, tableName) + } + + return findTableIDFromStore(t, schemaID, tableName) +} + +func findTableIDFromInfoSchema(is infoschema.InfoSchema, schemaID int64, tableName string) (int64, error) { + schema, ok := is.SchemaByID(schemaID) + if !ok { + return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + } + tbl, err := is.TableByName(schema.Name, model.NewCIStr(tableName)) + if err != nil { + return 0, err + } + return tbl.Meta().ID, nil +} + +func findTableIDFromStore(t *meta.Meta, schemaID int64, tableName string) (int64, error) { + tbls, err := t.ListSimpleTables(schemaID) + if err != nil { + if meta.ErrDBNotExists.Equal(err) { + return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + } + return 0, errors.Trace(err) + } + for _, tbl := range tbls { + if tbl.Name.L == tableName { + return tbl.ID, nil + } + } + return 0, infoschema.ErrTableNotExists.FastGenByArgs(tableName) +} + // updateVersionAndTableInfoWithCheck checks table info validate and updates the schema version and the table information func updateVersionAndTableInfoWithCheck(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool, multiInfos ...schemaIDAndTableInfo) ( ver int64, err error) { diff --git a/pkg/ddl/table_test.go b/pkg/ddl/table_test.go index 61fc39a943de4..13d37572e5cef 100644 --- a/pkg/ddl/table_test.go +++ b/pkg/ddl/table_test.go @@ -309,7 +309,8 @@ func TestCreateView(t *testing.T) { } ctx.SetValue(sessionctx.QueryString, "skip") err = d.DoDDLJob(ctx, job) - require.Error(t, err) + // The non-existing table id in job args will not be considered anymore. + require.NoError(t, err) } func checkTableCacheTest(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { From 194f641cf9e573c4822d4bdf64cccc42cdc84d4c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 31 Jul 2024 17:13:18 +0800 Subject: [PATCH 2/2] fix git conflict Signed-off-by: lance6716 --- DEPS.bzl | 12 ++++++------ go.mod | 2 +- go.sum | 4 ++-- pkg/ddl/BUILD.bazel | 5 ----- pkg/ddl/ddl.go | 5 ----- pkg/ddl/job_table.go | 7 +------ 6 files changed, 10 insertions(+), 25 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 2102a85c088fa..bf727f814fc43 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -5845,13 +5845,13 @@ def go_deps(): name = "com_github_pingcap_failpoint", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/failpoint", - sha256 = "ea37b4dddfbccaaed9b313f9f1099dfbf00d36d768a8416d6d175cbe2c8b1254", - strip_prefix = "github.com/pingcap/failpoint@v0.0.0-20220801062533-2eaa32854a6c", + sha256 = "fb2b8146ff608751050d56d0506d271f75afa030d2d09d2da9e2bac562f6a866", + strip_prefix = "github.com/pingcap/failpoint@v0.0.0-20240528011301-b51a646c7c86", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20220801062533-2eaa32854a6c.zip", - "http://ats.apps.svc/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20220801062533-2eaa32854a6c.zip", - "https://cache.hawkingrei.com/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20220801062533-2eaa32854a6c.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20220801062533-2eaa32854a6c.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20240528011301-b51a646c7c86.zip", + "http://ats.apps.svc/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20240528011301-b51a646c7c86.zip", + "https://cache.hawkingrei.com/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20240528011301-b51a646c7c86.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/failpoint/com_github_pingcap_failpoint-v0.0.0-20240528011301-b51a646c7c86.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index 6be6868fa35e8..9066d46e2d0ff 100644 --- a/go.mod +++ b/go.mod @@ -82,7 +82,7 @@ require ( github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f - github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c + github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/fn v1.0.0 github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754 github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d diff --git a/go.sum b/go.sum index 2df9059ac329c..8eb95164712b5 100644 --- a/go.sum +++ b/go.sum @@ -709,8 +709,8 @@ github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTw github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f h1:FxA+NgsdHNOv+/hZGxUh8Gb3WuZqgqmxDwztEOiA1v4= github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= -github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= -github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g= github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z24= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 0ca18ce55587b..d5bb826bee890 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -155,11 +155,6 @@ go_library( "//pkg/util/sqlexec", "//pkg/util/sqlkiller", "//pkg/util/stringutil", -<<<<<<< HEAD - "//pkg/util/syncutil", -======= - "//pkg/util/tiflash", ->>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)) "//pkg/util/timeutil", "//pkg/util/topsql", "//pkg/util/topsql/state", diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 069bef60e0243..65e00da43c1ea 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -64,11 +64,6 @@ import ( "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/gcutil" -<<<<<<< HEAD - "github.com/pingcap/tidb/pkg/util/syncutil" -======= - "github.com/pingcap/tidb/pkg/util/generic" ->>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)) "github.com/tikv/client-go/v2/tikvrpc" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index 5467ca04c316b..036bcae9b185d 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -421,14 +421,9 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) { d.wg.Run(func() { metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc() defer func() { -<<<<<<< HEAD + failpoint.InjectCall("afterDelivery2Worker", job) d.runningJobs.remove(job) asyncNotify(d.ddlJobCh) -======= - failpoint.InjectCall("afterDelivery2Worker", job) - s.runningJobs.remove(job) - asyncNotify(s.ddlJobNotifyCh) ->>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)) metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec() }() ownerID := d.ownerManager.ID()