Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#55049
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
tangenta authored and ti-chi-bot committed Oct 28, 2024
1 parent cc04dd7 commit 94181d3
Show file tree
Hide file tree
Showing 3 changed files with 987 additions and 3 deletions.
15 changes: 14 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,19 @@ type reorgContexts struct {
sync.RWMutex
// reorgCtxMap maps job ID to reorg context.
reorgCtxMap map[int64]*reorgCtx
beOwnerTS int64
}

func (r *reorgContexts) getOwnerTS() int64 {
r.RLock()
defer r.RUnlock()
return r.beOwnerTS
}

func (r *reorgContexts) setOwnerTS(ts int64) {
r.Lock()
r.beOwnerTS = ts
r.Unlock()
}

func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
Expand All @@ -543,7 +556,7 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, rowCount int64) *reorgCtx {
return existedRC
}
rc := &reorgCtx{}
rc.doneCh = make(chan error, 1)
rc.doneCh = make(chan reorgFnResult, 1)
// initial reorgCtx
rc.setRowCount(rowCount)
rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
Expand Down
72 changes: 70 additions & 2 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type reorgCtx struct {
// If the reorganization job is done, we will use this channel to notify outer.
// TODO: Now we use goroutine to simulate reorganization jobs, later we may
// use a persistent job list.
doneCh chan error
doneCh chan reorgFnResult
// rowCount is used to simulate a job's row count.
rowCount int64
jobState model.JobState
Expand All @@ -78,8 +78,61 @@ type reorgCtx struct {
references atomicutil.Int32
}

<<<<<<< HEAD:ddl/reorg.go
// newContext gets a context. It is only used for adding column in reorganization state.
func newContext(store kv.Storage) sessionctx.Context {
=======
// reorgFnResult records the DDL owner TS before executing reorg function, in order to help
// receiver determine if the result is from reorg function of previous DDL owner in this instance.
type reorgFnResult struct {
ownerTS int64
err error
}

func newReorgExprCtx() exprctx.ExprContext {
evalCtx := contextstatic.NewStaticEvalContext(
contextstatic.WithSQLMode(mysql.ModeNone),
contextstatic.WithTypeFlags(types.DefaultStmtFlags),
contextstatic.WithErrLevelMap(stmtctx.DefaultStmtErrLevels),
)

planCacheTracker := contextutil.NewPlanCacheTracker(contextutil.IgnoreWarn)

return contextstatic.NewStaticExprContext(
contextstatic.WithEvalCtx(evalCtx),
contextstatic.WithPlanCacheTracker(&planCacheTracker),
)
}

func reorgTypeFlagsWithSQLMode(mode mysql.SQLMode) types.Flags {
return types.StrictFlags.
WithTruncateAsWarning(!mode.HasStrictMode()).
WithIgnoreInvalidDateErr(mode.HasAllowInvalidDatesMode()).
WithIgnoreZeroInDate(!mode.HasStrictMode() || mode.HasAllowInvalidDatesMode()).
WithCastTimeToYearThroughConcat(true)
}

func reorgErrLevelsWithSQLMode(mode mysql.SQLMode) errctx.LevelMap {
return errctx.LevelMap{
errctx.ErrGroupTruncate: errctx.ResolveErrLevel(false, !mode.HasStrictMode()),
errctx.ErrGroupBadNull: errctx.ResolveErrLevel(false, !mode.HasStrictMode()),
errctx.ErrGroupDividedByZero: errctx.ResolveErrLevel(
!mode.HasErrorForDivisionByZeroMode(),
!mode.HasStrictMode(),
),
}
}

func reorgTimeZoneWithTzLoc(tzLoc *model.TimeZoneLocation) (*time.Location, error) {
if tzLoc == nil {
// It is set to SystemLocation to be compatible with nil LocationInfo.
return timeutil.SystemLocation(), nil
}
return tzLoc.GetLocation()
}

func newReorgSessCtx(store kv.Storage) sessionctx.Context {
>>>>>>> aaca081cec3 (ddl: record get owner TS and compare it before runReorgJob quit (#55049)):pkg/ddl/reorg.go
c := mock.NewContext()
c.Store = store
c.GetSessionVars().SetStatusFlag(mysql.ServerStatusAutocommit, false)
Expand Down Expand Up @@ -246,11 +299,17 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
return dbterror.ErrCancelledDDLJob
}

beOwnerTS := w.ddlCtx.reorgCtx.getOwnerTS()
rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.Job.GetRowCount())
w.wg.Add(1)
go func() {
defer w.wg.Done()
<<<<<<< HEAD:ddl/reorg.go
rc.doneCh <- f()
=======
err := reorgFn()
rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err}
>>>>>>> aaca081cec3 (ddl: record get owner TS and compare it before runReorgJob quit (#55049)):pkg/ddl/reorg.go
}()
}

Expand All @@ -266,7 +325,16 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo

// wait reorganization job done or timeout
select {
case err := <-rc.doneCh:
case res := <-rc.doneCh:
err := res.err
curTS := w.ddlCtx.reorgCtx.getOwnerTS()
if res.ownerTS != curTS {
d.removeReorgCtx(job.ID)
logutil.DDLLogger().Warn("owner ts mismatch, return timeout error and retry",
zap.Int64("prevTS", res.ownerTS),
zap.Int64("curTS", curTS))
return dbterror.ErrWaitReorgTimeout
}
// Since job is cancelled,we don't care about its partial counts.
if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
d.removeReorgCtx(job.ID)
Expand Down
Loading

0 comments on commit 94181d3

Please sign in to comment.