From e939c9b28342eb65dace39b482e62cc99d2a1171 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 14 Nov 2022 13:29:54 +0800 Subject: [PATCH] ddl: fix return duplicate schema version during multi-schema change (#39093) close pingcap/tidb#39090 --- ddl/callback.go | 8 ++++---- ddl/callback_test.go | 22 ++++++++++++++-------- ddl/ddl_worker.go | 2 +- ddl/job_table.go | 2 +- ddl/multi_schema_change.go | 12 +++++++----- ddl/multi_schema_change_test.go | 27 +++++++++++++++++++++++++++ 6 files changed, 54 insertions(+), 19 deletions(-) diff --git a/ddl/callback.go b/ddl/callback.go index 84b22cdfed944..b6160d150e717 100644 --- a/ddl/callback.go +++ b/ddl/callback.go @@ -48,7 +48,7 @@ type Callback interface { // OnChanged is called after a ddl statement is finished. OnChanged(err error) error // OnSchemaStateChanged is called after a schema state is changed. - OnSchemaStateChanged() + OnSchemaStateChanged(schemaVer int64) // OnJobRunBefore is called before running job. OnJobRunBefore(job *model.Job) // OnJobUpdated is called after the running job is updated. @@ -71,7 +71,7 @@ func (*BaseCallback) OnChanged(err error) error { } // OnSchemaStateChanged implements Callback interface. -func (*BaseCallback) OnSchemaStateChanged() { +func (*BaseCallback) OnSchemaStateChanged(schemaVer int64) { // Nothing to do. } @@ -129,7 +129,7 @@ func (c *DefaultCallback) OnChanged(err error) error { } // OnSchemaStateChanged overrides the ddl Callback interface. -func (c *DefaultCallback) OnSchemaStateChanged() { +func (c *DefaultCallback) OnSchemaStateChanged(schemaVer int64) { err := c.do.Reload() if err != nil { logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err)) @@ -166,7 +166,7 @@ func (c *ctcCallback) OnChanged(err error) error { } // OnSchemaStateChanged overrides the ddl Callback interface. -func (c *ctcCallback) OnSchemaStateChanged() { +func (c *ctcCallback) OnSchemaStateChanged(retVer int64) { err := c.do.Reload() if err != nil { logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err)) diff --git a/ddl/callback_test.go b/ddl/callback_test.go index 4e7199dbef8ca..5a97e8212689e 100644 --- a/ddl/callback_test.go +++ b/ddl/callback_test.go @@ -48,13 +48,14 @@ type TestDDLCallback struct { // domain to reload schema before your ddl stepping into the next state change. Do DomainReloader - onJobRunBefore func(*model.Job) - OnJobRunBeforeExported func(*model.Job) - onJobUpdated func(*model.Job) - OnJobUpdatedExported atomic.Pointer[func(*model.Job)] - onWatched func(ctx context.Context) - OnGetJobBeforeExported func(string) - OnGetJobAfterExported func(string, *model.Job) + onJobRunBefore func(*model.Job) + OnJobRunBeforeExported func(*model.Job) + onJobUpdated func(*model.Job) + OnJobUpdatedExported atomic.Pointer[func(*model.Job)] + onWatched func(ctx context.Context) + OnGetJobBeforeExported func(string) + OnGetJobAfterExported func(string, *model.Job) + OnJobSchemaStateChanged func(int64) } // OnChanged mock the same behavior with the main DDL hook. @@ -73,12 +74,17 @@ func (tc *TestDDLCallback) OnChanged(err error) error { } // OnSchemaStateChanged mock the same behavior with the main ddl hook. -func (tc *TestDDLCallback) OnSchemaStateChanged() { +func (tc *TestDDLCallback) OnSchemaStateChanged(schemaVer int64) { if tc.Do != nil { if err := tc.Do.Reload(); err != nil { logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err)) } } + + if tc.OnJobSchemaStateChanged != nil { + tc.OnJobSchemaStateChanged(schemaVer) + return + } } // OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first. diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index ee7e7f3c6594e..956f4c805347f 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1028,7 +1028,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { if RunInGoTest { // d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously. d.mu.RLock() - d.mu.hook.OnSchemaStateChanged() + d.mu.hook.OnSchemaStateChanged(schemaVer) d.mu.RUnlock() } diff --git a/ddl/job_table.go b/ddl/job_table.go index 83585ca040704..d23f083539e87 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -293,7 +293,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { if RunInGoTest { // d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously. d.mu.RLock() - d.mu.hook.OnSchemaStateChanged() + d.mu.hook.OnSchemaStateChanged(schemaVer) d.mu.RUnlock() } diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index f3e02d0a74584..ab306fe546932 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -118,11 +118,13 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve proxyJob := sub.ToProxyJob(job) if schemaVersionGenerated { proxyJob.MultiSchemaInfo.SkipVersion = true - } else { + } + proxyJobVer, err := w.runDDLJob(d, t, &proxyJob) + if !schemaVersionGenerated && proxyJobVer != 0 { schemaVersionGenerated = true + ver = proxyJobVer } - ver, err = w.runDDLJob(d, t, &proxyJob) - sub.FromProxyJob(&proxyJob, ver) + sub.FromProxyJob(&proxyJob, proxyJobVer) if err != nil || proxyJob.Error != nil { for j := i - 1; j >= 0; j-- { job.MultiSchemaInfo.SubJobs[j] = &subJobs[j] @@ -376,8 +378,8 @@ func finishMultiSchemaJob(job *model.Job, t *meta.Meta) (ver int64, err error) { } tblInfo, err := t.GetTable(job.SchemaID, job.TableID) if err != nil { - return ver, err + return 0, err } job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) - return ver, err + return 0, err } diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index d2e62d20d736d..250fb692f3500 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -1141,6 +1141,33 @@ func TestMultiSchemaChangeUnsupportedType(t *testing.T) { "[ddl:8200]Unsupported multi schema change for modify auto id cache") } +func TestMultiSchemaChangeSchemaVersion(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("create table t(a int, b int, c int, d int)") + tk.MustExec("insert into t values (1,2,3,4)") + + schemaVerMap := map[int64]struct{}{} + + originHook := dom.DDL().GetHook() + hook := &ddl.TestDDLCallback{Do: dom} + hook.OnJobSchemaStateChanged = func(schemaVer int64) { + if schemaVer != 0 { + // No same return schemaVer during multi-schema change + _, ok := schemaVerMap[schemaVer] + assert.False(t, ok) + schemaVerMap[schemaVer] = struct{}{} + } + } + dom.DDL().SetHook(hook) + tk.MustExec("alter table t drop column b, drop column c") + tk.MustExec("alter table t add column b int, add column c int") + tk.MustExec("alter table t add index k(b), add column e int") + tk.MustExec("alter table t alter index k invisible, drop column e") + dom.DDL().SetHook(originHook) +} + func TestMultiSchemaChangeMixedWithUpdate(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store)