Skip to content

Commit

Permalink
Merge pull request #514 from pingcap/siddontang/schema-bootstrap
Browse files Browse the repository at this point in the history
reset lease time after bootstrap
  • Loading branch information
siddontang committed Nov 5, 2015
2 parents f4d5a5b + b029a5f commit 0556faf
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 18 deletions.
22 changes: 22 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/column"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -57,9 +58,14 @@ type DDL interface {
DropIndex(ctx context.Context, tableIdent table.Ident, indexName model.CIStr) error
GetInformationSchema() infoschema.InfoSchema
AlterTable(ctx context.Context, tableIdent table.Ident, spec []*AlterSpecification) error
// SetLease will reset the lease time for online DDL change, it is a very dangerous function and you must guarantee that
// all servers have the same lease time.
SetLease(lease time.Duration)
}

type ddl struct {
m sync.Mutex

infoHandle *infoschema.Handle
onDDLChange OnDDLChange
store kv.Storage
Expand Down Expand Up @@ -142,6 +148,22 @@ func (d *ddl) isClosed() bool {
}
}

func (d *ddl) SetLease(lease time.Duration) {
d.m.Lock()
defer d.m.Unlock()

if lease == d.lease {
return
}

log.Warnf("change schema lease %s -> %s", d.lease, lease)
d.lease = lease

// close the running worker and start again
d.close()
d.start()
}

func (d *ddl) GetInformationSchema() infoschema.InfoSchema {
return d.infoHandle.Get()
}
Expand Down
2 changes: 2 additions & 0 deletions ddl/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,6 @@ func (s *testDDLSuite) TestCheckOnwer(c *C) {
time.Sleep(6 * lease)

testCheckOwner(c, d2, true)

d2.SetLease(1 * time.Second)
}
41 changes: 33 additions & 8 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Domain struct {
store kv.Storage
infoHandle *infoschema.Handle
ddl ddl.DDL
lease time.Duration
leaseCh chan time.Duration
}

