Skip to content

Commit

Permalink
*: compatibility with staleread (#24285)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox authored May 17, 2021
1 parent ae36fbd commit 2ca98e3
Show file tree
Hide file tree
Showing 32 changed files with 535 additions and 455 deletions.
21 changes: 12 additions & 9 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@ type testColumnChangeSuite struct {
func (s *testColumnChangeSuite) SetUpSuite(c *C) {
SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
s.store = testCreateStore(c, "test_column_change")
s.dbInfo = &model.DBInfo{
Name: model.NewCIStr("test_column_change"),
ID: 1,
}
err := kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
return errors.Trace(t.CreateDatabase(s.dbInfo))
})
c.Check(err, IsNil)
d := testNewDDLAndStart(
context.Background(),
c,
WithStore(s.store),
WithLease(testLease),
)
defer func() {
err := d.Stop()
c.Assert(err, IsNil)
}()
s.dbInfo = testSchemaInfo(c, d, "test_index_change")
testCreateSchema(c, testNewContext(d), d, s.dbInfo)
}

func (s *testColumnChangeSuite) TearDownSuite(c *C) {
Expand Down
3 changes: 1 addition & 2 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ func (s *testColumnSuite) SetUpSuite(c *C) {

s.dbInfo = testSchemaInfo(c, d, "test_column")
testCreateSchema(c, testNewContext(d), d, s.dbInfo)
err := d.Stop()
c.Assert(err, IsNil)
c.Assert(d.Stop(), IsNil)
}

func (s *testColumnSuite) TearDownSuite(c *C) {
Expand Down
20 changes: 13 additions & 7 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ type ddlCtx struct {
ddlEventCh chan<- *util.Event
lease time.Duration // lease is schema lease.
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
infoHandle *infoschema.Handle
infoCache *infoschema.InfoCache
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
Expand Down Expand Up @@ -282,6 +282,15 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
deadLockCkr = util.NewDeadTableLockChecker(etcdCli)
}

// TODO: make store and infoCache explicit arguments
// these two should be ensured to exist
if opt.Store == nil {
panic("store should not be nil")
}
if opt.InfoCache == nil {
panic("infoCache should not be nil")
}

ddlCtx := &ddlCtx{
uuid: id,
store: opt.Store,
Expand All @@ -290,7 +299,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
ownerManager: manager,
schemaSyncer: syncer,
binlogCli: binloginfo.GetPumpsClient(),
infoHandle: opt.InfoHandle,
infoCache: opt.InfoCache,
tableLockCkr: deadLockCkr,
etcdCli: opt.EtcdCli,
}
Expand Down Expand Up @@ -411,7 +420,7 @@ func (d *ddl) GetLease() time.Duration {
// Please don't use this function, it is used by TestParallelDDLBeforeRunDDLJob to intercept the calling of d.infoHandle.Get(), use d.infoHandle.Get() instead.
// Otherwise, the TestParallelDDLBeforeRunDDLJob will hang up forever.
func (d *ddl) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()

d.mu.RLock()
defer d.mu.RUnlock()
Expand Down Expand Up @@ -649,10 +658,7 @@ func (d *ddl) startCleanDeadTableLock() {
if !d.ownerManager.IsOwner() {
continue
}
if d.infoHandle == nil || !d.infoHandle.IsValid() {
continue
}
deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoHandle.Get().AllSchemas())
deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoCache.GetLatest().AllSchemas())
if err != nil {
logutil.BgLogger().Info("[ddl] get dead table lock failed.", zap.Error(err))
continue
Expand Down
30 changes: 15 additions & 15 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2367,7 +2367,7 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
return errors.Trace(err)
}

is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
if is.TableIsView(ident.Schema, ident.Name) || is.TableIsSequence(ident.Schema, ident.Name) {
return ErrWrongObject.GenWithStackByArgs(ident.Schema, ident.Name, "BASE TABLE")
}
Expand Down Expand Up @@ -2898,7 +2898,7 @@ func (d *ddl) AddColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.Alte

// AddTablePartitions will add a new partition to the table.
func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema))
Expand Down Expand Up @@ -2959,7 +2959,7 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *

// CoalescePartitions coalesce partitions can be used with a table that is partitioned by hash or key to reduce the number of partitions by number.
func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema))
Expand Down Expand Up @@ -2991,7 +2991,7 @@ func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec *
}

