Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Use the new way to handle the DDL operations #3367

Merged
merged 7 commits into from
Jun 1, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions ddl/bg_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func (d *ddl) handleBgJobQueue() error {

job := &model.Job{}
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, bgJobFlag)
if err != nil {
return errors.Trace(filterError(err, errNotOwner))
if !d.isOwner(bgJobFlag) {
return nil
}

var err error
t := meta.NewMeta(txn)
// Get the first background job and run it.
job, err = d.getFirstBgJob(t)
if err != nil {
Expand All @@ -52,15 +52,6 @@ func (d *ddl) handleBgJobQueue() error {
} else {
err = d.updateBgJob(t, job)
}
if err != nil {
return errors.Trace(err)
}

if ChangeOwnerInNewWay {
return nil
}
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetBgJobOwner(owner)
return errors.Trace(err)
})
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ func (d *ddl) close() {
}

close(d.quitCh)
err := d.worker.RemoveSelfVersionPath()
if err != nil {
log.Errorf("[ddl] remove self version pathe failed %v", err)
}
d.worker.Cancel()

d.wait.Wait()
Expand Down Expand Up @@ -403,7 +407,8 @@ func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
jobID := job.ID
// For a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
// For every state change, we will wait at least 2 * lease time, so here the ticker check is 10 * lease.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second))
// But we use etcd to speed up, normally it takes less than 1s now, so we use 3s as the max value.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 3*time.Second))
startTime := time.Now()
jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Inc()
defer func() {
Expand Down
135 changes: 20 additions & 115 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func (d *ddl) onDDLWorker() {
}

// We use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time. If lease is 0, we will use default 10s.
checkTime := chooseLeaseTime(2*d.lease, 10*time.Second)
// every 2 * lease time. If lease is 0, we will use default 1s.
// But we use etcd to speed up, normally it takes less than 1s now, so we use 1s as the max value.
checkTime := chooseLeaseTime(2*d.lease, 1*time.Second)

ticker := time.NewTicker(checkTime)
defer ticker.Stop()
Expand Down Expand Up @@ -68,93 +69,15 @@ func asyncNotify(ch chan struct{}) {
}
}

const maxOwnerTimeout = int64(20 * time.Minute)

// We define minBgOwnerTimeout and minDDLOwnerTimeout as variable,
// because we need to change them in test.
var (
minBgOwnerTimeout = int64(20 * time.Second)
minDDLOwnerTimeout = int64(4 * time.Second)
)

// getCheckOwnerTimeout returns the duration, in nanoseconds, after which the
// current owner of the given job type is considered timed out.
//
// The base value is 4 * d.lease: servers need 2 * lease to pick up a schema
// change and the owner refreshes its status every 2 * lease. The result is
// capped at maxOwnerTimeout and, per job type, raised to minDDLOwnerTimeout or
// minBgOwnerTimeout when the lease-derived value is smaller.
func (d *ddl) getCheckOwnerTimeout(flag JobType) int64 {
	ownerTimeout := int64(4 * d.lease)

	switch {
	case ownerTimeout > maxOwnerTimeout:
		// Never wait longer than the global upper bound.
		return maxOwnerTimeout
	case flag == ddlJobFlag && ownerTimeout < minDDLOwnerTimeout:
		// A very small lease would make owner checks needlessly frequent,
		// so fall back to the DDL minimum.
		return minDDLOwnerTimeout
	case flag == bgJobFlag && ownerTimeout < minBgOwnerTimeout:
		// Background jobs are processed serially; use the larger background
		// minimum so a batch of rows can be handled before the timeout.
		return minBgOwnerTimeout
	default:
		return ownerTimeout
	}
}

func (d *ddl) isOwner(flag JobType) error {
func (d *ddl) isOwner(flag JobType) bool {
if flag == ddlJobFlag {
if d.worker.IsOwner() {
return nil
}
log.Infof("[ddl] not %s job owner, self id %s", flag, d.uuid)
return errNotOwner
}
if d.worker.IsBgOwner() {
return nil
}
log.Infof("[ddl] not %s job owner, self id %s", flag, d.uuid)
return errNotOwner
}

func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
if ChangeOwnerInNewWay {
return nil, errors.Trace(d.isOwner(flag))
}
owner, err := d.getJobOwner(t, flag)
if err != nil {
return nil, errors.Trace(err)
}
if owner == nil {
owner = &model.Owner{}
// try to set owner
owner.OwnerID = d.uuid
isOwner := d.worker.IsOwner()
log.Infof("[ddl] it's the %s job owner %v, self id %s", flag, isOwner, d.uuid)
return isOwner
}

now := time.Now().UnixNano()
maxTimeout := d.getCheckOwnerTimeout(flag)
sub := now - owner.LastUpdateTS
if owner.OwnerID == d.uuid || sub > maxTimeout {
owner.OwnerID = d.uuid
owner.LastUpdateTS = now
// update status.
switch flag {
case ddlJobFlag:
err = t.SetDDLJobOwner(owner)
case bgJobFlag:
err = t.SetBgJobOwner(owner)
}
if err != nil {
return nil, errors.Trace(err)
}
log.Debugf("[ddl] become %s job owner, owner is %s sub %vs", flag, owner, sub/1e9)
}

if owner.OwnerID != d.uuid {
log.Debugf("[ddl] not %s job owner, self id %s owner is %s", flag, d.uuid, owner.OwnerID)
return nil, errors.Trace(errNotOwner)
}

return owner, nil
isOwner := d.worker.IsBgOwner()
log.Infof("[ddl] it's the %s job owner %v, self id %s", flag, isOwner, d.uuid)
return isOwner
}

