From 69403e2cb379eed205180b7e30a8d6301c46d9eb Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 31 Jul 2024 19:07:28 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #55049 Signed-off-by: ti-chi-bot --- pkg/ddl/ddl.go | 15 ++++++++- pkg/ddl/job_table.go | 10 ++++++ pkg/ddl/reorg.go | 72 ++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index a34ef9aac5690..3d0034dfb6088 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -566,6 +566,19 @@ type reorgContexts struct { sync.RWMutex // reorgCtxMap maps job ID to reorg context. reorgCtxMap map[int64]*reorgCtx + beOwnerTS int64 +} + +func (r *reorgContexts) getOwnerTS() int64 { + r.RLock() + defer r.RUnlock() + return r.beOwnerTS +} + +func (r *reorgContexts) setOwnerTS(ts int64) { + r.Lock() + r.beOwnerTS = ts + r.Unlock() } func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx { @@ -583,7 +596,7 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, rowCount int64) *reorgCtx { return existedRC } rc := &reorgCtx{} - rc.doneCh = make(chan error, 1) + rc.doneCh = make(chan reorgFnResult, 1) // initial reorgCtx rc.setRowCount(rowCount) rc.mu.warnings = make(map[errors.ErrorID]*terror.Error) diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index aa2ebc3d516d0..8df16711fbac2 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -89,12 +89,22 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool not = "" label = "get_job_reorg" } +<<<<<<< HEAD:pkg/ddl/job_table.go const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing) and %s reorg %s order by processing desc, job_id` var excludedJobIDs string if ids := d.runningJobs.allIDs(); len(ids) > 0 { excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", ids) +======= + l.ddl.reorgCtx.setOwnerTS(time.Now().Unix()) + l.scheduler.start() +} + +func (l *ownerListener) OnRetireOwner() { + if l.scheduler == nil { + return +>>>>>>> aaca081cec3 (ddl: record get owner TS and compare it before runReorgJob quit (#55049)):pkg/ddl/job_scheduler.go } sql := fmt.Sprintf(getJobSQL, not, excludedJobIDs) rows, err := se.Execute(context.Background(), sql, label) diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index 5a82987ec7ab3..37e1e8db6ce4a 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -58,7 +58,7 @@ type reorgCtx struct { // If the reorganization job is done, we will use this channel to notify outer. // TODO: Now we use goroutine to simulate reorganization jobs, later we may // use a persistent job list. - doneCh chan error + doneCh chan reorgFnResult // rowCount is used to simulate a job's row count. rowCount int64 jobState model.JobState @@ -73,8 +73,61 @@ type reorgCtx struct { references atomicutil.Int32 } +<<<<<<< HEAD // newContext gets a context. It is only used for adding column in reorganization state. func newContext(store kv.Storage) sessionctx.Context { +======= +// reorgFnResult records the DDL owner TS before executing reorg function, in order to help +// receiver determine if the result is from reorg function of previous DDL owner in this instance. +type reorgFnResult struct { + ownerTS int64 + err error +} + +func newReorgExprCtx() exprctx.ExprContext { + evalCtx := contextstatic.NewStaticEvalContext( + contextstatic.WithSQLMode(mysql.ModeNone), + contextstatic.WithTypeFlags(types.DefaultStmtFlags), + contextstatic.WithErrLevelMap(stmtctx.DefaultStmtErrLevels), + ) + + planCacheTracker := contextutil.NewPlanCacheTracker(contextutil.IgnoreWarn) + + return contextstatic.NewStaticExprContext( + contextstatic.WithEvalCtx(evalCtx), + contextstatic.WithPlanCacheTracker(&planCacheTracker), + ) +} + +func reorgTypeFlagsWithSQLMode(mode mysql.SQLMode) types.Flags { + return types.StrictFlags. + WithTruncateAsWarning(!mode.HasStrictMode()). + WithIgnoreInvalidDateErr(mode.HasAllowInvalidDatesMode()). + WithIgnoreZeroInDate(!mode.HasStrictMode() || mode.HasAllowInvalidDatesMode()). + WithCastTimeToYearThroughConcat(true) +} + +func reorgErrLevelsWithSQLMode(mode mysql.SQLMode) errctx.LevelMap { + return errctx.LevelMap{ + errctx.ErrGroupTruncate: errctx.ResolveErrLevel(false, !mode.HasStrictMode()), + errctx.ErrGroupBadNull: errctx.ResolveErrLevel(false, !mode.HasStrictMode()), + errctx.ErrGroupDividedByZero: errctx.ResolveErrLevel( + !mode.HasErrorForDivisionByZeroMode(), + !mode.HasStrictMode(), + ), + } +} + +func reorgTimeZoneWithTzLoc(tzLoc *model.TimeZoneLocation) (*time.Location, error) { + if tzLoc == nil { + // It is set to SystemLocation to be compatible with nil LocationInfo. + return timeutil.SystemLocation(), nil + } + return tzLoc.GetLocation() +} + +func newReorgSessCtx(store kv.Storage) sessionctx.Context { +>>>>>>> aaca081cec3 (ddl: record get owner TS and compare it before runReorgJob quit (#55049)) c := mock.NewContext() c.Store = store c.GetSessionVars().SetStatusFlag(mysql.ServerStatusAutocommit, false) @@ -199,11 +252,17 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo, return dbterror.ErrCancelledDDLJob } + beOwnerTS := w.ddlCtx.reorgCtx.getOwnerTS() rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.Job.GetRowCount()) w.wg.Add(1) go func() { defer w.wg.Done() +<<<<<<< HEAD rc.doneCh <- f() +======= + err := reorgFn() + rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err} +>>>>>>> aaca081cec3 (ddl: record get owner TS and compare it before runReorgJob quit (#55049)) }() } @@ -219,7 +278,16 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo, // wait reorganization job done or timeout select { - case err := <-rc.doneCh: + case res := <-rc.doneCh: + err := res.err + curTS := w.ddlCtx.reorgCtx.getOwnerTS() + if res.ownerTS != curTS { + d.removeReorgCtx(job.ID) + logutil.DDLLogger().Warn("owner ts mismatch, return timeout error and retry", + zap.Int64("prevTS", res.ownerTS), + zap.Int64("curTS", curTS)) + return dbterror.ErrWaitReorgTimeout + } // Since job is cancelled,we don't care about its partial counts. if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) { d.removeReorgCtx(job.ID) From c034e41a53524383d3d058701535bc13e2e53d25 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 1 Aug 2024 11:21:49 +0800 Subject: [PATCH 2/2] resolve conflict --- pkg/ddl/ddl.go | 1 + pkg/ddl/job_table.go | 10 -------- pkg/ddl/reorg.go | 56 +++----------------------------------------- 3 files changed, 4 insertions(+), 63 deletions(-) diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 3d0034dfb6088..e2ce73c1984d2 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -844,6 +844,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error { if err != nil { logutil.DDLLogger().Error("error when getting the ddl history count", zap.Error(err)) } + d.reorgCtx.setOwnerTS(time.Now().Unix()) d.runningJobs.clear() }) diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index 8df16711fbac2..aa2ebc3d516d0 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -89,22 +89,12 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool not = "" label = "get_job_reorg" } -<<<<<<< HEAD:pkg/ddl/job_table.go const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing) and %s reorg %s order by processing desc, job_id` var excludedJobIDs string if ids := d.runningJobs.allIDs(); len(ids) > 0 { excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", ids) -======= - l.ddl.reorgCtx.setOwnerTS(time.Now().Unix()) - l.scheduler.start() -} - -func (l *ownerListener) OnRetireOwner() { - if l.scheduler == nil { - return ->>>>>>> aaca081cec3 (ddl: record get owner TS and compare it before runReorgJob quit (#55049)):pkg/ddl/job_scheduler.go } sql := fmt.Sprintf(getJobSQL, not, excludedJobIDs) rows, err := se.Execute(context.Background(), sql, label) diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index 37e1e8db6ce4a..8d44e161ad5ac 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -73,10 +73,6 @@ type reorgCtx struct { references atomicutil.Int32 } -<<<<<<< HEAD -// newContext gets a context. It is only used for adding column in reorganization state. -func newContext(store kv.Storage) sessionctx.Context { -======= // reorgFnResult records the DDL owner TS before executing reorg function, in order to help // receiver determine if the result is from reorg function of previous DDL owner in this instance. type reorgFnResult struct { @@ -84,50 +80,8 @@ type reorgFnResult struct { err error } -func newReorgExprCtx() exprctx.ExprContext { - evalCtx := contextstatic.NewStaticEvalContext( - contextstatic.WithSQLMode(mysql.ModeNone), - contextstatic.WithTypeFlags(types.DefaultStmtFlags), - contextstatic.WithErrLevelMap(stmtctx.DefaultStmtErrLevels), - ) - - planCacheTracker := contextutil.NewPlanCacheTracker(contextutil.IgnoreWarn) - - return contextstatic.NewStaticExprContext( - contextstatic.WithEvalCtx(evalCtx), - contextstatic.WithPlanCacheTracker(&planCacheTracker), - ) -} - -func reorgTypeFlagsWithSQLMode(mode mysql.SQLMode) types.Flags { - return types.StrictFlags. - WithTruncateAsWarning(!mode.HasStrictMode()). - WithIgnoreInvalidDateErr(mode.HasAllowInvalidDatesMode()). - WithIgnoreZeroInDate(!mode.HasStrictMode() || mode.HasAllowInvalidDatesMode()). - WithCastTimeToYearThroughConcat(true) -} - -func reorgErrLevelsWithSQLMode(mode mysql.SQLMode) errctx.LevelMap { - return errctx.LevelMap{ - errctx.ErrGroupTruncate: errctx.ResolveErrLevel(false, !mode.HasStrictMode()), - errctx.ErrGroupBadNull: errctx.ResolveErrLevel(false, !mode.HasStrictMode()), - errctx.ErrGroupDividedByZero: errctx.ResolveErrLevel( - !mode.HasErrorForDivisionByZeroMode(), - !mode.HasStrictMode(), - ), - } -} - -func reorgTimeZoneWithTzLoc(tzLoc *model.TimeZoneLocation) (*time.Location, error) { - if tzLoc == nil { - // It is set to SystemLocation to be compatible with nil LocationInfo. - return timeutil.SystemLocation(), nil - } - return tzLoc.GetLocation() -} - -func newReorgSessCtx(store kv.Storage) sessionctx.Context { ->>>>>>> aaca081cec3 (ddl: record get owner TS and compare it before runReorgJob quit (#55049)) +// newContext gets a context. It is only used for adding column in reorganization state. +func newContext(store kv.Storage) sessionctx.Context { c := mock.NewContext() c.Store = store c.GetSessionVars().SetStatusFlag(mysql.ServerStatusAutocommit, false) @@ -257,12 +211,8 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo, w.wg.Add(1) go func() { defer w.wg.Done() -<<<<<<< HEAD - rc.doneCh <- f() -======= - err := reorgFn() + err := f() rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err} ->>>>>>> aaca081cec3 (ddl: record get owner TS and compare it before runReorgJob quit (#55049)) }() }