From be29d7e054adb78344add01a7e112814e3ffbcfe Mon Sep 17 00:00:00 2001 From: Arenatlx <314806019@qq.com> Date: Thu, 24 Dec 2020 15:43:30 +0800 Subject: [PATCH] ddl: fix db_test failure caused by domain reload delay under a high overload (#21928) Signed-off-by: AilinKid <314806019@qq.com> --- ddl/callback.go | 9 ++++++++- ddl/callback_test.go | 15 +++++++++++++++ ddl/db_change_test.go | 8 +------- ddl/db_test.go | 4 +--- ddl/ddl.go | 14 ++++++++++++-- ddl/ddl_worker.go | 7 +++++++ domain/domain.go | 9 +++++++++ 7 files changed, 53 insertions(+), 13 deletions(-) diff --git a/ddl/callback.go b/ddl/callback.go index af2b4856e8c1d..758740527f0bc 100644 --- a/ddl/callback.go +++ b/ddl/callback.go @@ -37,8 +37,10 @@ func (bi *BaseInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema // Callback is used for DDL. type Callback interface { - // OnChanged is called after schema is changed. + // OnChanged is called after a ddl statement is finished. OnChanged(err error) error + // OnSchemaStateChange is called after a schema state is changed. + OnSchemaStateChanged() // OnJobRunBefore is called before running job. OnJobRunBefore(job *model.Job) // OnJobUpdated is called after the running job is updated. @@ -56,6 +58,11 @@ func (c *BaseCallback) OnChanged(err error) error { return err } +// OnSchemaStateChanged implements Callback interface. +func (c *BaseCallback) OnSchemaStateChanged() { + // Nothing to do. +} + // OnJobRunBefore implements Callback.OnJobRunBefore interface. func (c *BaseCallback) OnJobRunBefore(job *model.Job) { // Nothing to do. diff --git a/ddl/callback_test.go b/ddl/callback_test.go index 7f01cb626d62c..80ec643de54e3 100644 --- a/ddl/callback_test.go +++ b/ddl/callback_test.go @@ -40,6 +40,9 @@ func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema type TestDDLCallback struct { *BaseCallback + // We recommended to pass the domain parameter to the test ddl callback, it will ensure + // domain to reload schema before your ddl stepping into the next state change. + Do DomainReloader onJobRunBefore func(*model.Job) OnJobRunBeforeExported func(*model.Job) @@ -48,6 +51,14 @@ type TestDDLCallback struct { onWatched func(ctx context.Context) } +func (tc *TestDDLCallback) OnSchemaStateChanged() { + if tc.Do != nil { + if err := tc.Do.Reload(); err != nil { + log.Warn("reload failed on schema state changed", zap.Error(err)) + } + } +} + func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) { log.Info("on job run before", zap.String("job", job.String())) if tc.OnJobRunBeforeExported != nil { @@ -92,3 +103,7 @@ func (s *testDDLSuite) TestCallback(c *C) { cb.OnJobUpdated(nil) cb.OnWatched(context.TODO()) } + +type DomainReloader interface { + Reload() error +} diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index ca39c4ff462af..d87fc6406d26f 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -67,11 +67,6 @@ type testStateChangeSuiteBase struct { preSQL string } -func forceReloadDomain(sess session.Session) { - dom := domain.GetDomain(sess) - dom.Reload() -} - func (s *testStateChangeSuiteBase) SetUpSuite(c *C) { s.lease = 200 * time.Millisecond ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond) @@ -822,7 +817,7 @@ func (s *testStateChangeSuiteBase) runTestInSchemaState(c *C, state model.Schema _, err = s.se.Execute(context.Background(), "drop stats t") c.Assert(err, IsNil) - callback := &ddl.TestDDLCallback{} + callback := &ddl.TestDDLCallback{Do: s.dom} prevState := model.StateNone var checkErr error times := 0 @@ -838,7 +833,6 @@ func (s *testStateChangeSuiteBase) runTestInSchemaState(c *C, state model.Schema if job.SchemaState != state { return } - forceReloadDomain(se) for _, sqlWithErr := range sqlWithErrs { _, err = se.Execute(context.Background(), sqlWithErr.sql) if !terror.ErrorEqual(err, sqlWithErr.expectErr) { diff --git a/ddl/db_test.go b/ddl/db_test.go index a7cdf60b79ce5..6d7fab25f5985 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -2079,7 +2079,6 @@ LOOP: case <-ticker.C: // delete some rows, and add some data for i := num; i < num+step; i++ { - forceReloadDomain(tk.Se) n := rand.Intn(num) tk.MustExec("begin") tk.MustExec("delete from t2 where c1 = ?", n) @@ -4217,7 +4216,7 @@ func (s *testDBSuite2) TestTransactionOnAddDropColumn(c *C) { originHook := s.dom.DDL().GetHook() defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook) - hook := &ddl.TestDDLCallback{} + hook := &ddl.TestDDLCallback{Do: s.dom} var checkErr error hook.OnJobRunBeforeExported = func(job *model.Job) { if checkErr != nil { @@ -4718,7 +4717,6 @@ func (s *testSerialDBSuite) TestModifyColumnCharset(c *C) { tk.MustExec("create table t_mcc(a varchar(8) charset utf8, b varchar(8) charset utf8)") defer s.mustExec(tk, c, "drop table t_mcc;") - forceReloadDomain(tk.Se) result := tk.MustQuery(`show create table t_mcc`) result.Check(testkit.Rows( "t_mcc CREATE TABLE `t_mcc` (\n" + diff --git a/ddl/ddl.go b/ddl/ddl.go index 0e51aede2b970..0a9266802a898 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -19,6 +19,7 @@ package ddl import ( "context" + "flag" "fmt" "sync" "time" @@ -394,9 +395,7 @@ func (d *ddl) close() { // GetLease implements DDL.GetLease interface. func (d *ddl) GetLease() time.Duration { - d.m.RLock() lease := d.lease - d.m.RUnlock() return lease } @@ -667,3 +666,14 @@ type RecoverInfo struct { CurAutoIncID int64 CurAutoRandID int64 } + +var ( + // RunInGoTest is used to identify whether ddl in running in the test. + RunInGoTest bool +) + +func init() { + if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil { + RunInGoTest = true + } +} diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 5723d2bfd6d69..4719d5b7c35e7 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -516,6 +516,13 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { w.waitSchemaChanged(ctx, d, waitTime, schemaVer, job) cancel() + if RunInGoTest { + // d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously. + d.mu.RLock() + d.mu.hook.OnSchemaStateChanged() + d.mu.RUnlock() + } + d.mu.RLock() d.mu.hook.OnJobUpdated(job) d.mu.RUnlock() diff --git a/domain/domain.go b/domain/domain.go index e1fd6c7a89b41..dd60effb3cbaa 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -653,6 +653,7 @@ type ddlCallback struct { do *Domain } +// OnChanged overrides ddl Callback interface. func (c *ddlCallback) OnChanged(err error) error { if err != nil { return err @@ -667,6 +668,14 @@ func (c *ddlCallback) OnChanged(err error) error { return nil } +// OnSchemaStateChange overrides the ddl Callback interface. +func (c *ddlCallback) OnSchemaStateChanged() { + err := c.do.Reload() + if err != nil { + logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err)) + } +} + const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool will be recycled after idleTimeout // NewDomain creates a new domain. Should not create multiple domains for the same store.