-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ddl: initial support for parallel DDL #6955
Changes from all commits
ee4ad7c
7d35817
065358e
7f8112b
eb969f5
3c09d09
3e4503a
f8bf768
3a0d0b3
5dc5f1c
bba7dee
b2116ec
847ba5f
49251bc
e7ba3e8
f763802
734522f
118bfb9
0453a0a
d34e942
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -227,7 +227,7 @@ type ddl struct { | |
quitCh chan struct{} | ||
|
||
*ddlCtx | ||
workers []*worker | ||
workers map[workerType]*worker | ||
} | ||
|
||
// ddlCtx is the context when we use worker to handle DDL jobs. | ||
|
@@ -236,7 +236,6 @@ type ddlCtx struct { | |
store kv.Storage | ||
ownerManager owner.Manager | ||
schemaSyncer SchemaSyncer | ||
ddlJobCh chan struct{} | ||
ddlJobDoneCh chan struct{} | ||
ddlEventCh chan<- *util.Event | ||
lease time.Duration // lease is schema lease. | ||
|
@@ -317,7 +316,6 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage, | |
uuid: id, | ||
store: store, | ||
lease: lease, | ||
ddlJobCh: make(chan struct{}, 1), | ||
ddlJobDoneCh: make(chan struct{}, 1), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we move the ddlJobDoneCh to the different worker? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? |
||
ownerManager: manager, | ||
schemaSyncer: syncer, | ||
|
@@ -359,20 +357,20 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) { | |
err := d.ownerManager.CampaignOwner(ctx) | ||
terror.Log(errors.Trace(err)) | ||
|
||
d.workers = make([]*worker, 1) | ||
// TODO: Add addIdxWorker. | ||
d.workers[0] = newWorker(generalWorker, 0, d.store, ctxPool) | ||
d.workers = make(map[workerType]*worker, 2) | ||
d.workers[generalWorker] = newWorker(generalWorker, 0, d.store, ctxPool) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. But it will be used in the future. |
||
d.workers[addIdxWorker] = newWorker(addIdxWorker, 1, d.store, ctxPool) | ||
for _, worker := range d.workers { | ||
worker.wg.Add(1) | ||
go worker.start(d.ddlCtx) | ||
// TODO: Add the type of DDL worker. | ||
metrics.DDLCounter.WithLabelValues(metrics.CreateDDLWorker).Inc() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add worker type in the metrics? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, there are a |
||
|
||
// When the start function is called, we will send a fake job to let worker | ||
// checks owner firstly and try to find whether a job exists and run. | ||
asyncNotify(worker.ddlJobCh) | ||
} | ||
} | ||
|
||
// For every start, we will send a fake job to let worker | ||
// check owner firstly and try to find whether a job exists and run. | ||
asyncNotify(d.ddlJobCh) | ||
} | ||
|
||
func (d *ddl) close() { | ||
|
@@ -418,16 +416,15 @@ func (d *ddl) genGlobalID() (int64, error) { | |
globalID, err = meta.NewMeta(txn).GenGlobalID() | ||
return errors.Trace(err) | ||
}) | ||
|
||
return globalID, errors.Trace(err) | ||
} | ||
|
||
// generalWorker returns the first worker. The ddl structure has only one worker before we implement the parallel worker. | ||
// generalWorker returns the general worker. | ||
// It's used for testing. | ||
// TODO: Remove this function. | ||
func (d *ddl) generalWorker() *worker { | ||
if len(d.workers) == 0 { | ||
return nil | ||
} | ||
return d.workers[0] | ||
return d.workers[generalWorker] | ||
} | ||
|
||
// SchemaSyncer implements DDL.SchemaSyncer interface. | ||
|
@@ -449,6 +446,19 @@ func checkJobMaxInterval(job *model.Job) time.Duration { | |
return 1 * time.Second | ||
} | ||
|
||
func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) { | ||
// If the workers don't run, we needn't to notify workers. | ||
if !RunWorker { | ||
return | ||
} | ||
|
||
if jobTp == model.ActionAddIndex { | ||
asyncNotify(d.workers[addIdxWorker].ddlJobCh) | ||
} else { | ||
asyncNotify(d.workers[generalWorker].ddlJobCh) | ||
} | ||
} | ||
|
||
func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error { | ||
// For every DDL, we must commit current transaction. | ||
if err := ctx.NewTxn(); err != nil { | ||
|
@@ -463,7 +473,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error { | |
ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true | ||
|
||
// Notice worker that we push a new job and wait the job done. | ||
asyncNotify(d.ddlJobCh) | ||
d.asyncNotifyWorker(job.Type) | ||
log.Infof("[ddl] start DDL job %s, Query:%s", job, job.Query) | ||
|
||
var historyJob *model.Job | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is only used once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, It's also used in
ddl_worker_test.go