diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go
index 450499bbf200b..33155f01e07a7 100644
--- a/pkg/ddl/backfilling.go
+++ b/pkg/ddl/backfilling.go
@@ -674,12 +674,8 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
 	totalAddedCount := job.GetRowCount()
 
 	startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
+
 	if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
-		if errors.ErrorEqual(err, dbterror.ErrNotOwner) {
-			// This instance is not DDL owner, we remove reorgctx proactively
-			// to avoid being used later.
-			dc.removeReorgCtx(reorgInfo.ID)
-		}
 		return errors.Trace(err)
 	}
 
diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go
index 0c791f22117d9..c499136de914e 100644
--- a/pkg/ddl/ddl.go
+++ b/pkg/ddl/ddl.go
@@ -493,6 +493,19 @@ type reorgContexts struct {
 	sync.RWMutex
 	// reorgCtxMap maps job ID to reorg context.
 	reorgCtxMap map[int64]*reorgCtx
+	beOwnerTS   int64
+}
+
+func (r *reorgContexts) getOwnerTS() int64 {
+	r.RLock()
+	defer r.RUnlock()
+	return r.beOwnerTS
+}
+
+func (r *reorgContexts) setOwnerTS(ts int64) {
+	r.Lock()
+	r.beOwnerTS = ts
+	r.Unlock()
 }
 
 func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
@@ -510,7 +523,7 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, rowCount int64) *reorgCtx {
 		return existedRC
 	}
 	rc := &reorgCtx{}
-	rc.doneCh = make(chan error, 1)
+	rc.doneCh = make(chan reorgFnResult, 1)
 	// initial reorgCtx
 	rc.setRowCount(rowCount)
 	rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
@@ -746,6 +759,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 		if err != nil {
 			logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
 		}
+		d.reorgCtx.setOwnerTS(time.Now().Unix())
 		d.runningJobs.clear()
 	})
 
diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go
index 971ffcbe23fd3..1c8573d8e9d08 100644
--- a/pkg/ddl/reorg.go
+++ b/pkg/ddl/reorg.go
@@ -59,7 +59,7 @@ type reorgCtx struct {
 	// If the reorganization job is done, we will use this channel to notify outer.
 	// TODO: Now we use goroutine to simulate reorganization jobs, later we may
 	// use a persistent job list.
-	doneCh chan error
+	doneCh chan reorgFnResult
 	// rowCount is used to simulate a job's row count.
 	rowCount int64
 	jobState model.JobState
@@ -74,6 +74,13 @@ type reorgCtx struct {
 	references atomicutil.Int32
 }
 
+// reorgFnResult records the DDL owner TS before the reorg function is executed, so that the
+// receiver can tell whether the result comes from the reorg function of a previous DDL owner on this instance.
+type reorgFnResult struct {
+	ownerTS int64
+	err     error
+}
+
 // newContext gets a context. It is only used for adding column in reorganization state.
 func newContext(store kv.Storage) sessionctx.Context {
 	c := mock.NewContext()
@@ -200,11 +207,13 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo,
 			return dbterror.ErrCancelledDDLJob
 		}
 
+		beOwnerTS := w.ddlCtx.reorgCtx.getOwnerTS()
 		rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.Job.GetRowCount())
 		w.wg.Add(1)
 		go func() {
 			defer w.wg.Done()
-			rc.doneCh <- f()
+			err := f()
+			rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err}
 		}()
 	}
 
@@ -220,7 +229,16 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo,
 
 	// wait reorganization job done or timeout
 	select {
-	case err := <-rc.doneCh:
+	case res := <-rc.doneCh:
+		err := res.err
+		curTS := w.ddlCtx.reorgCtx.getOwnerTS()
+		if res.ownerTS != curTS {
+			d.removeReorgCtx(job.ID)
+			logutil.BgLogger().Warn("owner ts mismatch, return timeout error and retry",
+				zap.Int64("prevTS", res.ownerTS),
+				zap.Int64("curTS", curTS))
+			return dbterror.ErrWaitReorgTimeout
+		}
 		// Since job is cancelled,we don't care about its partial counts.
 		if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
 			d.removeReorgCtx(job.ID)
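
For reference, the following is a minimal, self-contained Go sketch of the owner-TS handshake this diff introduces. It is not TiDB code: fnResult, reorgWaiter, and the error text are illustrative stand-ins for reorgFnResult, reorgContexts, and dbterror.ErrWaitReorgTimeout. It only shows how tagging a result with the owner timestamp observed at launch time lets the receiver discard a result produced under a previous ownership term instead of consuming it as fresh.

// Minimal sketch of the owner-TS check; names are illustrative, not TiDB APIs.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// fnResult mirrors reorgFnResult: the result carries the owner TS that was
// in effect when the worker goroutine was started.
type fnResult struct {
	ownerTS int64
	err     error
}

// reorgWaiter mirrors the relevant part of reorgContexts.
type reorgWaiter struct {
	ownerTS atomic.Int64 // refreshed each time this instance becomes DDL owner
	doneCh  chan fnResult
}

// start launches f, remembering the owner TS observed at launch time.
func (w *reorgWaiter) start(f func() error) {
	ts := w.ownerTS.Load()
	go func() {
		w.doneCh <- fnResult{ownerTS: ts, err: f()}
	}()
}

// wait consumes one result and rejects results from a previous ownership term.
func (w *reorgWaiter) wait() error {
	res := <-w.doneCh
	if res.ownerTS != w.ownerTS.Load() {
		// The goroutine was started before this instance re-acquired ownership;
		// surface a retryable error rather than trusting the stale result.
		return errors.New("wait reorg timeout: stale owner ts")
	}
	return res.err
}

func main() {
	w := &reorgWaiter{doneCh: make(chan fnResult, 1)}
	w.ownerTS.Store(time.Now().Unix())

	w.start(func() error { return nil })

	// Simulate losing and re-gaining DDL ownership before the result is read.
	w.ownerTS.Store(time.Now().Unix() + 1)

	fmt.Println(w.wait()) // prints the "stale owner ts" error, not nil
}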