Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: compatibility with staleread #24285

Merged
merged 25 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@ type testColumnChangeSuite struct {
func (s *testColumnChangeSuite) SetUpSuite(c *C) {
SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
s.store = testCreateStore(c, "test_column_change")
s.dbInfo = &model.DBInfo{
Name: model.NewCIStr("test_column_change"),
ID: 1,
}
err := kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
return errors.Trace(t.CreateDatabase(s.dbInfo))
})
c.Check(err, IsNil)
d := testNewDDLAndStart(
context.Background(),
c,
WithStore(s.store),
WithLease(testLease),
)
defer func() {
err := d.Stop()
c.Assert(err, IsNil)
}()
s.dbInfo = testSchemaInfo(c, d, "test_index_change")
testCreateSchema(c, testNewContext(d), d, s.dbInfo)
}

func (s *testColumnChangeSuite) TearDownSuite(c *C) {
Expand Down
3 changes: 1 addition & 2 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ func (s *testColumnSuite) SetUpSuite(c *C) {

s.dbInfo = testSchemaInfo(c, d, "test_column")
testCreateSchema(c, testNewContext(d), d, s.dbInfo)
err := d.Stop()
c.Assert(err, IsNil)
c.Assert(d.Stop(), IsNil)
}

func (s *testColumnSuite) TearDownSuite(c *C) {
Expand Down
20 changes: 13 additions & 7 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ type ddlCtx struct {
ddlEventCh chan<- *util.Event
lease time.Duration // lease is schema lease.
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
infoHandle *infoschema.Handle
infoCache *infoschema.InfoCache
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
Expand Down Expand Up @@ -282,6 +282,15 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
deadLockCkr = util.NewDeadTableLockChecker(etcdCli)
}

// TODO: make store and infoCache explicit arguments
// these two should be ensured to exist
if opt.Store == nil {
panic("store should not be nil")
}
if opt.InfoCache == nil {
panic("infoCache should not be nil")
}

ddlCtx := &ddlCtx{
uuid: id,
store: opt.Store,
Expand All @@ -290,7 +299,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
ownerManager: manager,
schemaSyncer: syncer,
binlogCli: binloginfo.GetPumpsClient(),
infoHandle: opt.InfoHandle,
infoCache: opt.InfoCache,
tableLockCkr: deadLockCkr,
etcdCli: opt.EtcdCli,
}
Expand Down Expand Up @@ -411,7 +420,7 @@ func (d *ddl) GetLease() time.Duration {
// Please don't use this function, it is used by TestParallelDDLBeforeRunDDLJob to intercept the calling of d.infoHandle.Get(), use d.infoHandle.Get() instead.
// Otherwise, the TestParallelDDLBeforeRunDDLJob will hang up forever.
func (d *ddl) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()

d.mu.RLock()
defer d.mu.RUnlock()
Expand Down Expand Up @@ -649,10 +658,7 @@ func (d *ddl) startCleanDeadTableLock() {
if !d.ownerManager.IsOwner() {
continue
}
if d.infoHandle == nil || !d.infoHandle.IsValid() {
continue
}
deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoHandle.Get().AllSchemas())
deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoCache.GetLatest().AllSchemas())
if err != nil {
logutil.BgLogger().Info("[ddl] get dead table lock failed.", zap.Error(err))
continue
Expand Down
30 changes: 15 additions & 15 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2367,7 +2367,7 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
return errors.Trace(err)
}

is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
if is.TableIsView(ident.Schema, ident.Name) || is.TableIsSequence(ident.Schema, ident.Name) {
return ErrWrongObject.GenWithStackByArgs(ident.Schema, ident.Name, "BASE TABLE")
}
Expand Down Expand Up @@ -2898,7 +2898,7 @@ func (d *ddl) AddColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.Alte

// AddTablePartitions will add a new partition to the table.
func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema))
Expand Down Expand Up @@ -2959,7 +2959,7 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *

// CoalescePartitions coalesce partitions can be used with a table that is partitioned by hash or key to reduce the number of partitions by number.
func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema))
Expand Down Expand Up @@ -2991,7 +2991,7 @@ func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec *
}

func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema))
Expand Down Expand Up @@ -3039,7 +3039,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
}

