Skip to content

Commit

Permalink
*: Use the new way to handle the DDL operations (#3367)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and coocood committed Jun 1, 2017
1 parent 4a18896 commit 6935e17
Show file tree
Hide file tree
Showing 17 changed files with 76 additions and 215 deletions.
17 changes: 4 additions & 13 deletions ddl/bg_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func (d *ddl) handleBgJobQueue() error {

job := &model.Job{}
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, bgJobFlag)
if err != nil {
return errors.Trace(filterError(err, errNotOwner))
if !d.isOwner(bgJobFlag) {
return nil
}

var err error
t := meta.NewMeta(txn)
// Get the first background job and run it.
job, err = d.getFirstBgJob(t)
if err != nil {
Expand All @@ -52,15 +52,6 @@ func (d *ddl) handleBgJobQueue() error {
} else {
err = d.updateBgJob(t, job)
}
if err != nil {
return errors.Trace(err)
}

if ChangeOwnerInNewWay {
return nil
}
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetBgJobOwner(owner)
return errors.Trace(err)
})
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ func (d *ddl) close() {
}

close(d.quitCh)
err := d.schemaSyncer.RemoveSelfVersionPath()
if err != nil {
log.Errorf("[ddl] remove self version pathe failed %v", err)
}
d.ownerManager.Cancel()

d.wait.Wait()
Expand Down Expand Up @@ -407,7 +411,8 @@ func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
jobID := job.ID
// For a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
// For every state change, we will wait at least 2 * lease time, so here the ticker check is 10 * lease.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second))
// But we use etcd to speed up, normally it takes less than 1s now, so we use 3s as the max value.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 3*time.Second))
startTime := time.Now()
jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Inc()
defer func() {
Expand Down
135 changes: 20 additions & 115 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func (d *ddl) onDDLWorker() {
}

// We use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time. If lease is 0, we will use default 10s.
checkTime := chooseLeaseTime(2*d.lease, 10*time.Second)
// every 2 * lease time. If lease is 0, we will use default 1s.
// But we use etcd to speed up, normally it takes less than 1s now, so we use 1s as the max value.
checkTime := chooseLeaseTime(2*d.lease, 1*time.Second)

ticker := time.NewTicker(checkTime)
defer ticker.Stop()
Expand Down Expand Up @@ -68,93 +69,15 @@ func asyncNotify(ch chan struct{}) {
}
}

// maxOwnerTimeout is the largest owner-check timeout, in nanoseconds, that
// getCheckOwnerTimeout will return.
const maxOwnerTimeout = int64(20 * time.Minute)

// minBgOwnerTimeout and minDDLOwnerTimeout are the smallest owner-check
// timeouts, in nanoseconds, that getCheckOwnerTimeout returns for background
// jobs and DDL jobs respectively.
// They are declared as variables rather than constants because tests need to
// change them.
var (
minBgOwnerTimeout = int64(20 * time.Second)
minDDLOwnerTimeout = int64(4 * time.Second)
)

// getCheckOwnerTimeout returns, in nanoseconds, how long an owner record may
// go without being refreshed before it is considered timed out for the given
// job type.
//
// The base value is 4 * lease: we must wait 2 * lease for the other servers to
// update the schema, and the owner refreshes its status every 2 * lease. The
// base value is then clamped to maxOwnerTimeout from above and to the
// per-job-type minimum from below.
func (d *ddl) getCheckOwnerTimeout(flag JobType) int64 {
	timeout := int64(4 * d.lease)

	switch {
	case timeout > maxOwnerTimeout:
		// Never wait longer than the global upper bound.
		return maxOwnerTimeout
	case flag == ddlJobFlag && timeout < minDDLOwnerTimeout:
		// The lease may be shorter than one second; checking the owner that
		// frequently is unnecessary, so fall back to minDDLOwnerTimeout.
		return minDDLOwnerTimeout
	case flag == bgJobFlag && timeout < minBgOwnerTimeout:
		// Background jobs are processed serially, so the owner timeout is
		// extended to minBgOwnerTimeout to make sure a batch of rows can be
		// processed before the timeout expires.
		return minBgOwnerTimeout
	default:
		return timeout
	}
}

func (d *ddl) isOwner(flag JobType) error {
func (d *ddl) isOwner(flag JobType) bool {
if flag == ddlJobFlag {
if d.ownerManager.IsOwner() {
return nil
}
log.Infof("[ddl] not %s job owner, self id %s", flag, d.uuid)
return errNotOwner
}
if d.ownerManager.IsBgOwner() {
return nil
}
log.Infof("[ddl] not %s job owner, self id %s", flag, d.uuid)
return errNotOwner
}

func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
if ChangeOwnerInNewWay {
return nil, errors.Trace(d.isOwner(flag))
}
owner, err := d.getJobOwner(t, flag)
if err != nil {
return nil, errors.Trace(err)
}
if owner == nil {
owner = &model.Owner{}
// try to set owner
owner.OwnerID = d.uuid
isOwner := d.ownerManager.IsOwner()
log.Infof("[ddl] it's the %s job owner %v, self id %s", flag, isOwner, d.uuid)
return isOwner
}

now := time.Now().UnixNano()
maxTimeout := d.getCheckOwnerTimeout(flag)
sub := now - owner.LastUpdateTS
if owner.OwnerID == d.uuid || sub > maxTimeout {
owner.OwnerID = d.uuid
owner.LastUpdateTS = now
// update status.
switch flag {
case ddlJobFlag:
err = t.SetDDLJobOwner(owner)
case bgJobFlag:
err = t.SetBgJobOwner(owner)
}
if err != nil {
return nil, errors.Trace(err)
}
log.Debugf("[ddl] become %s job owner, owner is %s sub %vs", flag, owner, sub/1e9)
}

if owner.OwnerID != d.uuid {
log.Debugf("[ddl] not %s job owner, self id %s owner is %s", flag, d.uuid, owner.OwnerID)
return nil, errors.Trace(errNotOwner)
}

return owner, nil
isOwner := d.ownerManager.IsBgOwner()
log.Infof("[ddl] it's the %s job owner %v, self id %s", flag, isOwner, d.uuid)
return isOwner
}

func (d *ddl) getJobOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
Expand Down Expand Up @@ -267,15 +190,13 @@ func (d *ddl) handleDDLJobQueue() error {
var job *model.Job
var schemaVer int64
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, ddlJobFlag)
if terror.ErrorEqual(err, errNotOwner) {
// We are not owner, return and retry checking later.
// We are not owner, return and retry checking later.
if !d.isOwner(ddlJobFlag) {
return nil
} else if err != nil {
return errors.Trace(err)
}

var err error
t := meta.NewMeta(txn)
// We become the owner. Get the first job and run it.
job, err = d.getFirstDDLJob(t)
if job == nil || err != nil {
Expand All @@ -292,6 +213,7 @@ func (d *ddl) handleDDLJobQueue() error {
if elapsed > 0 && elapsed < waitTime {
log.Warnf("[ddl] the elapsed time from last update is %s < %s, wait again", elapsed, waitTime)
waitTime -= elapsed
time.Sleep(time.Millisecond)
return nil
}
}
Expand All @@ -309,15 +231,6 @@ func (d *ddl) handleDDLJobQueue() error {
} else {
err = d.updateDDLJob(t, job, txn.StartTS())
}
if err != nil {
return errors.Trace(err)
}

if ChangeOwnerInNewWay {
return nil
}
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetDDLJobOwner(owner)
return errors.Trace(err)
})
if err != nil {
Expand All @@ -335,14 +248,7 @@ func (d *ddl) handleDDLJobQueue() error {
// If the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
if job.State == model.JobRunning || job.State == model.JobDone {
switch job.Type {
case model.ActionCreateSchema, model.ActionDropSchema, model.ActionCreateTable,
model.ActionTruncateTable, model.ActionDropTable:
// Do not need to wait for those DDL, because those DDL do not need to modify data,
// So there is no data inconsistent issue.
default:
d.waitSchemaChanged(waitTime, schemaVer)
}
d.waitSchemaChanged(waitTime, schemaVer)
}
if job.IsFinished() {
d.startBgJob(job.Type)
Expand All @@ -351,12 +257,11 @@ func (d *ddl) handleDDLJobQueue() error {
}
}

func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration {
if n1 > 0 {
return n1
func chooseLeaseTime(t, max time.Duration) time.Duration {
if t == 0 || t > max {
return max
}

return n2
return t
}

// runDDLJob runs a DDL job. It returns the current schema version in this transaction.
Expand Down
30 changes: 2 additions & 28 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,17 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/types"
goctx "golang.org/x/net/context"
)

var _ = Suite(&testDDLSuite{})

type testDDLSuite struct {
originMinBgOwnerTimeout int64
originMinDDLOwnerTimeout int64
}
type testDDLSuite struct{}

const testLease = 5 * time.Millisecond

// SetUpSuite saves the package-level minimum owner timeouts on the suite and
// then lowers both of them to 4 * testLease.
func (s *testDDLSuite) SetUpSuite(c *C) {
	// Record the current values before overwriting them so that
	// TearDownSuite can put them back.
	s.originMinDDLOwnerTimeout, s.originMinBgOwnerTimeout = minDDLOwnerTimeout, minBgOwnerTimeout

	shortTimeout := int64(4 * testLease)
	minDDLOwnerTimeout, minBgOwnerTimeout = shortTimeout, shortTimeout
}

// TearDownSuite restores the package-level minimum owner timeouts to the
// values that SetUpSuite saved on the suite.
func (s *testDDLSuite) TearDownSuite(c *C) {
	minDDLOwnerTimeout, minBgOwnerTimeout = s.originMinDDLOwnerTimeout, s.originMinBgOwnerTimeout
}

func (s *testDDLSuite) TestCheckOwner(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_owner")
Expand Down Expand Up @@ -211,17 +195,7 @@ func (s *testDDLSuite) TestColumnError(c *C) {
}

func testCheckOwner(c *C, d *ddl, isOwner bool, flag JobType) {
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
_, err := d.checkOwner(t, flag)
return err
})
if isOwner {
c.Assert(err, IsNil)
return
}

c.Assert(terror.ErrorEqual(err, errNotOwner), IsTrue)
c.Assert(d.isOwner(flag), Equals, isOwner)
}

func testCheckJobDone(c *C, d *ddl, job *model.Job, isAdd bool) {
Expand Down
3 changes: 3 additions & 0 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (s *mockSchemaSyncer) UpdateSelfVersion(ctx goctx.Context, version int64) e
return nil
}

// RemoveSelfVersionPath implements SchemaSyncer.RemoveSelfVersionPath interface.
// The mock performs no work and always reports success.
func (s *mockSchemaSyncer) RemoveSelfVersionPath() error {
	return nil
}

// OwnerUpdateGlobalVersion implements SchemaSyncer.OwnerUpdateGlobalVersion interface.
func (s *mockSchemaSyncer) OwnerUpdateGlobalVersion(ctx goctx.Context, version int64) error {
select {
Expand Down
17 changes: 3 additions & 14 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,11 @@ func (d *ddl) isReorgRunnable(txn kv.Transaction, flag JobType) error {
return errInvalidWorker.Gen("worker is closed")
}

if ChangeOwnerInNewWay {
return errors.Trace(d.isOwner(flag))
}

t := meta.NewMeta(txn)
owner, err := d.getJobOwner(t, flag)
if err != nil {
return errors.Trace(err)
}
if owner == nil || owner.OwnerID != d.uuid {
// if no owner, we will try later, so here just return error.
// or another server is owner, return error too.
log.Infof("[ddl] %s job, self id %s owner %s, txnTS:%d", flag, d.uuid, owner, txn.StartTS())
if !d.isOwner(flag) {
// If it's not the owner, we will try later, so here just returns an error.
log.Infof("[ddl] the %s not the %s job owner, txnTS:%d", d.uuid, flag, txn.StartTS())
return errors.Trace(errNotOwner)
}

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
initialVersion = "0"
putKeyNoRetry = 1
putKeyDefaultRetryCnt = 3
delKeyDefaultRetryCnt = 3
putKeyRetryUnlimited = math.MaxInt64
putKeyDefaultTimeout = 2 * time.Second
putKeyRetryInterval = 30 * time.Millisecond
Expand All @@ -48,6 +49,8 @@ type SchemaSyncer interface {
Init(ctx goctx.Context) error
// UpdateSelfVersion updates the current version to the self path on etcd.
UpdateSelfVersion(ctx goctx.Context, version int64) error
// RemoveSelfVersionPath removes the self path from etcd.
RemoveSelfVersionPath() error
// OwnerUpdateGlobalVersion updates the latest version to the global path on etcd.
OwnerUpdateGlobalVersion(ctx goctx.Context, version int64) error
// GlobalVersionCh gets the chan for watching global version.
Expand Down Expand Up @@ -122,6 +125,20 @@ func (s *schemaVersionSyncer) OwnerUpdateGlobalVersion(ctx goctx.Context, versio
return s.putKV(ctx, putKeyRetryUnlimited, ddlGlobalSchemaVersion, ver)
}

// RemoveSelfVersionPath implements SchemaSyncer.RemoveSelfVersionPath interface.
//
// It deletes s.selfSchemaVerPath from etcd, trying at most
// delKeyDefaultRetryCnt times. Every attempt is bounded by
// putKeyDefaultTimeout so that a stuck etcd request cannot block the caller
// forever and the remaining retries actually get a chance to run.
//
// It returns nil as soon as one attempt succeeds; otherwise it returns the
// traced error of the last failed attempt.
func (s *schemaVersionSyncer) RemoveSelfVersionPath() error {
	var err error
	for i := 0; i < delKeyDefaultRetryCnt; i++ {
		// Use a fresh per-attempt deadline instead of a bare background
		// context, which would never expire.
		childCtx, cancel := goctx.WithTimeout(goctx.Background(), putKeyDefaultTimeout)
		_, err = s.etcdCli.Delete(childCtx, s.selfSchemaVerPath)
		// Release the timer right away; defer would pile up across iterations.
		cancel()
		if err == nil {
			return nil
		}
		log.Warnf("remove schema version path %s failed %v no.%d", s.selfSchemaVerPath, err, i)
	}
	return errors.Trace(err)
}

func isContextFinished(err error) bool {
if terror.ErrorEqual(err, goctx.Canceled) ||
terror.ErrorEqual(err, goctx.DeadlineExceeded) {
Expand Down
Loading

0 comments on commit 6935e17

Please sign in to comment.