Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: initial support for parallel DDL #6955

Merged
merged 20 commits into from
Jul 25, 2018
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (s *testColumnSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
colName string, pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
func buildCreateColumnJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is only used once?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, It's also used in ddl_worker_test.go

pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
col := &model.ColumnInfo{
Name: model.NewCIStr(colName),
Offset: len(tblInfo.Columns),
Expand All @@ -79,22 +79,31 @@ func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{col, pos, 0},
}
return job
}

func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
colName string, pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
job := buildCreateColumnJob(dbInfo, tblInfo, colName, pos, defaultValue)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func testDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string, isError bool) *model.Job {
job := &model.Job{
func buildDropColumnJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{model.NewCIStr(colName)},
}
}

func testDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string, isError bool) *model.Job {
job := buildDropColumnJob(dbInfo, tblInfo, colName)
err := d.doDDLJob(ctx, job)
if isError {
c.Assert(err, NotNil)
Expand Down
8 changes: 7 additions & 1 deletion ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,14 +581,20 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
return
}
var qLen int64
var err1 error
for {
kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
// Get the number of jobs from the adding index queue.
addIdxLen, err1 := m.DDLJobQueueLen(meta.AddIndexJobListKey)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use GetDDLJobs and get the length of the return value?

if err1 != nil {
return err1
}

qLen, err1 = m.DDLJobQueueLen()
if err1 != nil {
return err1
}
qLen += addIdxLen
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can line 603 increase sleep time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I think I don't do it in this PR.

return nil
})
if qLen == 2 {
Expand Down
42 changes: 26 additions & 16 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ type ddl struct {
quitCh chan struct{}

*ddlCtx
workers []*worker
workers map[workerType]*worker
}

// ddlCtx is the context when we use worker to handle DDL jobs.
Expand All @@ -236,7 +236,6 @@ type ddlCtx struct {
store kv.Storage
ownerManager owner.Manager
schemaSyncer SchemaSyncer
ddlJobCh chan struct{}
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *util.Event
lease time.Duration // lease is schema lease.
Expand Down Expand Up @@ -317,7 +316,6 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
uuid: id,
store: store,
lease: lease,
ddlJobCh: make(chan struct{}, 1),
ddlJobDoneCh: make(chan struct{}, 1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move the ddlJobDoneCh to the different worker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

ownerManager: manager,
schemaSyncer: syncer,
Expand Down Expand Up @@ -359,20 +357,20 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
err := d.ownerManager.CampaignOwner(ctx)
terror.Log(errors.Trace(err))

d.workers = make([]*worker, 1)
// TODO: Add addIdxWorker.
d.workers[0] = newWorker(generalWorker, 0, d.store, ctxPool)
d.workers = make(map[workerType]*worker, 2)
d.workers[generalWorker] = newWorker(generalWorker, 0, d.store, ctxPool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The worker.id here doesn't seem to be particularly useful, does it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. But it will be used in the future.

d.workers[addIdxWorker] = newWorker(addIdxWorker, 1, d.store, ctxPool)
for _, worker := range d.workers {
worker.wg.Add(1)
go worker.start(d.ddlCtx)
// TODO: Add the type of DDL worker.
metrics.DDLCounter.WithLabelValues(metrics.CreateDDLWorker).Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add worker type in the metrics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there are a TODO: in line 357


// When the start function is called, we will send a fake job to let worker
// checks owner firstly and try to find whether a job exists and run.
asyncNotify(worker.ddlJobCh)
}
}

// For every start, we will send a fake job to let worker
// check owner firstly and try to find whether a job exists and run.
asyncNotify(d.ddlJobCh)
}

func (d *ddl) close() {
Expand Down Expand Up @@ -418,16 +416,15 @@ func (d *ddl) genGlobalID() (int64, error) {
globalID, err = meta.NewMeta(txn).GenGlobalID()
return errors.Trace(err)
})

return globalID, errors.Trace(err)
}

// generalWorker returns the first worker. The ddl structure has only one worker before we implement the parallel worker.
// generalWorker returns the general worker.
// It's used for testing.
// TODO: Remove this function.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it's only used in tests, and we can use d.workers[generalWorker] directly.

func (d *ddl) generalWorker() *worker {
if len(d.workers) == 0 {
return nil
}
return d.workers[0]
return d.workers[generalWorker]
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
Expand All @@ -449,6 +446,19 @@ func checkJobMaxInterval(job *model.Job) time.Duration {
return 1 * time.Second
}

func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
// If the workers don't run, we needn't to notify workers.
if !RunWorker {
return
}

if jobTp == model.ActionAddIndex {
asyncNotify(d.workers[addIdxWorker].ddlJobCh)
} else {
asyncNotify(d.workers[generalWorker].ddlJobCh)
}
}

func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
// For every DDL, we must commit current transaction.
if err := ctx.NewTxn(); err != nil {
Expand All @@ -463,7 +473,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

// Notice worker that we push a new job and wait the job done.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Notice/Notify/g

asyncNotify(d.ddlJobCh)
d.asyncNotifyWorker(job.Type)
log.Infof("[ddl] start DDL job %s, Query:%s", job, job.Query)

var historyJob *model.Job
Expand Down
24 changes: 20 additions & 4 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJo
}
}

func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
job := &model.Job{
func buildCreateIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAddIndex,
Expand All @@ -145,26 +145,42 @@ func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo,
Column: &ast.ColumnName{Name: model.NewCIStr(colName)},
Length: types.UnspecifiedLength}}},
}
}

func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
job := buildCreateIdxJob(dbInfo, tblInfo, unique, indexName, colName)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
job := &model.Job{
func buildDropIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionDropIndex,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{model.NewCIStr(indexName)},
}
}

func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
job := buildDropIdxJob(dbInfo, tblInfo, indexName)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func buildRebaseAutoIDJobJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, newBaseID int64) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionRebaseAutoID,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newBaseID},
}
}
Loading