diff --git a/ddl/ddl.go b/ddl/ddl.go
index 6ba3b0cd736aa..3a232ec0100bc 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -366,6 +366,7 @@ type ddlCtx struct {
 		sync.RWMutex
 		// reorgCtxMap maps job ID to reorg context.
 		reorgCtxMap map[int64]*reorgCtx
+		beOwnerTS   int64
 	}
 
 	jobCtx struct {
@@ -488,6 +489,18 @@ func (dc *ddlCtx) jobContext(job *model.Job) *JobContext {
 	return NewJobContext()
 }
 
+func (dc *ddlCtx) getOwnerTS() int64 {
+	dc.reorgCtx.RLock()
+	defer dc.reorgCtx.RUnlock()
+	return dc.reorgCtx.beOwnerTS
+}
+
+func (dc *ddlCtx) setOwnerTS(ts int64) {
+	dc.reorgCtx.Lock()
+	dc.reorgCtx.beOwnerTS = ts
+	dc.reorgCtx.Unlock()
+}
+
 func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx {
 	dc.reorgCtx.RLock()
 	defer dc.reorgCtx.RUnlock()
@@ -496,7 +509,7 @@ func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx {
 
 func (dc *ddlCtx) newReorgCtx(r *reorgInfo) *reorgCtx {
 	rc := &reorgCtx{}
-	rc.doneCh = make(chan error, 1)
+	rc.doneCh = make(chan reorgFnResult, 1)
 	// initial reorgCtx
 	rc.setRowCount(r.Job.GetRowCount())
 	rc.setNextKey(r.StartKey)
@@ -727,6 +740,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 		if err != nil {
 			logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
 		}
+		d.ddlCtx.setOwnerTS(time.Now().Unix())
 	})
 
 	d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
diff --git a/ddl/reorg.go b/ddl/reorg.go
index 30a2e4dd755fb..9251e4f8655c0 100644
--- a/ddl/reorg.go
+++ b/ddl/reorg.go
@@ -57,7 +57,7 @@ type reorgCtx struct {
 	// If the reorganization job is done, we will use this channel to notify outer.
 	// TODO: Now we use goroutine to simulate reorganization jobs, later we may
 	// use a persistent job list.
-	doneCh chan error
+	doneCh chan reorgFnResult
 	// rowCount is used to simulate a job's row count.
 	rowCount int64
 	// notifyCancelReorgJob is used to notify the backfilling goroutine if the DDL job is cancelled.
@@ -79,6 +79,13 @@ type reorgCtx struct {
 	}
 }
 
+// reorgFnResult records the DDL owner TS before executing the reorg function, in order to help
+// the receiver determine whether the result comes from the reorg function of a previous DDL owner on this instance.
+type reorgFnResult struct {
+	ownerTS int64
+	err     error
+}
+
 // nullableKey can store kv.Key.
 // Storing a nil object to atomic.Value can lead to panic. This is a workaround.
 type nullableKey struct {
@@ -205,11 +212,13 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
 		if job.IsCancelling() {
 			return dbterror.ErrCancelledDDLJob
 		}
+		beOwnerTS := w.ddlCtx.getOwnerTS()
 		rc = w.newReorgCtx(reorgInfo)
 		w.wg.Add(1)
 		go func() {
 			defer w.wg.Done()
-			rc.doneCh <- f()
+			err := f()
+			rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err}
 		}()
 	}
 
@@ -225,7 +234,16 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
 
 	// wait reorganization job done or timeout
 	select {
-	case err := <-rc.doneCh:
+	case res := <-rc.doneCh:
+		err := res.err
+		curTS := w.ddlCtx.getOwnerTS()
+		if res.ownerTS != curTS {
+			d.removeReorgCtx(job)
+			logutil.BgLogger().Warn("owner ts mismatch, return timeout error and retry",
+				zap.Int64("prevTS", res.ownerTS),
+				zap.Int64("curTS", curTS))
+			return dbterror.ErrWaitReorgTimeout
+		}
 		// Since job is cancelled,we don't care about its partial counts.
 		if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
 			d.removeReorgCtx(job)
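
The sketch below is a minimal, standalone illustration (not TiDB code) of the pattern this patch applies: results coming back from an asynchronous worker are tagged with the owner timestamp captured when the worker was started, and the receiver discards results whose tag no longer matches the current owner term. Names such as `ownerState`, `taggedResult`, and `runJob` are hypothetical and chosen only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// taggedResult mirrors the idea of reorgFnResult: the payload plus the owner TS
// recorded before the work was handed off.
type taggedResult struct {
	ownerTS int64
	err     error
}

// ownerState mirrors the mutex-guarded beOwnerTS field added to ddlCtx.
type ownerState struct {
	mu sync.RWMutex
	ts int64
}

func (s *ownerState) get() int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.ts
}

func (s *ownerState) set(ts int64) {
	s.mu.Lock()
	s.ts = ts
	s.mu.Unlock()
}

var errStale = errors.New("stale result from a previous owner term, retry")

// runJob starts work asynchronously, tags the eventual result with the owner TS
// captured up front, and validates the tag when the result arrives.
func runJob(state *ownerState, work func() error) error {
	beOwnerTS := state.get()
	done := make(chan taggedResult, 1)

	go func() {
		done <- taggedResult{ownerTS: beOwnerTS, err: work()}
	}()

	res := <-done
	if cur := state.get(); res.ownerTS != cur {
		// The instance became owner again while the work was in flight, so the
		// result may belong to the previous term; drop it and let the caller retry.
		return errStale
	}
	return res.err
}

func main() {
	state := &ownerState{}
	state.set(time.Now().Unix()) // set on becoming owner, as Start() does in the patch

	err := runJob(state, func() error { return nil })
	fmt.Println("result:", err)
}
```

In the patch itself the same comparison happens in runReorgJob: on a TS mismatch the reorg context is removed and dbterror.ErrWaitReorgTimeout is returned so the job is retried under the current owner term instead of consuming a result produced before ownership changed.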