func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema))
Expand Down Expand Up @@ -3039,7 +3039,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
}

func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema))
Expand Down Expand Up @@ -3752,7 +3752,7 @@ func processAndCheckDefaultValueAndColumn(ctx sessionctx.Context, col *table.Col
func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, originalColName model.CIStr,
spec *ast.AlterTableSpec) (*model.Job, error) {
specNewColumn := spec.NewColumns[0]
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return nil, errors.Trace(infoschema.ErrDatabaseNotExists)
Expand Down Expand Up @@ -4203,7 +4203,7 @@ func (d *ddl) ModifyColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al

func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
specNewColumn := spec.NewColumns[0]
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name)
Expand Down Expand Up @@ -4257,7 +4257,7 @@ func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt

// AlterTableComment updates the table comment information.
func (d *ddl) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
Expand Down Expand Up @@ -4310,7 +4310,7 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return ErrUnknownCharacterSet.GenWithStackByArgs(toCharset)
}

is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
Expand Down Expand Up @@ -4471,7 +4471,7 @@ func (d *ddl) AlterTableDropStatistics(ctx sessionctx.Context, ident ast.Ident,

// UpdateTableReplicaInfo updates the table flash replica infos.
func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, available bool) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
tb, ok := is.TableByID(physicalID)
if !ok {
tb, _, _ = is.FindTableByPartitionID(physicalID)
Expand Down Expand Up @@ -4574,7 +4574,7 @@ func checkAlterTableCharset(tblInfo *model.TableInfo, dbInfo *model.DBInfo, toCh
// In TiDB, indexes are case-insensitive (so index 'a' and 'A" are considered the same index),
// but index names are case-sensitive (we can rename index 'a' to 'A')
func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
Expand Down Expand Up @@ -5232,7 +5232,7 @@ func buildFKInfo(fkName model.CIStr, keys []*ast.IndexPartSpecification, refer *
}

func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.CIStr, keys []*ast.IndexPartSpecification, refer *ast.ReferenceDef) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
Expand Down Expand Up @@ -5264,7 +5264,7 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode
}

func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.CIStr) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
Expand All @@ -5290,7 +5290,7 @@ func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.
}

func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error {
is := d.infoHandle.Get()
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
Expand Down Expand Up @@ -6036,7 +6036,7 @@ func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident,
return errors.Trace(err)
}

oldBundle := infoschema.GetBundle(d.infoHandle.Get(), []int64{partitionID, meta.ID, schema.ID})
oldBundle := infoschema.GetBundle(d.infoCache.GetLatest(), []int64{partitionID, meta.ID, schema.ID})

oldBundle.ID = placement.GroupID(partitionID)

Expand Down
5 changes: 5 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
Expand Down Expand Up @@ -86,6 +87,10 @@ func TestT(t *testing.T) {
}

