Skip to content

Commit

Permalink
ddl: record get owner TS and compare it before runReorgJob quit (ping…
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored and hawkingrei committed Aug 1, 2024
1 parent 75fc104 commit b100812
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 4 deletions.
15 changes: 14 additions & 1 deletion pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,19 @@ type reorgContexts struct {
sync.RWMutex
// reorgCtxMap maps job ID to reorg context.
reorgCtxMap map[int64]*reorgCtx
beOwnerTS int64
}

// getOwnerTS reads the recorded owner timestamp (beOwnerTS) while holding the
// read lock and returns it.
func (r *reorgContexts) getOwnerTS() int64 {
	r.RLock()
	ownerTS := r.beOwnerTS
	r.RUnlock()
	return ownerTS
}

// setOwnerTS stores ts as the recorded owner timestamp (beOwnerTS) while
// holding the write lock.
func (r *reorgContexts) setOwnerTS(ts int64) {
	r.Lock()
	defer r.Unlock()
	r.beOwnerTS = ts
}

func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
Expand All @@ -536,7 +549,7 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, rowCount int64) *reorgCtx {
return existedRC
}
rc := &reorgCtx{}
rc.doneCh = make(chan error, 1)
rc.doneCh = make(chan reorgFnResult, 1)
// initial reorgCtx
rc.setRowCount(rowCount)
rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (l *ownerListener) OnBecomeOwner() {
sessPool: l.ddl.sessPool,
delRangeMgr: l.ddl.delRangeMgr,
}
l.ddl.reorgCtx.setOwnerTS(time.Now().Unix())
l.scheduler.start()
}

Expand Down
24 changes: 21 additions & 3 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type reorgCtx struct {
// If the reorganization job is done, we will use this channel to notify outer.
// TODO: Now we use goroutine to simulate reorganization jobs, later we may
// use a persistent job list.
doneCh chan error
doneCh chan reorgFnResult
// rowCount is used to simulate a job's row count.
rowCount int64
jobState model.JobState
Expand All @@ -80,6 +80,13 @@ type reorgCtx struct {
references atomicutil.Int32
}

// reorgFnResult is the value sent on reorgCtx.doneCh when a reorg function finishes.
// It pairs the function's error with the DDL owner TS that was read before the reorg
// function was started, so that the receiver can tell whether the result was produced
// by a reorg function launched under a previous DDL ownership term of this instance.
type reorgFnResult struct {
// ownerTS is the owner TS read before the reorg function was started.
ownerTS int64
// err is the error returned by the reorg function.
err error
}

func newReorgExprCtx() exprctx.ExprContext {
evalCtx := contextstatic.NewStaticEvalContext(
contextstatic.WithSQLMode(mysql.ModeNone),
Expand Down Expand Up @@ -251,11 +258,13 @@ func (w *worker) runReorgJob(
return dbterror.ErrCancelledDDLJob
}

beOwnerTS := w.ddlCtx.reorgCtx.getOwnerTS()
rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.Job.GetRowCount())
w.wg.Add(1)
go func() {
defer w.wg.Done()
rc.doneCh <- reorgFn()
err := reorgFn()
rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err}
}()
}

Expand All @@ -271,7 +280,16 @@ func (w *worker) runReorgJob(

// wait reorganization job done or timeout
select {
case err := <-rc.doneCh:
case res := <-rc.doneCh:
err := res.err
curTS := w.ddlCtx.reorgCtx.getOwnerTS()
if res.ownerTS != curTS {
d.removeReorgCtx(job.ID)
logutil.DDLLogger().Warn("owner ts mismatch, return timeout error and retry",
zap.Int64("prevTS", res.ownerTS),
zap.Int64("curTS", curTS))
return dbterror.ErrWaitReorgTimeout
}
// Since the job is cancelled, we don't care about its partial counts.
if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
d.removeReorgCtx(job.ID)
Expand Down

0 comments on commit b100812

Please sign in to comment.