From 1dd1ba4b094321e210103ff5ea51bc4762a7c91d Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 30 Jul 2024 15:37:09 +0800 Subject: [PATCH 1/4] ddl: remove reorg ctx when current instance is not owner --- pkg/ddl/backfilling.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 4fbed0422726d..9fa1c0823bf68 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -823,8 +823,12 @@ func (dc *ddlCtx) writePhysicalTableRecord( reorgInfo *reorgInfo, ) error { startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey - if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil { + if errors.ErrorEqual(err, dbterror.ErrNotOwner) { + // This instance is not DDL owner, we remove reorgctx proactively + // to avoid being used later. + dc.removeReorgCtx(reorgInfo.ID) + } return errors.Trace(err) } From 8009bf9de8541315f47e6aac429c876b45cd1937 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 30 Jul 2024 22:49:31 +0800 Subject: [PATCH 2/4] ddl: record get owner TS and compare it before runReorgJob quit --- pkg/ddl/backfilling.go | 5 ----- pkg/ddl/ddl.go | 13 +++++++++++++ pkg/ddl/job_scheduler.go | 1 + pkg/ddl/reorg.go | 24 +++++++++++++++++++++--- 4 files changed, 35 insertions(+), 8 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 9fa1c0823bf68..c2d4648ea3a8a 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -824,11 +824,6 @@ func (dc *ddlCtx) writePhysicalTableRecord( ) error { startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil { - if errors.ErrorEqual(err, dbterror.ErrNotOwner) { - // This instance is not DDL owner, we remove reorgctx proactively - // to avoid being used later. - dc.removeReorgCtx(reorgInfo.ID) - } return errors.Trace(err) } diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 48a021793cc73..b6fc7b445ec9a 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -519,6 +519,19 @@ type reorgContexts struct { sync.RWMutex // reorgCtxMap maps job ID to reorg context. reorgCtxMap map[int64]*reorgCtx + beOwnerTS int64 +} + +func (r *reorgContexts) getOwnerTS() int64 { + r.RLock() + defer r.RUnlock() + return r.beOwnerTS +} + +func (r *reorgContexts) setOwnerTS(ts int64) { + r.Lock() + r.beOwnerTS = ts + r.Unlock() } func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx { diff --git a/pkg/ddl/job_scheduler.go b/pkg/ddl/job_scheduler.go index 683af6bff20ef..b97c9de5cb3b8 100644 --- a/pkg/ddl/job_scheduler.go +++ b/pkg/ddl/job_scheduler.go @@ -112,6 +112,7 @@ func (l *ownerListener) OnBecomeOwner() { sessPool: l.ddl.sessPool, delRangeMgr: l.ddl.delRangeMgr, } + l.ddl.reorgCtx.setOwnerTS(time.Now().Unix()) l.scheduler.start() } diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index c5295281ee686..2154974826a5a 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -65,7 +65,7 @@ type reorgCtx struct { // If the reorganization job is done, we will use this channel to notify outer. // TODO: Now we use goroutine to simulate reorganization jobs, later we may // use a persistent job list. - doneCh chan error + doneCh chan reorgFnResult // rowCount is used to simulate a job's row count. rowCount int64 jobState model.JobState @@ -80,6 +80,13 @@ type reorgCtx struct { references atomicutil.Int32 } +// reorgFnResult records the DDL owner TS before executing reorg function, in order to help +// receiver determine if the result is from reorg function of previous DDL owner in this instance. +type reorgFnResult struct { + ownerTS int64 + err error +} + func newReorgExprCtx() exprctx.ExprContext { evalCtx := contextstatic.NewStaticEvalContext( contextstatic.WithSQLMode(mysql.ModeNone), @@ -251,11 +258,13 @@ func (w *worker) runReorgJob( return dbterror.ErrCancelledDDLJob } + beOwnerTS := w.ddlCtx.reorgCtx.getOwnerTS() rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.Job.GetRowCount()) w.wg.Add(1) go func() { defer w.wg.Done() - rc.doneCh <- reorgFn() + err := reorgFn() + rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err} }() } @@ -271,7 +280,16 @@ func (w *worker) runReorgJob( // wait reorganization job done or timeout select { - case err := <-rc.doneCh: + case res := <-rc.doneCh: + err := res.err + curTS := w.ddlCtx.reorgCtx.getOwnerTS() + if res.ownerTS != curTS { + d.removeReorgCtx(job.ID) + logutil.DDLLogger().Warn("owner ts mismatch, return timeout error and retry", + zap.Int64("prevTS", res.ownerTS), + zap.Int64("curTS", curTS)) + return dbterror.ErrWaitReorgTimeout + } // Since job is cancelled,we don't care about its partial counts. if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) { d.removeReorgCtx(job.ID) From 664d376322fafe1372d9635ba445d727e7e41629 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 30 Jul 2024 22:50:36 +0800 Subject: [PATCH 3/4] remove unnecessary changes --- pkg/ddl/backfilling.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index c2d4648ea3a8a..4fbed0422726d 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -823,6 +823,7 @@ func (dc *ddlCtx) writePhysicalTableRecord( reorgInfo *reorgInfo, ) error { startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey + if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil { return errors.Trace(err) } From 44e93b079d4943ff6d4b74eabfb5fa671643d4b1 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 30 Jul 2024 23:06:18 +0800 Subject: [PATCH 4/4] fix build --- pkg/ddl/ddl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index b6fc7b445ec9a..236dc403a2ec6 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -549,7 +549,7 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, rowCount int64) *reorgCtx { return existedRC } rc := &reorgCtx{} - rc.doneCh = make(chan error, 1) + rc.doneCh = make(chan reorgFnResult, 1) // initial reorgCtx rc.setRowCount(rowCount) rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)