diff --git a/ddl/ddl.go b/ddl/ddl.go index 47ef8d91be713..26d73eade7baa 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -196,21 +196,16 @@ type DDL interface { RegisterEventCh(chan<- *util.Event) // SchemaSyncer gets the schema syncer. SchemaSyncer() SchemaSyncer - // OwnerManager gets the owner manager, and it's used for testing. + // OwnerManager gets the owner manager. OwnerManager() owner.Manager - - // WorkerVars gets the session variables for DDL worker. It's exported for testing. - WorkerVars() *variable.SessionVars - // SetHook sets the hook. It's exported for testing. - SetHook(h Callback) - // GetHook gets the hook. It's exported for testing. - GetHook() Callback - - // GetTableMaxRowID gets table max row ID. It's exported for testing. + // GetTableMaxRowID gets table max row ID. GetTableMaxRowID(startTS uint64, tblInfo *model.TableInfo) (int64, bool, error) + + // SetBinlogClient sets the binlog client for DDL worker. It's exported for testing. + SetBinlogClient(interface{}) } -// ddl represents the statements which are used to define the database structure or schema. +// ddl is used to handle the statements that define the structure or schema of the database. type ddl struct { m sync.RWMutex infoHandle *infoschema.Handle @@ -229,12 +224,11 @@ type ddlCtx struct { ddlJobDoneCh chan struct{} ddlEventCh chan<- *util.Event lease time.Duration // lease is schema lease. + binlogCli interface{} // binlogCli is used for Binlog. // hook may be modified. hook Callback hookMu sync.RWMutex - - workerVars *variable.SessionVars // workerVars is used for Binlog. } func (dc *ddlCtx) isOwner() bool { @@ -307,14 +301,13 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage, ddlJobDoneCh: make(chan struct{}, 1), ownerManager: manager, schemaSyncer: syncer, - workerVars: variable.NewSessionVars(), + binlogCli: binloginfo.GetPumpClient(), hook: hook, } d := &ddl{ infoHandle: infoHandle, ddlCtx: ddlCtx, } - d.workerVars.BinlogClient = binloginfo.GetPumpClient() d.start(ctx, ctxPool) variable.RegisterStatistics(d) @@ -508,25 +501,9 @@ func (d *ddl) callHookOnChanged(err error) error { return errors.Trace(err) } -// SetHook implements DDL.SetHook interface. -func (d *ddl) SetHook(h Callback) { - d.hookMu.Lock() - defer d.hookMu.Unlock() - - d.hook = h -} - -// GetHook implements DDL.GetHook interface. -func (d *ddl) GetHook() Callback { - d.hookMu.RLock() - defer d.hookMu.RUnlock() - - return d.hook -} - -// WorkerVars implements DDL.WorkerVars interface. -func (d *ddl) WorkerVars() *variable.SessionVars { - return d.workerVars +// SetBinlogClient implements DDL.SetBinlogClient interface. +func (d *ddl) SetBinlogClient(binlogCli interface{}) { + d.binlogCli = binlogCli } // DDL error codes. diff --git a/ddl/ddl_db_change_test.go b/ddl/ddl_db_change_test.go index 697b5700066d9..3f76af2f0df92 100644 --- a/ddl/ddl_db_change_test.go +++ b/ddl/ddl_db_change_test.go @@ -174,7 +174,7 @@ func (s *testStateChangeSuite) test(c *C, tableName, alterTableSQL string, testI } } d := s.dom.DDL() - d.SetHook(callback) + d.(ddl.DDLForTest).SetHook(callback) _, err = s.se.Execute(context.Background(), alterTableSQL) c.Assert(err, IsNil) err = testInfo.compileSQL(4) @@ -186,7 +186,7 @@ func (s *testStateChangeSuite) test(c *C, tableName, alterTableSQL string, testI c.Assert(err, IsNil) c.Assert(errors.ErrorStack(checkErr), Equals, "") callback = &ddl.TestDDLCallback{} - d.SetHook(callback) + d.(ddl.DDLForTest).SetHook(callback) } type stateCase struct { @@ -381,12 +381,12 @@ func (s *testStateChangeSuite) runTestInSchemaState(c *C, state model.SchemaStat } } d := s.dom.DDL() - d.SetHook(callback) + d.(ddl.DDLForTest).SetHook(callback) _, err = s.se.Execute(context.Background(), alterTableSQL) c.Assert(err, IsNil) c.Assert(errors.ErrorStack(checkErr), Equals, "") callback = &ddl.TestDDLCallback{} - d.SetHook(callback) + d.(ddl.DDLForTest).SetHook(callback) if expectQuery != nil { tk := testkit.NewTestKit(c, s.store) @@ -454,7 +454,7 @@ func (s *testStateChangeSuite) TestShowIndex(c *C) { } d := s.dom.DDL() - d.SetHook(callback) + d.(ddl.DDLForTest).SetHook(callback) alterTableSQL := `alter table t add index c2(c2)` _, err = s.se.Execute(context.Background(), alterTableSQL) c.Assert(err, IsNil) @@ -465,7 +465,7 @@ func (s *testStateChangeSuite) TestShowIndex(c *C) { err = checkResult(result, testkit.Rows("t 0 PRIMARY 1 c1 A 0 BTREE ", "t 1 c2 1 c2 A 0 YES BTREE ")) c.Assert(err, IsNil) callback = &ddl.TestDDLCallback{} - d.SetHook(callback) + d.(ddl.DDLForTest).SetHook(callback) } func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) { @@ -541,7 +541,7 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin times++ } d := s.dom.DDL() - d.SetHook(callback) + d.(ddl.DDLForTest).SetHook(callback) wg := sync.WaitGroup{} var err1 error @@ -573,7 +573,7 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin f(c, err1, err2) callback = &ddl.TestDDLCallback{} - d.SetHook(callback) + d.(ddl.DDLForTest).SetHook(callback) } func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) { @@ -598,7 +598,7 @@ func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) { } d := s.dom.DDL() - d.SetHook(callback) + d.(ddl.DDLForTest).SetHook(callback) wg.Add(2) go func() { @@ -614,7 +614,7 @@ func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) { c.Assert(err2, IsNil) c.Assert(err3, IsNil) callback = &ddl.TestDDLCallback{} - d.SetHook(callback) + d.(ddl.DDLForTest).SetHook(callback) } // TestCreateTableIfNotExists parallel exec create table if not exists xxx. No error returns is expected. diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go index 842f149c89ba2..c06f42afcb183 100644 --- a/ddl/ddl_db_test.go +++ b/ddl/ddl_db_test.go @@ -454,9 +454,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) { checkErr = errors.Trace(err) } } - originHook := s.dom.DDL().GetHook() - s.dom.DDL().SetHook(hook) - defer s.dom.DDL().SetHook(originHook) + s.dom.DDL().(ddl.DDLForTest).SetHook(hook) done := make(chan error, 1) go backgroundExec(s.store, "create unique index c3_index on t1 (c3)", done) @@ -498,6 +496,8 @@ LOOP: s.mustExec(c, "drop table t1") ddl.ReorgWaitTimeout = oldReorgWaitTimeout + callback := &ddl.TestDDLCallback{} + s.dom.DDL().(ddl.DDLForTest).SetHook(callback) } func (s *testDBSuite) TestAddAnonymousIndex(c *C) { diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index 3f3def2e277b8..361c58b21e813 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -33,6 +33,19 @@ import ( "golang.org/x/net/context" ) +type DDLForTest interface { + // SetHook sets the hook. + SetHook(h Callback) +} + +// SetHook implements DDL.SetHook interface. +func (d *ddl) SetHook(h Callback) { + d.hookMu.Lock() + defer d.hookMu.Unlock() + + d.hook = h +} + func TestT(t *testing.T) { CustomVerboseFlag = true logLevel := os.Getenv("log_level") diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 216713534f937..1428107f47d33 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -14,6 +14,7 @@ package ddl import ( + "fmt" "sync" "time" @@ -74,17 +75,30 @@ func newWorker(tp workerType, id int, store kv.Storage, ctxPool *pools.ResourceP return worker } +func (w *worker) String() string { + var str string + switch w.tp { + case generalWorker: + str = "general" + case addIdxWorker: + str = "add index" + default: + str = "unknow" + } + return fmt.Sprintf("%d, tp %s", w.id, str) +} + func (w *worker) close() { close(w.quitCh) w.delRangeManager.clear() w.wg.Wait() - log.Infof("[ddl] close DDL worker %v", w.tp) + log.Infof("[ddl] close DDL worker %s", w) } // start is used for async online schema changing, it will try to become the owner firstly, // then wait or pull the job queue to handle a schema change job. func (w *worker) start(d *ddlCtx) { - log.Infof("[ddl] start DDL worker %v", w.tp) + log.Infof("[ddl] start DDL worker %s", w) defer w.wg.Done() w.delRangeManager.start() @@ -100,7 +114,7 @@ func (w *worker) start(d *ddlCtx) { r := recover() if r != nil { buf := util.GetStack() - log.Errorf("[ddl] worker %v %s", r, buf) + log.Errorf("[ddl] ddl %s, worker %s, %v %s", d.uuid, w, r, buf) metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc() } }() @@ -110,13 +124,14 @@ func (w *worker) start(d *ddlCtx) { case <-ticker.C: log.Debugf("[ddl] wait %s to check DDL status again", checkTime) case <-w.ddlJobCh: + log.Debugf("[ddl] worker %s waits %s to check DDL status again", w, checkTime) case <-w.quitCh: return } err := w.handleDDLJobQueue(d) if err != nil { - log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err)) + log.Errorf("[ddl] worker %s handles DDL job err %v", w, errors.ErrorStack(err)) } } } @@ -336,7 +351,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { } if job.IsDone() || job.IsRollbackDone() { - binloginfo.SetDDLBinlog(d.workerVars.BinlogClient, txn, job.ID, job.Query) + binloginfo.SetDDLBinlog(d.binlogCli, txn, job.ID, job.Query) if !job.IsRollbackDone() { job.State = model.JobStateSynced } @@ -363,7 +378,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { if runJobErr != nil || waitDependencyJob { // wait a while to retry again. If we don't wait here, DDL will retry this job immediately, // which may act like a deadlock. - log.Infof("[ddl] run DDL job error, sleep a while:%v then retry it.", WaitTimeWhenErrorOccured) + log.Infof("[ddl] worker %s runs DDL job error, sleeps a while:%v then retries it.", w, WaitTimeWhenErrorOccured) metrics.DDLJobErrCounter.Inc() time.Sleep(WaitTimeWhenErrorOccured) } diff --git a/ddl/reorg.go b/ddl/reorg.go index f3c99d5b17eb0..0efaf1134c9b1 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -139,7 +139,7 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, lease time.Dura w.reorgCtx.clean() return errors.Trace(err) case <-w.quitCh: - log.Info("[ddl] run reorg job ddl quit") + log.Info("[ddl] run reorg job quit") w.reorgCtx.setNextHandle(0) w.reorgCtx.setRowCount(0) // We return errWaitReorgTimeout here too, so that outer loop will break. diff --git a/sessionctx/binloginfo/binloginfo_test.go b/sessionctx/binloginfo/binloginfo_test.go index f290e82b3386d..5c4baba712b99 100644 --- a/sessionctx/binloginfo/binloginfo_test.go +++ b/sessionctx/binloginfo/binloginfo_test.go @@ -113,7 +113,7 @@ func (s *testBinlogSuite) SetUpSuite(c *C) { s.ddl = sessionDomain.DDL() s.client = binlog.NewPumpClient(clientCon) - s.ddl.WorkerVars().BinlogClient = s.client + s.ddl.SetBinlogClient(s.client) } func (s *testBinlogSuite) TearDownSuite(c *C) {