Skip to content

Commit

Permalink
ddl: fix db_test failure caused by domain reload delay under a high o…
Browse files Browse the repository at this point in the history
…verload (#21928)

Signed-off-by: AilinKid <[email protected]>
  • Loading branch information
AilinKid authored Dec 24, 2020
1 parent 092a01a commit be29d7e
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 13 deletions.
9 changes: 8 additions & 1 deletion ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ func (bi *BaseInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema

// Callback is used for DDL.
type Callback interface {
// OnChanged is called after schema is changed.
// OnChanged is called after a ddl statement is finished.
OnChanged(err error) error
// OnSchemaStateChange is called after a schema state is changed.
OnSchemaStateChanged()
// OnJobRunBefore is called before running job.
OnJobRunBefore(job *model.Job)
// OnJobUpdated is called after the running job is updated.
Expand All @@ -56,6 +58,11 @@ func (c *BaseCallback) OnChanged(err error) error {
return err
}

// OnSchemaStateChanged implements Callback interface.
func (c *BaseCallback) OnSchemaStateChanged() {
// Nothing to do.
}

// OnJobRunBefore implements Callback.OnJobRunBefore interface.
func (c *BaseCallback) OnJobRunBefore(job *model.Job) {
// Nothing to do.
Expand Down
15 changes: 15 additions & 0 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema

type TestDDLCallback struct {
*BaseCallback
// We recommended to pass the domain parameter to the test ddl callback, it will ensure
// domain to reload schema before your ddl stepping into the next state change.
Do DomainReloader

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
Expand All @@ -48,6 +51,14 @@ type TestDDLCallback struct {
onWatched func(ctx context.Context)
}

func (tc *TestDDLCallback) OnSchemaStateChanged() {
if tc.Do != nil {
if err := tc.Do.Reload(); err != nil {
log.Warn("reload failed on schema state changed", zap.Error(err))
}
}
}

func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
log.Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
Expand Down Expand Up @@ -92,3 +103,7 @@ func (s *testDDLSuite) TestCallback(c *C) {
cb.OnJobUpdated(nil)
cb.OnWatched(context.TODO())
}

type DomainReloader interface {
Reload() error
}
8 changes: 1 addition & 7 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ type testStateChangeSuiteBase struct {
preSQL string
}

func forceReloadDomain(sess session.Session) {
dom := domain.GetDomain(sess)
dom.Reload()
}

func (s *testStateChangeSuiteBase) SetUpSuite(c *C) {
s.lease = 200 * time.Millisecond
ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
Expand Down Expand Up @@ -822,7 +817,7 @@ func (s *testStateChangeSuiteBase) runTestInSchemaState(c *C, state model.Schema
_, err = s.se.Execute(context.Background(), "drop stats t")
c.Assert(err, IsNil)

callback := &ddl.TestDDLCallback{}
callback := &ddl.TestDDLCallback{Do: s.dom}
prevState := model.StateNone
var checkErr error
times := 0
Expand All @@ -838,7 +833,6 @@ func (s *testStateChangeSuiteBase) runTestInSchemaState(c *C, state model.Schema
if job.SchemaState != state {
return
}
forceReloadDomain(se)
for _, sqlWithErr := range sqlWithErrs {
_, err = se.Execute(context.Background(), sqlWithErr.sql)
if !terror.ErrorEqual(err, sqlWithErr.expectErr) {
Expand Down
4 changes: 1 addition & 3 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2079,7 +2079,6 @@ LOOP:
case <-ticker.C:
// delete some rows, and add some data
for i := num; i < num+step; i++ {
forceReloadDomain(tk.Se)
n := rand.Intn(num)
tk.MustExec("begin")
tk.MustExec("delete from t2 where c1 = ?", n)
Expand Down Expand Up @@ -4217,7 +4216,7 @@ func (s *testDBSuite2) TestTransactionOnAddDropColumn(c *C) {

originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
hook := &ddl.TestDDLCallback{}
hook := &ddl.TestDDLCallback{Do: s.dom}
var checkErr error
hook.OnJobRunBeforeExported = func(job *model.Job) {
if checkErr != nil {
Expand Down Expand Up @@ -4718,7 +4717,6 @@ func (s *testSerialDBSuite) TestModifyColumnCharset(c *C) {
tk.MustExec("create table t_mcc(a varchar(8) charset utf8, b varchar(8) charset utf8)")
defer s.mustExec(tk, c, "drop table t_mcc;")

forceReloadDomain(tk.Se)
result := tk.MustQuery(`show create table t_mcc`)
result.Check(testkit.Rows(
"t_mcc CREATE TABLE `t_mcc` (\n" +
Expand Down
14 changes: 12 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ddl

import (
"context"
"flag"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -394,9 +395,7 @@ func (d *ddl) close() {

// GetLease implements DDL.GetLease interface.
func (d *ddl) GetLease() time.Duration {
d.m.RLock()
lease := d.lease
d.m.RUnlock()
return lease
}

Expand Down Expand Up @@ -667,3 +666,14 @@ type RecoverInfo struct {
CurAutoIncID int64
CurAutoRandID int64
}

var (
// RunInGoTest is used to identify whether ddl in running in the test.
RunInGoTest bool
)

func init() {
if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil {
RunInGoTest = true
}
}
7 changes: 7 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,13 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
w.waitSchemaChanged(ctx, d, waitTime, schemaVer, job)
cancel()

if RunInGoTest {
// d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously.
d.mu.RLock()
d.mu.hook.OnSchemaStateChanged()
d.mu.RUnlock()
}

d.mu.RLock()
d.mu.hook.OnJobUpdated(job)
d.mu.RUnlock()
Expand Down
9 changes: 9 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ type ddlCallback struct {
do *Domain
}

// OnChanged overrides ddl Callback interface.
func (c *ddlCallback) OnChanged(err error) error {
if err != nil {
return err
Expand All @@ -667,6 +668,14 @@ func (c *ddlCallback) OnChanged(err error) error {
return nil
}

// OnSchemaStateChange overrides the ddl Callback interface.
func (c *ddlCallback) OnSchemaStateChanged() {
err := c.do.Reload()
if err != nil {
logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err))
}
}

const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool will be recycled after idleTimeout

// NewDomain creates a new domain. Should not create multiple domains for the same store.
Expand Down

0 comments on commit be29d7e

Please sign in to comment.