func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {
Expand Down Expand Up @@ -93,6 +93,14 @@ func (do *Domain) Store() kv.Storage {
return do.store
}

// SetLease will reset the lease time for online DDL change.
func (do *Domain) SetLease(lease time.Duration) {
do.leaseCh <- lease

// let ddl to reset lease too.
do.ddl.SetLease(lease)
}

func (do *Domain) onDDLChange(err error) error {
if err != nil {
return err
Expand All @@ -107,12 +115,15 @@ func (do *Domain) reload() error {
return errors.Trace(err)
}

func (do *Domain) loadSchemaInLoop() {
if do.lease <= 0 {
return
// check schema every 300 seconds default.
const defaultLoadTime = 300 * time.Second

func (do *Domain) loadSchemaInLoop(lease time.Duration) {
if lease <= 0 {
lease = defaultLoadTime
}

ticker := time.NewTicker(do.lease)
ticker := time.NewTicker(lease)
defer ticker.Stop()

for {
Expand All @@ -121,15 +132,29 @@ func (do *Domain) loadSchemaInLoop() {
if err := do.reload(); err != nil {
log.Fatalf("reload schema err %v", err)
}
case newLease := <-do.leaseCh:
if newLease <= 0 {
newLease = defaultLoadTime
}

if lease == newLease {
// nothing to do
continue
}

lease = newLease
// reset ticker too.
ticker.Stop()
ticker = time.NewTicker(lease)
}
}
}

// NewDomain creates a new domain.
func NewDomain(store kv.Storage, lease time.Duration) (d *Domain, err error) {
d = &Domain{
store: store,
lease: lease,
store: store,
leaseCh: make(chan time.Duration, 1),
}

d.infoHandle = infoschema.NewHandle(d.store)
Expand All @@ -139,7 +164,7 @@ func NewDomain(store kv.Storage, lease time.Duration) (d *Domain, err error) {
log.Fatalf("load schema err %v", err)
}

go d.loadSchemaInLoop()
go d.loadSchemaInLoop(lease)

return d, nil
}
3 changes: 3 additions & 0 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package domain

import (
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/model"
Expand Down Expand Up @@ -51,4 +52,6 @@ func (*testSuite) TestT(c *C) {
c.Assert(is, NotNil)
dom, err = NewDomain(store, 0)
c.Assert(err, IsNil)

dom.SetLease(10 * time.Second)
}
17 changes: 17 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var (
mDBPrefix = "mDB"
mTablePrefix = "mTable"
mTableIDPrefix = "mTID"
mBootstrapKey = []byte("mBootstrapKey")
)

var (
Expand Down Expand Up @@ -515,3 +516,19 @@ func (m *Meta) GetHistoryDDLJob(id int64) (*model.Job, error) {
err = job.Decode(value)
return job, errors.Trace(err)
}

// IsBootstrapped returns whether we have already run bootstrap or not.
// return true means we don't need doing any other bootstrap.
func (m *Meta) IsBootstrapped() (bool, error) {
value, err := m.txn.GetInt64(mBootstrapKey)
if err != nil {
return false, errors.Trace(err)
}
return value == 1, nil
}

// FinishBootstrap finishes bootstrap.
func (m *Meta) FinishBootstrap() error {
err := m.txn.Set(mBootstrapKey, []byte("1"))
return errors.Trace(err)
}
11 changes: 11 additions & 0 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ func (s *testSuite) TestMeta(c *C) {
c.Assert(err, IsNil)
c.Assert(dbs, HasLen, 0)

bootstrapped, err := t.IsBootstrapped()
c.Assert(err, IsNil)
c.Assert(bootstrapped, IsFalse)

err = t.FinishBootstrap()
c.Assert(err, IsNil)

bootstrapped, err = t.IsBootstrapped()
c.Assert(err, IsNil)
c.Assert(bootstrapped, IsTrue)

err = txn.Commit()
c.Assert(err, IsNil)
}
Expand Down
57 changes: 55 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/field"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/privilege/privileges"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/stmt"
"github.com/pingcap/tidb/stmt/stmts"
"github.com/pingcap/tidb/store/localstore"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/errors2"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -548,15 +550,66 @@ func CreateSession(store kv.Storage) (Session, error) {
sessionMu.Lock()
defer sessionMu.Unlock()

_, ok := storeBootstrapped[store.UUID()]
ok := isBoostrapped(store)
if !ok {
s.initing = true
bootstrap(s)
s.initing = false
storeBootstrapped[store.UUID()] = true
finishBoostrap(store)
}

// if store is not local store, we will reset lease time after bootstrap
if !localstore.IsLocalStore(store) {
lease := SchemaLease
if lease < minSchemaLease {
lease = minSchemaLease
}

sessionctx.GetDomain(s).SetLease(time.Duration(lease) * time.Second)
}

// TODO: Add auth here
privChecker := &privileges.UserPrivileges{}
privilege.BindPrivilegeChecker(s, privChecker)
return s, nil
}

func isBoostrapped(store kv.Storage) bool {
// check in memory
_, ok := storeBootstrapped[store.UUID()]
if ok {
return true
}

// check in kv store
err := kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
var err error
t := meta.NewMeta(txn)
ok, err = t.IsBootstrapped()
return errors.Trace(err)
})

if err != nil {
log.Fatalf("check bootstrapped err %v", err)
}

if ok {
// here mean memory is not ok, but other server has already finished it
storeBootstrapped[store.UUID()] = true
}

return ok
}

func finishBoostrap(store kv.Storage) {
storeBootstrapped[store.UUID()] = true

err := kv.RunInNewTxn(store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := t.FinishBootstrap()
return errors.Trace(err)
})
if err != nil {
log.Fatalf("finish bootstrap err %v", err)
}
}
14 changes: 6 additions & 8 deletions tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,16 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
return
}

if SchemaLease <= minSchemaLease {
SchemaLease = minSchemaLease
}

lease := SchemaLease
lease := time.Duration(0)
// if storage is local storage, we may in test environment or
// run server in single machine mode, so we don't need wait
// 2 * lease time for DDL operation.
if localstore.IsLocalStore(store) {
lease = 0
// if not local storage, first we will use a little lease time to
// bootstrap quickly, after boostrapped, we will reset the lease time.
if !localstore.IsLocalStore(store) {
lease = 100 * time.Second
}
d, err = domain.NewDomain(store, time.Duration(lease)*time.Second)
d, err = domain.NewDomain(store, lease)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down

0 comments on commit 0556faf

Please sign in to comment.