func (d *ddl) getJobOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
Expand Down Expand Up @@ -267,15 +190,13 @@ func (d *ddl) handleDDLJobQueue() error {
var job *model.Job
var schemaVer int64
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, ddlJobFlag)
if terror.ErrorEqual(err, errNotOwner) {
// We are not owner, return and retry checking later.
// We are not owner, return and retry checking later.
if !d.isOwner(ddlJobFlag) {
return nil
} else if err != nil {
return errors.Trace(err)
}

var err error
t := meta.NewMeta(txn)
// We become the owner. Get the first job and run it.
job, err = d.getFirstDDLJob(t)
if job == nil || err != nil {
Expand All @@ -292,6 +213,7 @@ func (d *ddl) handleDDLJobQueue() error {
if elapsed > 0 && elapsed < waitTime {
log.Warnf("[ddl] the elapsed time from last update is %s < %s, wait again", elapsed, waitTime)
waitTime -= elapsed
time.Sleep(time.Millisecond)
return nil
}
}
Expand All @@ -309,15 +231,6 @@ func (d *ddl) handleDDLJobQueue() error {
} else {
err = d.updateDDLJob(t, job, txn.StartTS())
}
if err != nil {
return errors.Trace(err)
}

if ChangeOwnerInNewWay {
return nil
}
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetDDLJobOwner(owner)
return errors.Trace(err)
})
if err != nil {
Expand All @@ -335,14 +248,7 @@ func (d *ddl) handleDDLJobQueue() error {
// If the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
if job.State == model.JobRunning || job.State == model.JobDone {
switch job.Type {
case model.ActionCreateSchema, model.ActionDropSchema, model.ActionCreateTable,
model.ActionTruncateTable, model.ActionDropTable:
// Do not need to wait for those DDL, because those DDL do not need to modify data,
// So there is no data inconsistent issue.
default:
d.waitSchemaChanged(waitTime, schemaVer)
}
d.waitSchemaChanged(waitTime, schemaVer)
}
if job.IsFinished() {
d.startBgJob(job.Type)
Expand All @@ -351,12 +257,11 @@ func (d *ddl) handleDDLJobQueue() error {
}
}

func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration {
if n1 > 0 {
return n1
func chooseLeaseTime(t, max time.Duration) time.Duration {
if t == 0 || t > max {
return max
}

return n2
return t
}

// runDDLJob runs a DDL job. It returns the current schema version in this transaction.
Expand Down
30 changes: 2 additions & 28 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,17 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/types"
goctx "golang.org/x/net/context"
)

var _ = Suite(&testDDLSuite{})

type testDDLSuite struct {
originMinBgOwnerTimeout int64
originMinDDLOwnerTimeout int64
}
type testDDLSuite struct{}

const testLease = 5 * time.Millisecond

func (s *testDDLSuite) SetUpSuite(c *C) {
s.originMinDDLOwnerTimeout = minDDLOwnerTimeout
s.originMinBgOwnerTimeout = minBgOwnerTimeout
minDDLOwnerTimeout = int64(4 * testLease)
minBgOwnerTimeout = int64(4 * testLease)
}

func (s *testDDLSuite) TearDownSuite(c *C) {
minDDLOwnerTimeout = s.originMinDDLOwnerTimeout
minBgOwnerTimeout = s.originMinBgOwnerTimeout
}

func (s *testDDLSuite) TestCheckOwner(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_owner")
Expand Down Expand Up @@ -211,17 +195,7 @@ func (s *testDDLSuite) TestColumnError(c *C) {
}

func testCheckOwner(c *C, d *ddl, isOwner bool, flag JobType) {
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
_, err := d.checkOwner(t, flag)
return err
})
if isOwner {
c.Assert(err, IsNil)
return
}

c.Assert(terror.ErrorEqual(err, errNotOwner), IsTrue)
c.Assert(d.isOwner(flag), Equals, isOwner)
}

func testCheckJobDone(c *C, d *ddl, job *model.Job, isAdd bool) {
Expand Down
6 changes: 5 additions & 1 deletion ddl/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,14 @@ func (w *worker) SetBgOwner(isOwner bool) {
}
}

// defaultSessionTTL is the TTL used for the etcd session. Its default value is 10s.
const defaultSessionTTL = 10

func (w *worker) newSession(ctx goctx.Context, retryCnt int) error {
var err error
for i := 0; i < retryCnt; i++ {
w.etcdSession, err = concurrency.NewSession(w.etcdCli, concurrency.WithContext(ctx))
w.etcdSession, err = concurrency.NewSession(w.etcdCli,
concurrency.WithTTL(defaultSessionTTL), concurrency.WithContext(ctx))
if err != nil {
log.Warnf("[ddl] failed to new session, err %v", err)
time.Sleep(200 * time.Millisecond)
Expand Down
5 changes: 5 additions & 0 deletions ddl/mock_etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func (s *mockSchemaSyncer) UpdateGlobalVersion(ctx goctx.Context, version int64)
return nil
}

// RemoveSelfVersionPath implements SchemaSyncer.RemoveSelfVersionPath interface.
// This mock implementation performs no work and always returns nil.
func (s *mockSchemaSyncer) RemoveSelfVersionPath() error {
return nil
}

// CheckAllVersions implements SchemaSyncer.CheckAllVersions interface.
func (s *mockSchemaSyncer) CheckAllVersions(ctx goctx.Context, latestVer int64) error {
for {
Expand Down
17 changes: 3 additions & 14 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,11 @@ func (d *ddl) isReorgRunnable(txn kv.Transaction, flag JobType) error {
return errInvalidWorker.Gen("worker is closed")
}

if ChangeOwnerInNewWay {
return errors.Trace(d.isOwner(flag))
}

t := meta.NewMeta(txn)
owner, err := d.getJobOwner(t, flag)
if err != nil {
return errors.Trace(err)
}
if owner == nil || owner.OwnerID != d.uuid {
// if no owner, we will try later, so here just return error.
// or another server is owner, return error too.
log.Infof("[ddl] %s job, self id %s owner %s, txnTS:%d", flag, d.uuid, owner, txn.StartTS())
if !d.isOwner(flag) {
// If it's not the owner, we will try later, so here just returns an error.
log.Infof("[ddl] the %s not the %s job owner, txnTS:%d", d.uuid, flag, txn.StartTS())
return errors.Trace(errNotOwner)
}

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
initialVersion = "0"
putKeyNoRetry = 1
putKeyDefaultRetryCnt = 3
delKeyDefaultRetryCnt = 3
putKeyRetryUnlimited = math.MaxInt64
putKeyDefaultTimeout = 2 * time.Second
putKeyRetryInterval = 30 * time.Millisecond
Expand All @@ -49,6 +50,8 @@ type SchemaSyncer interface {
UpdateSelfVersion(ctx goctx.Context, version int64) error
// UpdateGlobalVersion updates the latest version to the global path on etcd.
UpdateGlobalVersion(ctx goctx.Context, version int64) error
// RemoveSelfVersionPath removes the self path from etcd.
RemoveSelfVersionPath() error
// GlobalVersionCh gets the chan for watching global version.
GlobalVersionCh() clientv3.WatchChan
// CheckAllVersions checks whether all followers' schema version are equal to
Expand Down Expand Up @@ -113,6 +116,20 @@ func (s *schemaVersionSyncer) UpdateGlobalVersion(ctx goctx.Context, version int
return s.putKV(ctx, putKeyRetryUnlimited, ddlGlobalSchemaVersion, ver)
}

// RemoveSelfVersionPath implements SchemaSyncer.RemoveSelfVersionPath interface.
//
// It deletes s.selfSchemaVerPath from etcd, trying at most
// delKeyDefaultRetryCnt times. Every attempt is bounded by
// putKeyDefaultTimeout so that an unreachable etcd cannot block the caller
// forever, and failed attempts are separated by putKeyRetryInterval.
// It returns nil as soon as one delete succeeds; otherwise it returns the
// traced error of the last attempt.
func (s *schemaVersionSyncer) RemoveSelfVersionPath() error {
	var err error
	for i := 0; i < delKeyDefaultRetryCnt; i++ {
		// A bare background context has no deadline, so Delete could wait
		// indefinitely when etcd is unavailable; give each attempt its own timeout.
		ctx, cancel := goctx.WithTimeout(goctx.Background(), putKeyDefaultTimeout)
		_, err = s.etcdCli.Delete(ctx, s.selfSchemaVerPath)
		cancel()
		if err == nil {
			return nil
		}
		log.Warnf("remove schema version path %s failed %v no.%d", s.selfSchemaVerPath, err, i)

		// Pause briefly before retrying, but not after the final attempt.
		if i < delKeyDefaultRetryCnt-1 {
			time.Sleep(putKeyRetryInterval)
		}
	}
	return errors.Trace(err)
}

func isContextFinished(err error) bool {
if terror.ErrorEqual(err, goctx.Canceled) ||
terror.ErrorEqual(err, goctx.DeadlineExceeded) {
Expand Down
Loading