ddl: decouple job scheduler from 'ddl' and make it run/exit as owner changes #53548

Merged
merged 23 commits into from
May 30, 2024
Changes from 8 commits
37 changes: 26 additions & 11 deletions pkg/autoid_service/autoid.go
@@ -334,17 +334,9 @@ func newWithCli(selfAddr string, cli *clientv3.Client, store kv.Storage) *Servic
leaderShip: l,
store: store,
}
l.SetBeOwnerHook(func() {
// Reset the map to avoid a case that a node lose leadership and regain it, then
// improperly use the stale map to serve the autoid requests.
// See https://github.com/pingcap/tidb/issues/52600
service.autoIDLock.Lock()
clear(service.autoIDMap)
service.autoIDLock.Unlock()

logutil.BgLogger().Info("leader change of autoid service, this node become owner",
zap.String("addr", selfAddr),
zap.String("category", "autoid service"))
l.SetListener(&ownerListener{
Service: service,
selfAddr: selfAddr,
})
// 10 means that autoid service's etcd lease is 10s.
err := l.CampaignOwner(10)
@@ -592,6 +584,29 @@ func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoi
return &autoid.RebaseResponse{}, nil
}

type ownerListener struct {
*Service
selfAddr string
}

var _ owner.Listener = (*ownerListener)(nil)

func (l *ownerListener) OnBecomeOwner() {
// Reset the map to avoid a case that a node lose leadership and regain it, then
// improperly use the stale map to serve the autoid requests.
// See https://github.com/pingcap/tidb/issues/52600
l.autoIDLock.Lock()
clear(l.autoIDMap)
l.autoIDLock.Unlock()

logutil.BgLogger().Info("leader change of autoid service, this node become owner",
zap.String("addr", l.selfAddr),
zap.String("category", "autoid service"))
}

func (*ownerListener) OnRetireOwner() {
}

func init() {
autoid1.MockForTest = MockForTest
}
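
The hunk above replaces autoid's one-shot SetBeOwnerHook callback with an owner.Listener that exposes both OnBecomeOwner and OnRetireOwner. That pairing is what the rest of this PR builds on to let the DDL job scheduler start when a node wins the owner election and exit when it loses it. Below is a minimal, self-contained Go sketch of the pattern: the Listener interface mirrors the shape shown in the diff, while the jobScheduler type, its ticker loop, and the simulated ownership change in main are illustrative assumptions, not code from this PR.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Listener mirrors the two callbacks owner.Listener exposes in the diff above.
type Listener interface {
	OnBecomeOwner()
	OnRetireOwner()
}

// jobScheduler is a hypothetical component that must only run on the owner node.
type jobScheduler struct {
	cancel context.CancelFunc
}

// OnBecomeOwner starts the scheduling loop; it is assumed to be called from the
// owner manager's campaign goroutine, so no extra locking is shown here.
func (s *jobScheduler) OnBecomeOwner() {
	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	go s.run(ctx)
}

// OnRetireOwner stops the loop as soon as ownership is lost.
func (s *jobScheduler) OnRetireOwner() {
	if s.cancel != nil {
		s.cancel()
		s.cancel = nil
	}
}

func (s *jobScheduler) run(ctx context.Context) {
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("scheduler: exiting, no longer owner")
			return
		case <-ticker.C:
			fmt.Println("scheduler: dispatching pending DDL jobs")
		}
	}
}

func main() {
	var l Listener = &jobScheduler{}
	l.OnBecomeOwner() // this node won the election
	time.Sleep(500 * time.Millisecond)
	l.OnRetireOwner() // ownership moved to another node
	time.Sleep(100 * time.Millisecond)
}
```

Driving start and stop from the listener, rather than from a single become-owner hook, means the component never has to poll ownership state itself.
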
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling.go
@@ -584,6 +584,7 @@ func SetBackfillTaskChanSizeForTest(n int) {
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
func (dc *ddlCtx) writePhysicalTableRecord(
ctx context.Context,
sessPool *sess.Pool,
t table.PhysicalTable,
bfWorkerType backfillerType,
@@ -604,7 +605,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(

jc := reorgInfo.NewJobContext()

eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(dc.ctx)
eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)

scheduler, err := newBackfillScheduler(egCtx, reorgInfo, sessPool, bfWorkerType, t, jc)
if err != nil {
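
The only change to writePhysicalTableRecord above is that the error group is now derived from a context passed in by the caller instead of the long-lived dc.ctx, which is only cancelled at shutdown. Cancelling the caller's context (for example when the job is cancelled or the node is no longer the DDL owner) therefore stops every backfill worker. The sketch below shows the same pattern with golang.org/x/sync/errgroup standing in for TiDB's util.NewErrorGroupWithRecoverWithCtx; the worker bodies and timings are invented for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func writeRecords(ctx context.Context, workers int) error {
	// Derive the group from the caller's ctx, not a package-level one.
	eg, egCtx := errgroup.WithContext(ctx)
	for i := 0; i < workers; i++ {
		i := i
		eg.Go(func() error {
			for {
				select {
				case <-egCtx.Done():
					fmt.Printf("worker %d: stopping, ctx cancelled\n", i)
					return egCtx.Err()
				case <-time.After(100 * time.Millisecond):
					// pretend to backfill one batch of rows
				}
			}
		})
	}
	return eg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(300 * time.Millisecond)
		cancel() // simulates the job being cancelled or the owner retiring
	}()
	if err := writeRecords(ctx, 3); err != nil {
		fmt.Println("backfill stopped:", err)
	}
}
```
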
2 changes: 2 additions & 0 deletions pkg/ddl/backfilling_dist_executor.go
@@ -92,6 +92,8 @@ func (s *backfillDistExecutor) newBackfillSubtaskExecutor(
jobMeta := &s.taskMeta.Job
ddlObj := s.d

// TODO getTableByTxn is using DDL ctx which is never cancelled except when shutdown.
// we should move this operation out of GetStepExecutor, and put into Init.
_, tblIface, err := ddlObj.getTableByTxn((*asAutoIDRequirement)(ddlObj.ddlCtx), jobMeta.SchemaID, jobMeta.TableID)
if err != nil {
return nil, err
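
The TODO above notes that getTableByTxn still runs on the DDL context, which is only cancelled at shutdown, and suggests moving the lookup out of GetStepExecutor and into Init. A rough sketch of that direction follows; the executor type and the stand-in tableLookup function are made up for illustration and are not TiDB's actual distributed-task API.

```go
package main

import (
	"context"
	"fmt"
)

type tableInfo struct{ name string }

// tableLookup stands in for getTableByTxn: it respects ctx cancellation.
func tableLookup(ctx context.Context, tableID int64) (*tableInfo, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
		return &tableInfo{name: fmt.Sprintf("t%d", tableID)}, nil
	}
}

// backfillExecutor defers the lookup until Init, when a per-task context exists.
type backfillExecutor struct {
	tableID int64
	tbl     *tableInfo
}

func (e *backfillExecutor) Init(ctx context.Context) error {
	tbl, err := tableLookup(ctx, e.tableID)
	if err != nil {
		return fmt.Errorf("init backfill executor: %w", err)
	}
	e.tbl = tbl
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	exec := &backfillExecutor{tableID: 42}
	if err := exec.Init(ctx); err != nil {
		fmt.Println("init failed:", err)
		return
	}
	fmt.Println("executor ready for table", exec.tbl.name)
}
```
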
15 changes: 8 additions & 7 deletions pkg/ddl/backfilling_dist_scheduler.go
@@ -41,7 +41,6 @@ import (
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/backoff"
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
@@ -89,7 +88,7 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch(
return nil, err
}
job := &backfillMeta.Job
tblInfo, err := getTblInfo(sch.d, job)
Contributor Author commented:

we should use the framework context here

tblInfo, err := getTblInfo(ctx, sch.d, job)
if err != nil {
return nil, err
}
@@ -101,7 +100,7 @@
if tblInfo.Partition != nil {
return generatePartitionPlan(tblInfo)
}
return generateNonPartitionPlan(sch.d, tblInfo, job, sch.GlobalSort, len(execIDs))
return generateNonPartitionPlan(ctx, sch.d, tblInfo, job, sch.GlobalSort, len(execIDs))
case proto.BackfillStepMergeSort:
return generateMergePlan(taskHandle, task, logger)
case proto.BackfillStepWriteAndIngest:
@@ -201,8 +200,8 @@ func (sch *LitBackfillScheduler) Close() {
sch.BaseScheduler.Close()
}

func getTblInfo(d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) {
err = kv.RunInNewTxn(d.ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error {
func getTblInfo(ctx context.Context, d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) {
err = kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error {
tblInfo, err = meta.NewMeta(txn).GetTable(job.SchemaID, job.TableID)
return err
})
@@ -242,11 +241,13 @@ const (
)

func generateNonPartitionPlan(
ctx context.Context,
d *ddl,
tblInfo *model.TableInfo,
job *model.Job,
useCloud bool,
instanceCnt int) (metas [][]byte, err error) {
instanceCnt int,
) (metas [][]byte, err error) {
tbl, err := getTable((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, tblInfo)
if err != nil {
return nil, err
@@ -267,7 +268,7 @@

subTaskMetas := make([][]byte, 0, 4)
backoffer := backoff.NewExponential(scanRegionBackoffBase, 2, scanRegionBackoffMax)
err = handle.RunWithRetry(d.ctx, 8, backoffer, tidblogutil.Logger(d.ctx), func(_ context.Context) (bool, error) {
err = handle.RunWithRetry(ctx, 8, backoffer, logutil.DDLLogger(), func(_ context.Context) (bool, error) {
regionCache := d.store.(helper.Storage).GetRegionCache()
recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey)
if err != nil {
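
In the scheduler above, getTblInfo and generateNonPartitionPlan now take the context handed in by the dist-task framework, and the region-scan retry loop runs against that context (and the DDL logger) rather than d.ctx. The sketch below is a generic retry-with-exponential-backoff helper in that spirit; runWithRetry is an illustrative stand-in, not the signature of TiDB's handle.RunWithRetry.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithRetry retries fn up to maxRetries times with exponential backoff,
// stopping immediately if ctx is cancelled. fn reports (retryable, err).
func runWithRetry(ctx context.Context, maxRetries int, base, cap time.Duration,
	fn func(context.Context) (bool, error)) error {
	backoff := base
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		retryable, err := fn(ctx)
		if err == nil {
			return nil
		}
		lastErr = err
		if !retryable {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // e.g. the DDL job was cancelled or the owner changed
		case <-time.After(backoff):
		}
		if backoff *= 2; backoff > cap {
			backoff = cap
		}
	}
	return fmt.Errorf("retries exhausted: %w", lastErr)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	attempt := 0
	err := runWithRetry(ctx, 8, 50*time.Millisecond, 200*time.Millisecond,
		func(context.Context) (bool, error) {
			attempt++
			// pretend region scanning fails a couple of times before succeeding
			if attempt < 3 {
				return true, errors.New("scan regions: not ready")
			}
			return false, nil
		})
	fmt.Println("attempts:", attempt, "err:", err)
}
```
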
11 changes: 6 additions & 5 deletions pkg/ddl/backfilling_dist_scheduler_test.go
@@ -60,7 +60,8 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
task.Step = sch.GetNextStep(&task.TaskBase)
require.Equal(t, proto.BackfillStepReadIndex, task.Step)
execIDs := []string{":4000"}
metas, err := sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
ctx := util.WithInternalSourceType(context.Background(), "backfill")
metas, err := sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, len(tblInfo.Partition.Definitions), len(metas))
for i, par := range tblInfo.Partition.Definitions {
@@ -73,7 +74,7 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
task.State = proto.TaskStateRunning
task.Step = sch.GetNextStep(&task.TaskBase)
require.Equal(t, proto.StepDone, task.Step)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Len(t, metas, 0)

@@ -85,7 +86,7 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
// 2.1 empty table
tk.MustExec("create table t1(id int primary key, v int)")
task = createAddIndexTask(t, dom, "test", "t1", proto.Backfill, false)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, 0, len(metas))
// 2.2 non empty table.
@@ -97,15 +98,15 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
task = createAddIndexTask(t, dom, "test", "t2", proto.Backfill, false)
// 2.2.1 stepInit
task.Step = sch.GetNextStep(&task.TaskBase)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, 1, len(metas))
require.Equal(t, proto.BackfillStepReadIndex, task.Step)
// 2.2.2 BackfillStepReadIndex
task.State = proto.TaskStateRunning
task.Step = sch.GetNextStep(&task.TaskBase)
require.Equal(t, proto.StepDone, task.Step)
metas, err = sch.OnNextSubtasksBatch(context.Background(), nil, task, execIDs, task.Step)
metas, err = sch.OnNextSubtasksBatch(ctx, nil, task, execIDs, task.Step)
require.NoError(t, err)
require.Equal(t, 0, len(metas))
}
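
The test changes above swap context.Background() for a context built with util.WithInternalSourceType, because the scheduler's helpers now run internal transactions against the caller's context and those transactions expect an internal-source tag. The sketch below illustrates the general idea of tagging a context with such a label; the key type and helper names are assumptions, not TiDB's real implementation.

```go
package main

import (
	"context"
	"fmt"
)

type sourceTypeKey struct{}

// withInternalSourceType mimics tagging a context with an internal source label.
func withInternalSourceType(ctx context.Context, source string) context.Context {
	return context.WithValue(ctx, sourceTypeKey{}, source)
}

// requireSource stands in for code paths (like internal transactions) that expect the tag.
func requireSource(ctx context.Context) (string, error) {
	s, ok := ctx.Value(sourceTypeKey{}).(string)
	if !ok || s == "" {
		return "", fmt.Errorf("context has no internal source type")
	}
	return s, nil
}

func main() {
	if _, err := requireSource(context.Background()); err != nil {
		fmt.Println("bare context:", err)
	}
	ctx := withInternalSourceType(context.Background(), "backfill")
	src, _ := requireSource(ctx)
	fmt.Println("tagged context source:", src)
}
```
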
32 changes: 16 additions & 16 deletions pkg/ddl/cluster.go
@@ -220,8 +220,8 @@ func checkSystemSchemaID(t *meta.Meta, schemaID int64, flashbackTSString string)
return nil
}

func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(d.ctx, se, flashbackTS); err != nil {
func checkAndSetFlashbackClusterInfo(ctx context.Context, se sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(ctx, se, flashbackTS); err != nil {
return err
}

@@ -231,13 +231,13 @@ func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.M
if err = closePDSchedule(); err != nil {
return err
}
if err = setTiDBEnableAutoAnalyze(d.ctx, se, variable.Off); err != nil {
if err = setTiDBEnableAutoAnalyze(ctx, se, variable.Off); err != nil {
return err
}
if err = setTiDBSuperReadOnly(d.ctx, se, variable.On); err != nil {
if err = setTiDBSuperReadOnly(ctx, se, variable.On); err != nil {
return err
}
if err = setTiDBTTLJobEnable(d.ctx, se, variable.Off); err != nil {
if err = setTiDBTTLJobEnable(ctx, se, variable.Off); err != nil {
return err
}

@@ -256,12 +256,12 @@ func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.M

// Check if there is an upgrade during [flashbackTS, now)
sql := fmt.Sprintf("select VARIABLE_VALUE from mysql.tidb as of timestamp '%s' where VARIABLE_NAME='tidb_server_version'", flashbackTSString)
rows, err := sess.NewSession(se).Execute(d.ctx, sql, "check_tidb_server_version")
rows, err := sess.NewSession(se).Execute(ctx, sql, "check_tidb_server_version")
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history `tidb_server_version` failed, can't do flashback")
}
sql = fmt.Sprintf("select 1 from mysql.tidb where VARIABLE_NAME='tidb_server_version' and VARIABLE_VALUE=%s", rows[0].GetString(0))
rows, err = sess.NewSession(se).Execute(d.ctx, sql, "check_tidb_server_version")
rows, err = sess.NewSession(se).Execute(ctx, sql, "check_tidb_server_version")
if err != nil {
return errors.Trace(err)
}
@@ -271,7 +271,7 @@ func checkAndSetFlashbackClusterInfo(se sessionctx.Context, d *ddlCtx, t *meta.M

// Check is there a DDL task at flashbackTS.
sql = fmt.Sprintf("select count(*) from mysql.%s as of timestamp '%s'", JobTable, flashbackTSString)
rows, err = sess.NewSession(se).Execute(d.ctx, sql, "check_history_job")
rows, err = sess.NewSession(se).Execute(ctx, sql, "check_history_job")
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history ddl jobs failed, can't do flashback")
}
@@ -609,12 +609,12 @@ func flashbackToVersion(
).RunOnRange(ctx, startKey, endKey)
}

func splitRegionsByKeyRanges(d *ddlCtx, keyRanges []kv.KeyRange) {
func splitRegionsByKeyRanges(ctx context.Context, d *ddlCtx, keyRanges []kv.KeyRange) {
if s, ok := d.store.(kv.SplittableStore); ok {
for _, keys := range keyRanges {
for {
// tableID is useless when scatter == false
_, err := s.SplitRegions(d.ctx, [][]byte{keys.StartKey, keys.EndKey}, false, nil)
_, err := s.SplitRegions(ctx, [][]byte{keys.StartKey, keys.EndKey}, false, nil)
if err == nil {
break
}
@@ -696,12 +696,12 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges.
case model.StateDeleteOnly:
if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil {
if err = checkAndSetFlashbackClusterInfo(w.ctx, sess, d, t, job, flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// We should get startTS here to avoid lost startTS when TiDB crashed during send prepare flashback RPC.
startTS, err = d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
startTS, err = d.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
@@ -722,10 +722,10 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return updateSchemaVersion(d, t, job)
}
// Split region by keyRanges, make sure no unrelated key ranges be locked.
splitRegionsByKeyRanges(d, keyRanges)
splitRegionsByKeyRanges(w.ctx, d, keyRanges)
totalRegions.Store(0)
for _, r := range keyRanges {
if err = flashbackToVersion(d.ctx, d,
if err = flashbackToVersion(w.ctx, d,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := SendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, startTS, r)
totalRegions.Add(uint64(stats.CompletedRegions))
@@ -738,7 +738,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
job.Args[totalLockedRegionsOffset] = totalRegions.Load()

// We should get commitTS here to avoid lost commitTS when TiDB crashed during send flashback RPC.
commitTS, err = d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err = d.store.GetOracle().GetTimestamp(w.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return ver, errors.Trace(err)
}
@@ -756,7 +756,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}

for _, r := range keyRanges {
if err = flashbackToVersion(d.ctx, d,
if err = flashbackToVersion(w.ctx, d,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Use same startTS as prepare phase to simulate 1PC txn.
stats, err := SendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, startTS, commitTS, r)
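
The flashback-cluster changes above thread the worker's context (w.ctx) into every helper that previously used the package-level d.ctx: validating the flashback timestamp, running internal SQL, fetching timestamps from the oracle, splitting regions, and sending the flashback RPCs. The condensed sketch below mirrors the split-regions retry loop with a caller-supplied context; the splittableStore interface, error handling, and retry delay are simplified assumptions rather than TiDB's kv API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type keyRange struct{ start, end []byte }

type splittableStore interface {
	SplitRegions(ctx context.Context, keys [][]byte) error
}

// splitRegionsByKeyRanges keeps retrying each range until it succeeds or the
// caller's context is cancelled, mirroring the loop in cluster.go.
func splitRegionsByKeyRanges(ctx context.Context, s splittableStore, ranges []keyRange) error {
	for _, r := range ranges {
		for {
			err := s.SplitRegions(ctx, [][]byte{r.start, r.end})
			if err == nil {
				break
			}
			select {
			case <-ctx.Done():
				return ctx.Err() // worker cancelled: stop splitting immediately
			case <-time.After(50 * time.Millisecond):
			}
		}
	}
	return nil
}

type flakyStore struct{ failures int }

func (f *flakyStore) SplitRegions(context.Context, [][]byte) error {
	if f.failures > 0 {
		f.failures--
		return errors.New("region epoch not match")
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := splitRegionsByKeyRanges(ctx, &flakyStore{failures: 2},
		[]keyRange{{start: []byte("a"), end: []byte("m")}, {start: []byte("m"), end: []byte("z")}})
	fmt.Println("split finished, err:", err)
}
```
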
4 changes: 2 additions & 2 deletions pkg/ddl/column.go
@@ -1094,7 +1094,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err
// https://github.com/pingcap/tidb/issues/38297
return dbterror.ErrCancelledDDLJob.GenWithStack("Modify Column on partitioned table / typeUpdateColumnWorker not yet supported.")
}
err := w.writePhysicalTableRecord(w.sessPool, p, workType, reorgInfo)
err := w.writePhysicalTableRecord(w.ctx, w.sessPool, p, workType, reorgInfo)
if err != nil {
return err
}
@@ -1106,7 +1106,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err
return nil
}
if tbl, ok := t.(table.PhysicalTable); ok {
return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
return w.writePhysicalTableRecord(w.ctx, w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
}
return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}