func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema))
Expand Down Expand Up @@ -3752,7 +3752,7 @@ func processAndCheckDefaultValueAndColumn(ctx sessionctx.Context, col *table.Col
func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, originalColName model.CIStr,
spec *ast.AlterTableSpec) (*model.Job, error) {
specNewColumn := spec.NewColumns[0]
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return nil, errors.Trace(infoschema.ErrDatabaseNotExists)
Expand Down Expand Up @@ -4203,7 +4203,7 @@ func (d *ddl) ModifyColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al

func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
specNewColumn := spec.NewColumns[0]
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name)
Expand Down Expand Up @@ -4257,7 +4257,7 @@ func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt

// AlterTableComment updates the table comment information.
func (d *ddl) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
Expand Down Expand Up @@ -4310,7 +4310,7 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return ErrUnknownCharacterSet.GenWithStackByArgs(toCharset)
}

is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
Expand Down Expand Up @@ -4471,7 +4471,7 @@ func (d *ddl) AlterTableDropStatistics(ctx sessionctx.Context, ident ast.Ident,

// UpdateTableReplicaInfo updates the table flash replica infos.
func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, available bool) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
tb, ok := is.TableByID(physicalID)
if !ok {
tb, _, _ = is.FindTableByPartitionID(physicalID)
Expand Down Expand Up @@ -4574,7 +4574,7 @@ func checkAlterTableCharset(tblInfo *model.TableInfo, dbInfo *model.DBInfo, toCh
// In TiDB, indexes are case-insensitive (so index 'a' and 'A" are considered the same index),
// but index names are case-sensitive (we can rename index 'a' to 'A')
func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
Expand Down Expand Up @@ -5232,7 +5232,7 @@ func buildFKInfo(fkName model.CIStr, keys []*ast.IndexPartSpecification, refer *
}

func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.CIStr, keys []*ast.IndexPartSpecification, refer *ast.ReferenceDef) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
Expand Down Expand Up @@ -5264,7 +5264,7 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode
}

func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.CIStr) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
Expand All @@ -5290,7 +5290,7 @@ func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.
}

func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
Expand Down Expand Up @@ -6036,7 +6036,7 @@ func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident,
return errors.Trace(err)
}

oldBundle := infoschema.GetBundle(d.infoHandle.Get(), []int64{partitionID, meta.ID, schema.ID})
oldBundle := infoschema.GetBundle(d.infoCache.GetLatest(), []int64{partitionID, meta.ID, schema.ID})

oldBundle.ID = placement.GroupID(partitionID)

Expand Down
5 changes: 5 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
Expand Down Expand Up @@ -86,6 +87,10 @@ func TestT(t *testing.T) {
}

