From 4a92dc98f32f240a6c0bf4483548ff2317fb177a Mon Sep 17 00:00:00 2001 From: xia Date: Wed, 31 May 2017 14:33:14 +0800 Subject: [PATCH 1/6] ddl: remove the old code --- ddl/bg_worker.go | 17 ++----- ddl/ddl_worker.go | 125 +++++++--------------------------------------- ddl/reorg.go | 17 ++----- 3 files changed, 25 insertions(+), 134 deletions(-) diff --git a/ddl/bg_worker.go b/ddl/bg_worker.go index daa2191b714c4..8f34405fd9420 100644 --- a/ddl/bg_worker.go +++ b/ddl/bg_worker.go @@ -31,12 +31,12 @@ func (d *ddl) handleBgJobQueue() error { job := &model.Job{} err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { - t := meta.NewMeta(txn) - owner, err := d.checkOwner(t, bgJobFlag) - if err != nil { - return errors.Trace(filterError(err, errNotOwner)) + if !d.isOwner(bgJobFlag) { + return nil } + var err error + t := meta.NewMeta(txn) // Get the first background job and run it. job, err = d.getFirstBgJob(t) if err != nil { @@ -52,15 +52,6 @@ func (d *ddl) handleBgJobQueue() error { } else { err = d.updateBgJob(t, job) } - if err != nil { - return errors.Trace(err) - } - - if ChangeOwnerInNewWay { - return nil - } - owner.LastUpdateTS = time.Now().UnixNano() - err = t.SetBgJobOwner(owner) return errors.Trace(err) }) if err != nil { diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 28360a54a6a39..49425c151d293 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -39,8 +39,9 @@ func (d *ddl) onDDLWorker() { } // We use 4 * lease time to check owner's timeout, so here, we will update owner's status - // every 2 * lease time. If lease is 0, we will use default 10s. - checkTime := chooseLeaseTime(2*d.lease, 10*time.Second) + // every 2 * lease time. If lease is 0, we will use default 1s. + // But we use etcd to speed up, normally it takes less than 1s now, so we use 1s as the max value. + checkTime := chooseLeaseTime(2*d.lease, 1*time.Second) ticker := time.NewTicker(checkTime) defer ticker.Stop() @@ -68,93 +69,15 @@ func asyncNotify(ch chan struct{}) { } } -const maxOwnerTimeout = int64(20 * time.Minute) - -// We define minBgOwnerTimeout and minDDLOwnerTimeout as variable, -// because we need to change them in test. -var ( - minBgOwnerTimeout = int64(20 * time.Second) - minDDLOwnerTimeout = int64(4 * time.Second) -) - -func (d *ddl) getCheckOwnerTimeout(flag JobType) int64 { - // we must wait 2 * lease time to guarantee other servers update the schema, - // the owner will update its owner status every 2 * lease time, so here we use - // 4 * lease to check its timeout. - timeout := int64(4 * d.lease) - if timeout > maxOwnerTimeout { - return maxOwnerTimeout - } - - // The value of lease may be less than 1 second, so the operation of - // checking owner is frequent and it isn't necessary. - // So if timeout is less than 4 second, we will use default minDDLOwnerTimeout. - if flag == ddlJobFlag && timeout < minDDLOwnerTimeout { - return minDDLOwnerTimeout - } - if flag == bgJobFlag && timeout < minBgOwnerTimeout { - // Background job is serial processing, so we can extend the owner timeout to make sure - // a batch of rows will be processed before timeout. - // If timeout is less than maxBgOwnerTimeout, we will use default minBgOwnerTimeout. - return minBgOwnerTimeout - } - return timeout -} - -func (d *ddl) isOwner(flag JobType) error { +func (d *ddl) isOwner(flag JobType) bool { if flag == ddlJobFlag { - if d.worker.IsOwner() { - return nil - } - log.Infof("[ddl] not %s job owner, self id %s", flag, d.uuid) - return errNotOwner - } - if d.worker.IsBgOwner() { - return nil + isOwner := d.worker.IsOwner() + log.Infof("[ddl] it's the %s job owner %v, self id %s", flag, d.uuid, isOwner) + return isOwner } - log.Infof("[ddl] not %s job owner, self id %s", flag, d.uuid) - return errNotOwner -} - -func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) { - if ChangeOwnerInNewWay { - return nil, errors.Trace(d.isOwner(flag)) - } - owner, err := d.getJobOwner(t, flag) - if err != nil { - return nil, errors.Trace(err) - } - if owner == nil { - owner = &model.Owner{} - // try to set onwer - owner.OwnerID = d.uuid - } - - now := time.Now().UnixNano() - maxTimeout := d.getCheckOwnerTimeout(flag) - sub := now - owner.LastUpdateTS - if owner.OwnerID == d.uuid || sub > maxTimeout { - owner.OwnerID = d.uuid - owner.LastUpdateTS = now - // update status. - switch flag { - case ddlJobFlag: - err = t.SetDDLJobOwner(owner) - case bgJobFlag: - err = t.SetBgJobOwner(owner) - } - if err != nil { - return nil, errors.Trace(err) - } - log.Debugf("[ddl] become %s job owner, owner is %s sub %vs", flag, owner, sub/1e9) - } - - if owner.OwnerID != d.uuid { - log.Debugf("[ddl] not %s job owner, self id %s owner is %s", flag, d.uuid, owner.OwnerID) - return nil, errors.Trace(errNotOwner) - } - - return owner, nil + isOwner := d.worker.IsBgOwner() + log.Infof("[ddl] it's the %s job owner %v, self id %s", flag, d.uuid, isOwner) + return isOwner } func (d *ddl) getJobOwner(t *meta.Meta, flag JobType) (*model.Owner, error) { @@ -267,15 +190,13 @@ func (d *ddl) handleDDLJobQueue() error { var job *model.Job var schemaVer int64 err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { - t := meta.NewMeta(txn) - owner, err := d.checkOwner(t, ddlJobFlag) - if terror.ErrorEqual(err, errNotOwner) { - // We are not owner, return and retry checking later. + // We are not owner, return and retry checking later. + if !d.isOwner(ddlJobFlag) { return nil - } else if err != nil { - return errors.Trace(err) } + var err error + t := meta.NewMeta(txn) // We become the owner. Get the first job and run it. job, err = d.getFirstDDLJob(t) if job == nil || err != nil { @@ -309,15 +230,6 @@ func (d *ddl) handleDDLJobQueue() error { } else { err = d.updateDDLJob(t, job, txn.StartTS()) } - if err != nil { - return errors.Trace(err) - } - - if ChangeOwnerInNewWay { - return nil - } - owner.LastUpdateTS = time.Now().UnixNano() - err = t.SetDDLJobOwner(owner) return errors.Trace(err) }) if err != nil { @@ -351,12 +263,11 @@ func (d *ddl) handleDDLJobQueue() error { } } -func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration { - if n1 > 0 { - return n1 +func chooseLeaseTime(t, max time.Duration) time.Duration { + if t == 0 || t > max { + return max } - - return n2 + return t } // runDDLJob runs a DDL job. It returns the current schema version in this transaction. diff --git a/ddl/reorg.go b/ddl/reorg.go index cebbf710092da..523adaa15a47a 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -96,22 +96,11 @@ func (d *ddl) isReorgRunnable(txn kv.Transaction, flag JobType) error { return errInvalidWorker.Gen("worker is closed") } - if ChangeOwnerInNewWay { - return errors.Trace(d.isOwner(flag)) - } - - t := meta.NewMeta(txn) - owner, err := d.getJobOwner(t, flag) - if err != nil { - return errors.Trace(err) - } - if owner == nil || owner.OwnerID != d.uuid { - // if no owner, we will try later, so here just return error. - // or another server is owner, return error too. - log.Infof("[ddl] %s job, self id %s owner %s, txnTS:%d", flag, d.uuid, owner, txn.StartTS()) + if !d.isOwner(flag) { + // If it's not the owner, we will try later, so here just returns an error. + log.Infof("[ddl] the %s not the %s job owner, txnTS:%d", d.uuid, flag, txn.StartTS()) return errors.Trace(errNotOwner) } - return nil } From 3e629e6050962f29236ddc8a159a1d8a2e814cb8 Mon Sep 17 00:00:00 2001 From: xia Date: Wed, 31 May 2017 14:34:13 +0800 Subject: [PATCH 2/6] ddl: add remove slef path interface --- ddl/ddl.go | 7 ++++++- ddl/etcd_worker.go | 6 +++++- ddl/mock_etcd_worker.go | 5 +++++ ddl/syncer.go | 17 +++++++++++++++++ ddl/syncer_test.go | 10 +++++++++- 5 files changed, 42 insertions(+), 3 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index 8734cd3601323..c346b9f90038b 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -319,6 +319,10 @@ func (d *ddl) close() { } close(d.quitCh) + err := d.worker.RemoveSelfVersionPath() + if err != nil { + log.Errorf("[ddl] remove self version pathe failed %v", err) + } d.worker.Cancel() d.wait.Wait() @@ -403,7 +407,8 @@ func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error { jobID := job.ID // For a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public // For every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease. - ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second)) + // But we use etcd to speed up, normally it takes less than 1s now, so we use 3s as the max value. + ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 3*time.Second)) startTime := time.Now() jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Inc() defer func() { diff --git a/ddl/etcd_worker.go b/ddl/etcd_worker.go index e56d635088ba3..ac8ebf1c03696 100644 --- a/ddl/etcd_worker.go +++ b/ddl/etcd_worker.go @@ -117,10 +117,14 @@ func (w *worker) SetBgOwner(isOwner bool) { } } +// defaultSessionTTL is used for etcd session. It's default value is 10s. +const defaultSessionTTL = 10 + func (w *worker) newSession(ctx goctx.Context, retryCnt int) error { var err error for i := 0; i < retryCnt; i++ { - w.etcdSession, err = concurrency.NewSession(w.etcdCli, concurrency.WithContext(ctx)) + w.etcdSession, err = concurrency.NewSession(w.etcdCli, + concurrency.WithTTL(defaultSessionTTL), concurrency.WithContext(ctx)) if err != nil { log.Warnf("[ddl] failed to new session, err %v", err) time.Sleep(200 * time.Millisecond) diff --git a/ddl/mock_etcd_worker.go b/ddl/mock_etcd_worker.go index 82fd9a17774bc..3ecb4ba1dce7c 100644 --- a/ddl/mock_etcd_worker.go +++ b/ddl/mock_etcd_worker.go @@ -122,6 +122,11 @@ func (s *mockSchemaSyncer) UpdateGlobalVersion(ctx goctx.Context, version int64) return nil } +// RemoveSelfVersionPath implements SchemaSyncer.RemoveSelfVersionPath interface. +func (s *mockSchemaSyncer) RemoveSelfVersionPath() error { + return nil +} + // CheckAllVersions implements SchemaSyncer.CheckAllVersions interface. func (s *mockSchemaSyncer) CheckAllVersions(ctx goctx.Context, latestVer int64) error { for { diff --git a/ddl/syncer.go b/ddl/syncer.go index 714fbed5d2cda..1eb54838600b1 100644 --- a/ddl/syncer.go +++ b/ddl/syncer.go @@ -31,6 +31,7 @@ const ( initialVersion = "0" putKeyNoRetry = 1 putKeyDefaultRetryCnt = 3 + delKeyDefaultRetryCnt = 3 putKeyRetryUnlimited = math.MaxInt64 putKeyDefaultTimeout = 2 * time.Second putKeyRetryInterval = 30 * time.Millisecond @@ -49,6 +50,8 @@ type SchemaSyncer interface { UpdateSelfVersion(ctx goctx.Context, version int64) error // UpdateGlobalVersion updates the latest version to the global path on etcd. UpdateGlobalVersion(ctx goctx.Context, version int64) error + // RemoveSelfVersionPath remove the self path from etcd. + RemoveSelfVersionPath() error // GlobalVersionCh gets the chan for watching global version. GlobalVersionCh() clientv3.WatchChan // CheckAllVersions checks whether all followers' schema version are equal to @@ -113,6 +116,20 @@ func (s *schemaVersionSyncer) UpdateGlobalVersion(ctx goctx.Context, version int return s.putKV(ctx, putKeyRetryUnlimited, ddlGlobalSchemaVersion, ver) } +// RemoveSelfVersionPath implements SchemaSyncer.RemoveSelfVersionPath interface. +func (s *schemaVersionSyncer) RemoveSelfVersionPath() error { + ctx := goctx.Background() + var err error + for i := 0; i < delKeyDefaultRetryCnt; i++ { + _, err = s.etcdCli.Delete(ctx, s.selfSchemaVerPath) + if err == nil { + return nil + } + log.Warnf("remove schema version path %s failed %v no.%d", s.selfSchemaVerPath, err, i) + } + return errors.Trace(err) +} + func isContextFinished(err error) bool { if terror.ErrorEqual(err, goctx.Canceled) || terror.ErrorEqual(err, goctx.DeadlineExceeded) { diff --git a/ddl/syncer_test.go b/ddl/syncer_test.go index 2ef3c428d1406..c6028c1cf6a1a 100644 --- a/ddl/syncer_test.go +++ b/ddl/syncer_test.go @@ -25,7 +25,7 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" "github.com/coreos/etcd/mvcc/mvccpb" - //"github.com/ngaut/log" + "github.com/ngaut/log" "github.com/pingcap/tidb/store/localstore" "github.com/pingcap/tidb/store/localstore/goleveldb" goctx "golang.org/x/net/context" @@ -133,6 +133,14 @@ func TestSyncerSimple(t *testing.T) { t.Fatalf("check all version failed %v", err) } cancel() + + resp, err := s.etcdCli.Get(goctx.Background(), key) + log.Warnf("err %v, resp %v", err, resp) + d.worker.RemoveSelfVersionPath() + resp, err = s.etcdCli.Get(goctx.Background(), key) + if err != nil { + log.Warnf("err %v, resp %v", err, resp) + } } func checkRespKV(t *testing.T, kvCount int, key, val string, From 90ca0b6e5f4790cf8dc3689cfe846567210d26b7 Mon Sep 17 00:00:00 2001 From: zimulala Date: Wed, 31 May 2017 14:57:00 +0800 Subject: [PATCH 3/6] ddl: update test --- ddl/ddl_worker_test.go | 30 ++---------------------------- ddl/syncer_test.go | 23 ++++++++++++++--------- 2 files changed, 16 insertions(+), 37 deletions(-) diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 44b14545a70ab..50cef96b33d8e 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" - "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/types" goctx "golang.org/x/net/context" @@ -31,25 +30,10 @@ import ( var _ = Suite(&testDDLSuite{}) -type testDDLSuite struct { - originMinBgOwnerTimeout int64 - originMinDDLOwnerTimeout int64 -} +type testDDLSuite struct{} const testLease = 5 * time.Millisecond -func (s *testDDLSuite) SetUpSuite(c *C) { - s.originMinDDLOwnerTimeout = minDDLOwnerTimeout - s.originMinBgOwnerTimeout = minBgOwnerTimeout - minDDLOwnerTimeout = int64(4 * testLease) - minBgOwnerTimeout = int64(4 * testLease) -} - -func (s *testDDLSuite) TearDownSuite(c *C) { - minDDLOwnerTimeout = s.originMinDDLOwnerTimeout - minBgOwnerTimeout = s.originMinBgOwnerTimeout -} - func (s *testDDLSuite) TestCheckOwner(c *C) { defer testleak.AfterTest(c)() store := testCreateStore(c, "test_owner") @@ -211,17 +195,7 @@ func (s *testDDLSuite) TestColumnError(c *C) { } func testCheckOwner(c *C, d *ddl, isOwner bool, flag JobType) { - err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { - t := meta.NewMeta(txn) - _, err := d.checkOwner(t, flag) - return err - }) - if isOwner { - c.Assert(err, IsNil) - return - } - - c.Assert(terror.ErrorEqual(err, errNotOwner), IsTrue) + c.Assert(d.isOwner(flag), Equals, isOwner) } func testCheckJobDone(c *C, d *ddl, job *model.Job, isAdd bool) { diff --git a/ddl/syncer_test.go b/ddl/syncer_test.go index c6028c1cf6a1a..bbb46246f1e77 100644 --- a/ddl/syncer_test.go +++ b/ddl/syncer_test.go @@ -25,7 +25,6 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/integration" "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/ngaut/log" "github.com/pingcap/tidb/store/localstore" "github.com/pingcap/tidb/store/localstore/goleveldb" goctx "golang.org/x/net/context" @@ -82,7 +81,7 @@ func TestSyncerSimple(t *testing.T) { go func() { defer wg.Done() select { - case resp := <-d.worker.GlobalVerCh: + case resp := <-d.worker.GlobalVersionCh(): if len(resp.Events) < 1 { t.Fatalf("get chan events count less than 1") } @@ -104,9 +103,9 @@ func TestSyncerSimple(t *testing.T) { wg.Wait() - // for checkAllVersions + // for CheckAllVersions childCtx, cancel := goctx.WithTimeout(ctx, 20*time.Millisecond) - err = d.worker.checkAllVersions(childCtx, currentVer) + err = d.worker.CheckAllVersions(childCtx, currentVer) if err == nil { t.Fatalf("check result not match") } @@ -126,7 +125,7 @@ func TestSyncerSimple(t *testing.T) { } cancel() - // for checkAllVersions + // for CheckAllVersions childCtx, cancel = goctx.WithTimeout(ctx, 30*time.Millisecond) err = d.worker.CheckAllVersions(childCtx, currentVer) if err != nil { @@ -134,12 +133,18 @@ func TestSyncerSimple(t *testing.T) { } cancel() - resp, err := s.etcdCli.Get(goctx.Background(), key) - log.Warnf("err %v, resp %v", err, resp) + resp, err = cli.Get(goctx.Background(), key) + if err != nil { + t.Fatalf("get key %s failed %v", key, err) + } + checkRespKV(t, 1, key, "123", resp.Kvs...) d.worker.RemoveSelfVersionPath() - resp, err = s.etcdCli.Get(goctx.Background(), key) + resp, err = cli.Get(goctx.Background(), key) if err != nil { - log.Warnf("err %v, resp %v", err, resp) + t.Fatalf("get key %s failed %v", key, err) + } + if len(resp.Kvs) != 0 { + t.Fatalf("remove key %s failed %v", key, err) } } From 02ff3b2cb48ca781f380d3c4e4fa440c34f00419 Mon Sep 17 00:00:00 2001 From: zimulala Date: Thu, 1 Jun 2017 15:09:22 +0800 Subject: [PATCH 4/6] *: remove special processing to create table --- ddl/ddl_worker.go | 14 ++++---------- executor/ddl.go | 17 ----------------- sessionctx/variable/session.go | 4 ---- sessionctx/variable/sysvar.go | 1 - sessionctx/variable/tidb_vars.go | 1 - sessionctx/varsutil/varsutil.go | 2 -- sessionctx/varsutil/varsutil_test.go | 11 ----------- tidb_test.go | 1 - util/testkit/testkit.go | 1 - 9 files changed, 4 insertions(+), 48 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 49425c151d293..2a7b6056a81ec 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -72,11 +72,11 @@ func asyncNotify(ch chan struct{}) { func (d *ddl) isOwner(flag JobType) bool { if flag == ddlJobFlag { isOwner := d.worker.IsOwner() - log.Infof("[ddl] it's the %s job owner %v, self id %s", flag, d.uuid, isOwner) + log.Infof("[ddl] it's the %s job owner %v, self id %s", flag, isOwner, d.uuid) return isOwner } isOwner := d.worker.IsBgOwner() - log.Infof("[ddl] it's the %s job owner %v, self id %s", flag, d.uuid, isOwner) + log.Infof("[ddl] it's the %s job owner %v, self id %s", flag, isOwner, d.uuid) return isOwner } @@ -213,6 +213,7 @@ func (d *ddl) handleDDLJobQueue() error { if elapsed > 0 && elapsed < waitTime { log.Warnf("[ddl] the elapsed time from last update is %s < %s, wait again", elapsed, waitTime) waitTime -= elapsed + time.Sleep(time.Millisecond) return nil } } @@ -247,14 +248,7 @@ func (d *ddl) handleDDLJobQueue() error { // If the job is done or still running, we will wait 2 * lease time to guarantee other servers to update // the newest schema. if job.State == model.JobRunning || job.State == model.JobDone { - switch job.Type { - case model.ActionCreateSchema, model.ActionDropSchema, model.ActionCreateTable, - model.ActionTruncateTable, model.ActionDropTable: - // Do not need to wait for those DDL, because those DDL do not need to modify data, - // So there is no data inconsistent issue. - default: - d.waitSchemaChanged(waitTime, schemaVer) - } + d.waitSchemaChanged(waitTime, schemaVer) } if job.IsFinished() { d.startBgJob(job.Type) diff --git a/executor/ddl.go b/executor/ddl.go index 867b5478eff8f..80a198a30ba3a 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -15,7 +15,6 @@ package executor import ( "strings" - "time" "github.com/juju/errors" "github.com/pingcap/tidb/ast" @@ -50,29 +49,20 @@ func (e *DDLExec) Next() (*Row, error) { if e.done { return nil, nil } - // For create/drop database, create/drop/truncate table - // DDL worker do not wait 2 lease, so we need to wait in executor to make sure - // all TiDB server has updated the schema. - var needWait bool var err error switch x := e.Statement.(type) { case *ast.TruncateTableStmt: err = e.executeTruncateTable(x) - needWait = true case *ast.CreateDatabaseStmt: err = e.executeCreateDatabase(x) - needWait = true case *ast.CreateTableStmt: err = e.executeCreateTable(x) - needWait = true case *ast.CreateIndexStmt: err = e.executeCreateIndex(x) case *ast.DropDatabaseStmt: err = e.executeDropDatabase(x) - needWait = true case *ast.DropTableStmt: err = e.executeDropTable(x) - needWait = true case *ast.DropIndexStmt: err = e.executeDropIndex(x) case *ast.AlterTableStmt: @@ -83,15 +73,8 @@ func (e *DDLExec) Next() (*Row, error) { if err != nil { return nil, errors.Trace(err) } - if e.ctx.GetSessionVars().SkipDDLWait { - needWait = false - } dom := sessionctx.GetDomain(e.ctx) - if needWait { - time.Sleep(dom.DDL().GetLease() * 2) - } - // Update InfoSchema in TxnCtx, so it will pass schema check. is := dom.InfoSchema() txnCtx := e.ctx.GetSessionVars().TxnCtx diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 0eae5b417d1ef..3dce02252c6ec 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -180,10 +180,6 @@ type SessionVars struct { // SkipUTF8Check check on input value. SkipUTF8Check bool - // SkipDDLWait can be set to true to skip 2 lease wait after creating/dropping/truncating table, creating/dropping database. - // Then if there are multiple TiDB servers, the new table may not be available for other TiDB servers. - SkipDDLWait bool - // BuildStatsConcurrencyVar is used to control statistics building concurrency. BuildStatsConcurrencyVar int diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 8d33cce0982d3..b744f9338afd3 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -604,7 +604,6 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBIndexLookupConcurrency, strconv.Itoa(DefIndexLookupConcurrency)}, {ScopeGlobal | ScopeSession, TiDBIndexSerialScanConcurrency, strconv.Itoa(DefIndexSerialScanConcurrency)}, {ScopeGlobal | ScopeSession, TiDBMaxRowCountForINLJ, strconv.Itoa(DefMaxRowCountForINLJ)}, - {ScopeGlobal | ScopeSession, TiDBSkipDDLWait, boolToIntStr(DefSkipDDLWait)}, {ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, boolToIntStr(DefSkipUTF8Check)}, {ScopeSession, TiDBBatchInsert, boolToIntStr(DefBatchInsert)}, } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index e77d26fd9a0d3..391a854a3f142 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -102,7 +102,6 @@ const ( DefDistSQLScanConcurrency = 10 DefBuildStatsConcurrency = 4 DefMaxRowCountForINLJ = 128 - DefSkipDDLWait = false DefSkipUTF8Check = false DefOptAggPushDown = true DefOptInSubqUnfolding = false diff --git a/sessionctx/varsutil/varsutil.go b/sessionctx/varsutil/varsutil.go index a772acbdcceaa..e3ae3f8e7a01f 100644 --- a/sessionctx/varsutil/varsutil.go +++ b/sessionctx/varsutil/varsutil.go @@ -121,8 +121,6 @@ func SetSessionSystemVar(vars *variable.SessionVars, name string, value types.Da vars.SkipConstraintCheck = tidbOptOn(sVal) case variable.TiDBSkipUTF8Check: vars.SkipUTF8Check = tidbOptOn(sVal) - case variable.TiDBSkipDDLWait: - vars.SkipDDLWait = tidbOptOn(sVal) case variable.TiDBOptAggPushDown: vars.AllowAggPushDown = tidbOptOn(sVal) case variable.TiDBOptInSubqUnFolding: diff --git a/sessionctx/varsutil/varsutil_test.go b/sessionctx/varsutil/varsutil_test.go index 41848b67bd919..af8deb5020b4f 100644 --- a/sessionctx/varsutil/varsutil_test.go +++ b/sessionctx/varsutil/varsutil_test.go @@ -103,17 +103,6 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(err, IsNil) c.Assert(val, Equals, "1") - // Test case for get TiDBSkipDDLWait session variable. - val, err = GetSessionSystemVar(v, variable.TiDBSkipDDLWait) - c.Assert(val, Equals, "0") - c.Assert(v.SkipDDLWait, IsFalse) - SetSessionSystemVar(v, variable.TiDBSkipDDLWait, types.NewStringDatum("0")) - c.Assert(v.SkipDDLWait, IsFalse) - SetSessionSystemVar(v, variable.TiDBSkipDDLWait, types.NewStringDatum("1")) - c.Assert(v.SkipDDLWait, IsTrue) - val, err = GetSessionSystemVar(v, variable.TiDBSkipDDLWait) - c.Assert(val, Equals, "1") - // Test case for time_zone session variable. tests := []struct { input string diff --git a/tidb_test.go b/tidb_test.go index 3b5e3063780e5..21cbaf3c4c74f 100644 --- a/tidb_test.go +++ b/tidb_test.go @@ -396,7 +396,6 @@ func newSession(c *C, store kv.Storage, dbName string) Session { id := atomic.AddUint64(&testConnID, 1) se.SetConnectionID(id) c.Assert(err, IsNil) - se.GetSessionVars().SkipDDLWait = true se.Auth(`root@%`, nil, []byte("012345678901234567890")) mustExecSQL(c, se, "create database if not exists "+dbName) mustExecSQL(c, se, "use "+dbName) diff --git a/util/testkit/testkit.go b/util/testkit/testkit.go index fdef2852eda56..780e57247ee6e 100644 --- a/util/testkit/testkit.go +++ b/util/testkit/testkit.go @@ -93,7 +93,6 @@ func (tk *TestKit) Exec(sql string, args ...interface{}) (ast.RecordSet, error) tk.c.Assert(err, check.IsNil) id := atomic.AddUint64(&connectionID, 1) tk.Se.SetConnectionID(id) - tk.Se.GetSessionVars().SkipDDLWait = true } if len(args) == 0 { var rss []ast.RecordSet From 441c80b50d474d842b8d2867a1911ebe4b503980 Mon Sep 17 00:00:00 2001 From: zimulala Date: Thu, 1 Jun 2017 15:18:02 +0800 Subject: [PATCH 5/6] store/tikv: make sure new store uuid is unique --- store/tikv/kv.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 0683e8fec140b..613498a6566ea 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -151,7 +151,7 @@ func NewMockTikvStoreWithCluster(cluster *mocktikv.Cluster) (kv.Storage, error) mocktikv.BootstrapWithSingleStore(cluster) mvccStore := mocktikv.NewMvccStore() client := mocktikv.NewRPCClient(cluster, mvccStore) - uuid := fmt.Sprintf("mock-tikv-store-:%v", time.Now().Unix()) + uuid := fmt.Sprintf("mock-tikv-store-%v-%v", time.Now().Unix(), rand.Intn(10)) pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)} return newTikvStore(uuid, pdCli, client, false) } From 140c068b2e281ad8e7c66fd55f89e3849aa438b2 Mon Sep 17 00:00:00 2001 From: zimulala Date: Thu, 1 Jun 2017 16:31:18 +0800 Subject: [PATCH 6/6] store/tikv: tiny update --- store/tikv/kv.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 613498a6566ea..13d66c4ab95a6 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -151,7 +151,9 @@ func NewMockTikvStoreWithCluster(cluster *mocktikv.Cluster) (kv.Storage, error) mocktikv.BootstrapWithSingleStore(cluster) mvccStore := mocktikv.NewMvccStore() client := mocktikv.NewRPCClient(cluster, mvccStore) - uuid := fmt.Sprintf("mock-tikv-store-%v-%v", time.Now().Unix(), rand.Intn(10)) + // Make sure the uuid is unique. + partID := fmt.Sprintf("%05d", rand.Intn(100000)) + uuid := fmt.Sprintf("mock-tikv-store-%v-%v", time.Now().Unix(), partID) pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)} return newTikvStore(uuid, pdCli, client, false) }