func testNewDDLAndStart(ctx context.Context, c *C, options ...Option) *ddl {
// init infoCache and a stub infoSchema
ic := infoschema.NewCache(2)
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0))
options = append(options, WithInfoCache(ic))
d := newDDL(ctx, options...)
err := d.Start(nil)
c.Assert(err, IsNil)
Expand Down
10 changes: 5 additions & 5 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *testDDLSuite) TestTableError(c *C) {
// Schema ID is wrong, so dropping table is failed.
doDDLJobErr(c, -1, 1, model.ActionDropTable, nil, ctx, d)
// Table ID is wrong, so dropping table is failed.
dbInfo := testSchemaInfo(c, d, "test")
dbInfo := testSchemaInfo(c, d, "test_ddl")
testCreateSchema(c, testNewContext(d), d, dbInfo)
job := doDDLJobErr(c, dbInfo.ID, -1, model.ActionDropTable, nil, ctx, d)

Expand Down Expand Up @@ -295,7 +295,7 @@ func (s *testDDLSuite) TestViewError(c *C) {
c.Assert(err, IsNil)
}()
ctx := testNewContext(d)
dbInfo := testSchemaInfo(c, d, "test")
dbInfo := testSchemaInfo(c, d, "test_ddl")
testCreateSchema(c, testNewContext(d), d, dbInfo)

// Table ID or schema ID is wrong, so getting table is failed.
Expand Down Expand Up @@ -363,7 +363,7 @@ func (s *testDDLSuite) TestForeignKeyError(c *C) {
doDDLJobErr(c, -1, 1, model.ActionAddForeignKey, nil, ctx, d)
doDDLJobErr(c, -1, 1, model.ActionDropForeignKey, nil, ctx, d)

dbInfo := testSchemaInfo(c, d, "test")
dbInfo := testSchemaInfo(c, d, "test_ddl")
tblInfo := testTableInfo(c, d, "t", 3)
testCreateSchema(c, ctx, d, dbInfo)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
Expand Down Expand Up @@ -393,7 +393,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {
doDDLJobErr(c, -1, 1, model.ActionAddIndex, nil, ctx, d)
doDDLJobErr(c, -1, 1, model.ActionDropIndex, nil, ctx, d)

dbInfo := testSchemaInfo(c, d, "test")
dbInfo := testSchemaInfo(c, d, "test_ddl")
tblInfo := testTableInfo(c, d, "t", 3)
testCreateSchema(c, ctx, d, dbInfo)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
Expand Down Expand Up @@ -435,7 +435,7 @@ func (s *testDDLSuite) TestColumnError(c *C) {
}()
ctx := testNewContext(d)

dbInfo := testSchemaInfo(c, d, "test")
dbInfo := testSchemaInfo(c, d, "test_ddl")
tblInfo := testTableInfo(c, d, "t", 3)
testCreateSchema(c, ctx, d, dbInfo)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
Expand Down
22 changes: 12 additions & 10 deletions ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand All @@ -38,15 +37,18 @@ type testIndexChangeSuite struct {

func (s *testIndexChangeSuite) SetUpSuite(c *C) {
s.store = testCreateStore(c, "test_index_change")
s.dbInfo = &model.DBInfo{
Name: model.NewCIStr("test_index_change"),
ID: 1,
}
err := kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
return errors.Trace(t.CreateDatabase(s.dbInfo))
})
c.Check(err, IsNil, Commentf("err %v", errors.ErrorStack(err)))
d := testNewDDLAndStart(
context.Background(),
c,
WithStore(s.store),
WithLease(testLease),
)
defer func() {
err := d.Stop()
c.Assert(err, IsNil)
}()
s.dbInfo = testSchemaInfo(c, d, "test_index_change")
testCreateSchema(c, testNewContext(d), d, s.dbInfo)
}

func (s *testIndexChangeSuite) TearDownSuite(c *C) {
Expand Down
16 changes: 8 additions & 8 deletions ddl/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type Option func(*Options)

// Options represents all the options of the DDL module needs
type Options struct {
EtcdCli *clientv3.Client
Store kv.Storage
InfoHandle *infoschema.Handle
Hook Callback
Lease time.Duration
EtcdCli *clientv3.Client
Store kv.Storage
InfoCache *infoschema.InfoCache
Hook Callback
Lease time.Duration
}

// WithEtcdClient specifies the `clientv3.Client` of DDL used to request the etcd service
Expand All @@ -47,10 +47,10 @@ func WithStore(store kv.Storage) Option {
}
}

// WithInfoHandle specifies the `infoschema.Handle`
func WithInfoHandle(ih *infoschema.Handle) Option {
// WithInfoCache specifies the `infoschema.InfoCache`
func WithInfoCache(ic *infoschema.InfoCache) Option {
return func(options *Options) {
options.InfoHandle = ih
options.InfoCache = ic
}
}

Expand Down
6 changes: 3 additions & 3 deletions ddl/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ func (s *ddlOptionsSuite) TestOptions(c *C) {
callback := &ddl.BaseCallback{}
lease := time.Second * 3
store := &mock.Store{}
infoHandle := infoschema.NewHandle(store)
infoHandle := infoschema.NewCache(16)

options := []ddl.Option{
ddl.WithEtcdClient(client),
ddl.WithHook(callback),
ddl.WithLease(lease),
ddl.WithStore(store),
ddl.WithInfoHandle(infoHandle),
ddl.WithInfoCache(infoHandle),
}

opt := &ddl.Options{}
Expand All @@ -52,5 +52,5 @@ func (s *ddlOptionsSuite) TestOptions(c *C) {
c.Assert(opt.Hook, Equals, callback)
c.Assert(opt.Lease, Equals, lease)
c.Assert(opt.Store, Equals, store)
c.Assert(opt.InfoHandle, Equals, infoHandle)
c.Assert(opt.InfoCache, Equals, infoHandle)
}
Loading

0 comments on commit 2ca98e3

Please sign in to comment.