func testNewDDLAndStart(ctx context.Context, c *C, options ...Option) *ddl {
// init infoCache and a stub infoSchema
ic := infoschema.NewCache(2)
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0))
options = append(options, WithInfoCache(ic))
d := newDDL(ctx, options...)
err := d.Start(nil)
c.Assert(err, IsNil)
Expand Down
10 changes: 5 additions & 5 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *testDDLSuite) TestTableError(c *C) {
// Schema ID is wrong, so dropping table is failed.
doDDLJobErr(c, -1, 1, model.ActionDropTable, nil, ctx, d)
// Table ID is wrong, so dropping table is failed.
dbInfo := testSchemaInfo(c, d, "test")
dbInfo := testSchemaInfo(c, d, "test_ddl")
testCreateSchema(c, testNewContext(d), d, dbInfo)
job := doDDLJobErr(c, dbInfo.ID, -1, model.ActionDropTable, nil, ctx, d)

Expand Down Expand Up @@ -295,7 +295,7 @@ func (s *testDDLSuite) TestViewError(c *C) {
c.Assert(err, IsNil)
}()
ctx := testNewContext(d)
dbInfo := testSchemaInfo(c, d, "test")
dbInfo := testSchemaInfo(c, d, "test_ddl")
testCreateSchema(c, testNewContext(d), d, dbInfo)

// Table ID or schema ID is wrong, so getting table is failed.
Expand Down Expand Up @@ -363,7 +363,7 @@ func (s *testDDLSuite) TestForeignKeyError(c *C) {
doDDLJobErr(c, -1, 1, model.ActionAddForeignKey, nil, ctx, d)
doDDLJobErr(c, -1, 1, model.ActionDropForeignKey, nil, ctx, d)

dbInfo := testSchemaInfo(c, d, "test")
dbInfo := testSchemaInfo(c, d, "test_ddl")
tblInfo := testTableInfo(c, d, "t", 3)
testCreateSchema(c, ctx, d, dbInfo)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
Expand Down Expand Up @@ -393,7 +393,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {
doDDLJobErr(c, -1, 1, model.ActionAddIndex, nil, ctx, d)
doDDLJobErr(c, -1, 1, model.ActionDropIndex, nil, ctx, d)

dbInfo := testSchemaInfo(c, d, "test")
dbInfo := testSchemaInfo(c, d, "test_ddl")
tblInfo := testTableInfo(c, d, "t", 3)
testCreateSchema(c, ctx, d, dbInfo)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
Expand Down Expand Up @@ -435,7 +435,7 @@ func (s *testDDLSuite) TestColumnError(c *C) {
}()
ctx := testNewContext(d)

dbInfo := testSchemaInfo(c, d, "test")
dbInfo := testSchemaInfo(c, d, "test_ddl")
tblInfo := testTableInfo(c, d, "t", 3)
testCreateSchema(c, ctx, d, dbInfo)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
Expand Down
22 changes: 12 additions & 10 deletions ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand All @@ -38,15 +37,18 @@ type testIndexChangeSuite struct {

func (s *testIndexChangeSuite) SetUpSuite(c *C) {
s.store = testCreateStore(c, "test_index_change")
s.dbInfo = &model.DBInfo{
Name: model.NewCIStr("test_index_change"),
ID: 1,
}
err := kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
return errors.Trace(t.CreateDatabase(s.dbInfo))
})
c.Check(err, IsNil, Commentf("err %v", errors.ErrorStack(err)))
d := testNewDDLAndStart(
context.Background(),
c,
WithStore(s.store),
WithLease(testLease),
)
defer func() {
err := d.Stop()
c.Assert(err, IsNil)
}()
s.dbInfo = testSchemaInfo(c, d, "test_index_change")
testCreateSchema(c, testNewContext(d), d, s.dbInfo)
}

func (s *testIndexChangeSuite) TearDownSuite(c *C) {
Expand Down
16 changes: 8 additions & 8 deletions ddl/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type Option func(*Options)

// Options represents all the options of the DDL module needs
type Options struct {
EtcdCli *clientv3.Client
Store kv.Storage
InfoHandle *infoschema.Handle
Hook Callback
Lease time.Duration
EtcdCli *clientv3.Client
Store kv.Storage
InfoCache *infoschema.InfoCache
Hook Callback
Lease time.Duration
}

// WithEtcdClient specifies the `clientv3.Client` of DDL used to request the etcd service
Expand All @@ -47,10 +47,10 @@ func WithStore(store kv.Storage) Option {
}
}

// WithInfoHandle specifies the `infoschema.Handle`
func WithInfoHandle(ih *infoschema.Handle) Option {
// WithInfoCache specifies the `infoschema.InfoCache`
func WithInfoCache(ic *infoschema.InfoCache) Option {
return func(options *Options) {
options.InfoHandle = ih
options.InfoCache = ic
}
}

Expand Down
6 changes: 3 additions & 3 deletions ddl/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ func (s *ddlOptionsSuite) TestOptions(c *C) {
callback := &ddl.BaseCallback{}
lease := time.Second * 3
store := &mock.Store{}
infoHandle := infoschema.NewHandle(store)
infoHandle := infoschema.NewCache(16)

options := []ddl.Option{
ddl.WithEtcdClient(client),
ddl.WithHook(callback),
ddl.WithLease(lease),
ddl.WithStore(store),
ddl.WithInfoHandle(infoHandle),
ddl.WithInfoCache(infoHandle),
}

opt := &ddl.Options{}
Expand All @@ -52,5 +52,5 @@ func (s *ddlOptionsSuite) TestOptions(c *C) {
c.Assert(opt.Hook, Equals, callback)
c.Assert(opt.Lease, Equals, lease)
c.Assert(opt.Store, Equals, store)
c.Assert(opt.InfoHandle, Equals, infoHandle)
c.Assert(opt.InfoCache, Equals, infoHandle)
}
Loading