diff --git a/ddl/column_change_test.go b/ddl/column_change_test.go index 94e8787a2bdc4..6bd5a94f7235e 100644 --- a/ddl/column_change_test.go +++ b/ddl/column_change_test.go @@ -47,15 +47,18 @@ type testColumnChangeSuite struct { func (s *testColumnChangeSuite) SetUpSuite(c *C) { SetWaitTimeWhenErrorOccurred(1 * time.Microsecond) s.store = testCreateStore(c, "test_column_change") - s.dbInfo = &model.DBInfo{ - Name: model.NewCIStr("test_column_change"), - ID: 1, - } - err := kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error { - t := meta.NewMeta(txn) - return errors.Trace(t.CreateDatabase(s.dbInfo)) - }) - c.Check(err, IsNil) + d := testNewDDLAndStart( + context.Background(), + c, + WithStore(s.store), + WithLease(testLease), + ) + defer func() { + err := d.Stop() + c.Assert(err, IsNil) + }() + s.dbInfo = testSchemaInfo(c, d, "test_index_change") + testCreateSchema(c, testNewContext(d), d, s.dbInfo) } func (s *testColumnChangeSuite) TearDownSuite(c *C) { diff --git a/ddl/column_test.go b/ddl/column_test.go index 862fb4aa04c59..f3eaa26d22385 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -54,8 +54,7 @@ func (s *testColumnSuite) SetUpSuite(c *C) { s.dbInfo = testSchemaInfo(c, d, "test_column") testCreateSchema(c, testNewContext(d), d, s.dbInfo) - err := d.Stop() - c.Assert(err, IsNil) + c.Assert(d.Stop(), IsNil) } func (s *testColumnSuite) TearDownSuite(c *C) { diff --git a/ddl/ddl.go b/ddl/ddl.go index 6f20fe25ccc07..9eb05b86741ed 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -202,7 +202,7 @@ type ddlCtx struct { ddlEventCh chan<- *util.Event lease time.Duration // lease is schema lease. binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog. - infoHandle *infoschema.Handle + infoCache *infoschema.InfoCache statsHandle *handle.Handle tableLockCkr util.DeadTableLockChecker etcdCli *clientv3.Client @@ -282,6 +282,15 @@ func newDDL(ctx context.Context, options ...Option) *ddl { deadLockCkr = util.NewDeadTableLockChecker(etcdCli) } + // TODO: make store and infoCache explicit arguments + // these two should be ensured to exist + if opt.Store == nil { + panic("store should not be nil") + } + if opt.InfoCache == nil { + panic("infoCache should not be nil") + } + ddlCtx := &ddlCtx{ uuid: id, store: opt.Store, @@ -290,7 +299,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl { ownerManager: manager, schemaSyncer: syncer, binlogCli: binloginfo.GetPumpsClient(), - infoHandle: opt.InfoHandle, + infoCache: opt.InfoCache, tableLockCkr: deadLockCkr, etcdCli: opt.EtcdCli, } @@ -411,7 +420,7 @@ func (d *ddl) GetLease() time.Duration { // Please don't use this function, it is used by TestParallelDDLBeforeRunDDLJob to intercept the calling of d.infoHandle.Get(), use d.infoHandle.Get() instead. // Otherwise, the TestParallelDDLBeforeRunDDLJob will hang up forever. func (d *ddl) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() d.mu.RLock() defer d.mu.RUnlock() @@ -649,10 +658,7 @@ func (d *ddl) startCleanDeadTableLock() { if !d.ownerManager.IsOwner() { continue } - if d.infoHandle == nil || !d.infoHandle.IsValid() { - continue - } - deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoHandle.Get().AllSchemas()) + deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoCache.GetLatest().AllSchemas()) if err != nil { logutil.BgLogger().Info("[ddl] get dead table lock failed.", zap.Error(err)) continue diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index a3f8bb7f9c622..d0289dc19e39f 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2367,7 +2367,7 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A return errors.Trace(err) } - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() if is.TableIsView(ident.Schema, ident.Name) || is.TableIsSequence(ident.Schema, ident.Name) { return ErrWrongObject.GenWithStackByArgs(ident.Schema, ident.Name, "BASE TABLE") } @@ -2898,7 +2898,7 @@ func (d *ddl) AddColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.Alte // AddTablePartitions will add a new partition to the table. func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -2959,7 +2959,7 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec * // CoalescePartitions coalesce partitions can be used with a table that is partitioned by hash or key to reduce the number of partitions by number. func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -2991,7 +2991,7 @@ func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec * } func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -3039,7 +3039,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp } func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -3752,7 +3752,7 @@ func processAndCheckDefaultValueAndColumn(ctx sessionctx.Context, col *table.Col func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, originalColName model.CIStr, spec *ast.AlterTableSpec) (*model.Job, error) { specNewColumn := spec.NewColumns[0] - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return nil, errors.Trace(infoschema.ErrDatabaseNotExists) @@ -4203,7 +4203,7 @@ func (d *ddl) ModifyColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { specNewColumn := spec.NewColumns[0] - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name) @@ -4257,7 +4257,7 @@ func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt // AlterTableComment updates the table comment information. func (d *ddl) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema) @@ -4310,7 +4310,7 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden return ErrUnknownCharacterSet.GenWithStackByArgs(toCharset) } - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema) @@ -4471,7 +4471,7 @@ func (d *ddl) AlterTableDropStatistics(ctx sessionctx.Context, ident ast.Ident, // UpdateTableReplicaInfo updates the table flash replica infos. func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, available bool) error { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() tb, ok := is.TableByID(physicalID) if !ok { tb, _, _ = is.FindTableByPartitionID(physicalID) @@ -4574,7 +4574,7 @@ func checkAlterTableCharset(tblInfo *model.TableInfo, dbInfo *model.DBInfo, toCh // In TiDB, indexes are case-insensitive (so index 'a' and 'A" are considered the same index), // but index names are case-sensitive (we can rename index 'a' to 'A') func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema) @@ -5232,7 +5232,7 @@ func buildFKInfo(fkName model.CIStr, keys []*ast.IndexPartSpecification, refer * } func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.CIStr, keys []*ast.IndexPartSpecification, refer *ast.ReferenceDef) error { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ti.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema) @@ -5264,7 +5264,7 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode } func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.CIStr) error { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ti.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema) @@ -5290,7 +5290,7 @@ func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model. } func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error { - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ti.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists) @@ -6036,7 +6036,7 @@ func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident, return errors.Trace(err) } - oldBundle := infoschema.GetBundle(d.infoHandle.Get(), []int64{partitionID, meta.ID, schema.ID}) + oldBundle := infoschema.GetBundle(d.infoCache.GetLatest(), []int64{partitionID, meta.ID, schema.ID}) oldBundle.ID = placement.GroupID(partitionID) diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index b77c3300c2700..79635bfc0933b 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" @@ -86,6 +87,10 @@ func TestT(t *testing.T) { } func testNewDDLAndStart(ctx context.Context, c *C, options ...Option) *ddl { + // init infoCache and a stub infoSchema + ic := infoschema.NewCache(2) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) + options = append(options, WithInfoCache(ic)) d := newDDL(ctx, options...) err := d.Start(nil) c.Assert(err, IsNil) diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 6e745820b04b9..72fef7c96c19e 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -247,7 +247,7 @@ func (s *testDDLSuite) TestTableError(c *C) { // Schema ID is wrong, so dropping table is failed. doDDLJobErr(c, -1, 1, model.ActionDropTable, nil, ctx, d) // Table ID is wrong, so dropping table is failed. - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_ddl") testCreateSchema(c, testNewContext(d), d, dbInfo) job := doDDLJobErr(c, dbInfo.ID, -1, model.ActionDropTable, nil, ctx, d) @@ -295,7 +295,7 @@ func (s *testDDLSuite) TestViewError(c *C) { c.Assert(err, IsNil) }() ctx := testNewContext(d) - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_ddl") testCreateSchema(c, testNewContext(d), d, dbInfo) // Table ID or schema ID is wrong, so getting table is failed. @@ -363,7 +363,7 @@ func (s *testDDLSuite) TestForeignKeyError(c *C) { doDDLJobErr(c, -1, 1, model.ActionAddForeignKey, nil, ctx, d) doDDLJobErr(c, -1, 1, model.ActionDropForeignKey, nil, ctx, d) - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_ddl") tblInfo := testTableInfo(c, d, "t", 3) testCreateSchema(c, ctx, d, dbInfo) testCreateTable(c, ctx, d, dbInfo, tblInfo) @@ -393,7 +393,7 @@ func (s *testDDLSuite) TestIndexError(c *C) { doDDLJobErr(c, -1, 1, model.ActionAddIndex, nil, ctx, d) doDDLJobErr(c, -1, 1, model.ActionDropIndex, nil, ctx, d) - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_ddl") tblInfo := testTableInfo(c, d, "t", 3) testCreateSchema(c, ctx, d, dbInfo) testCreateTable(c, ctx, d, dbInfo, tblInfo) @@ -435,7 +435,7 @@ func (s *testDDLSuite) TestColumnError(c *C) { }() ctx := testNewContext(d) - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_ddl") tblInfo := testTableInfo(c, d, "t", 3) testCreateSchema(c, ctx, d, dbInfo) testCreateTable(c, ctx, d, dbInfo, tblInfo) diff --git a/ddl/index_change_test.go b/ddl/index_change_test.go index dfdfc7111c372..6a34599137c10 100644 --- a/ddl/index_change_test.go +++ b/ddl/index_change_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -38,15 +37,18 @@ type testIndexChangeSuite struct { func (s *testIndexChangeSuite) SetUpSuite(c *C) { s.store = testCreateStore(c, "test_index_change") - s.dbInfo = &model.DBInfo{ - Name: model.NewCIStr("test_index_change"), - ID: 1, - } - err := kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error { - t := meta.NewMeta(txn) - return errors.Trace(t.CreateDatabase(s.dbInfo)) - }) - c.Check(err, IsNil, Commentf("err %v", errors.ErrorStack(err))) + d := testNewDDLAndStart( + context.Background(), + c, + WithStore(s.store), + WithLease(testLease), + ) + defer func() { + err := d.Stop() + c.Assert(err, IsNil) + }() + s.dbInfo = testSchemaInfo(c, d, "test_index_change") + testCreateSchema(c, testNewContext(d), d, s.dbInfo) } func (s *testIndexChangeSuite) TearDownSuite(c *C) { diff --git a/ddl/options.go b/ddl/options.go index 8613a8e9affa9..9238a7c8542ff 100644 --- a/ddl/options.go +++ b/ddl/options.go @@ -26,11 +26,11 @@ type Option func(*Options) // Options represents all the options of the DDL module needs type Options struct { - EtcdCli *clientv3.Client - Store kv.Storage - InfoHandle *infoschema.Handle - Hook Callback - Lease time.Duration + EtcdCli *clientv3.Client + Store kv.Storage + InfoCache *infoschema.InfoCache + Hook Callback + Lease time.Duration } // WithEtcdClient specifies the `clientv3.Client` of DDL used to request the etcd service @@ -47,10 +47,10 @@ func WithStore(store kv.Storage) Option { } } -// WithInfoHandle specifies the `infoschema.Handle` -func WithInfoHandle(ih *infoschema.Handle) Option { +// WithInfoCache specifies the `infoschema.InfoCache` +func WithInfoCache(ic *infoschema.InfoCache) Option { return func(options *Options) { - options.InfoHandle = ih + options.InfoCache = ic } } diff --git a/ddl/options_test.go b/ddl/options_test.go index 294d68731e4c3..22a451d622c71 100644 --- a/ddl/options_test.go +++ b/ddl/options_test.go @@ -33,14 +33,14 @@ func (s *ddlOptionsSuite) TestOptions(c *C) { callback := &ddl.BaseCallback{} lease := time.Second * 3 store := &mock.Store{} - infoHandle := infoschema.NewHandle(store) + infoHandle := infoschema.NewCache(16) options := []ddl.Option{ ddl.WithEtcdClient(client), ddl.WithHook(callback), ddl.WithLease(lease), ddl.WithStore(store), - ddl.WithInfoHandle(infoHandle), + ddl.WithInfoCache(infoHandle), } opt := &ddl.Options{} @@ -52,5 +52,5 @@ func (s *ddlOptionsSuite) TestOptions(c *C) { c.Assert(opt.Hook, Equals, callback) c.Assert(opt.Lease, Equals, lease) c.Assert(opt.Store, Equals, store) - c.Assert(opt.InfoHandle, Equals, infoHandle) + c.Assert(opt.InfoCache, Equals, infoHandle) } diff --git a/ddl/partition.go b/ddl/partition.go index 0cafa9d2ff525..4e55ec1779e21 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -911,18 +911,15 @@ func getTableInfoWithDroppingPartitions(t *model.TableInfo) *model.TableInfo { } func dropRuleBundles(d *ddlCtx, physicalTableIDs []int64) error { - if d.infoHandle != nil && d.infoHandle.IsValid() { - bundles := make([]*placement.Bundle, 0, len(physicalTableIDs)) - for _, ID := range physicalTableIDs { - oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(ID)) - if ok && !oldBundle.IsEmpty() { - bundles = append(bundles, placement.BuildPlacementDropBundle(ID)) - } + bundles := make([]*placement.Bundle, 0, len(physicalTableIDs)) + for _, ID := range physicalTableIDs { + oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(ID)) + if ok && !oldBundle.IsEmpty() { + bundles = append(bundles, placement.BuildPlacementDropBundle(ID)) } - err := infosync.PutRuleBundles(context.TODO(), bundles) - return err } - return nil + err := infosync.PutRuleBundles(context.TODO(), bundles) + return err } // onDropTablePartition deletes old partition meta. @@ -1095,22 +1092,20 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e } } - if d.infoHandle != nil && d.infoHandle.IsValid() { - bundles := make([]*placement.Bundle, 0, len(oldIDs)) + bundles := make([]*placement.Bundle, 0, len(oldIDs)) - for i, oldID := range oldIDs { - oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(oldID)) - if ok && !oldBundle.IsEmpty() { - bundles = append(bundles, placement.BuildPlacementDropBundle(oldID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newPartitions[i].ID)) - } + for i, oldID := range oldIDs { + oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(oldID)) + if ok && !oldBundle.IsEmpty() { + bundles = append(bundles, placement.BuildPlacementDropBundle(oldID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newPartitions[i].ID)) } + } - err = infosync.PutRuleBundles(context.TODO(), bundles) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Wrapf(err, "failed to notify PD the placement rules") - } + err = infosync.PutRuleBundles(context.TODO(), bundles) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } newIDs := make([]int64, len(oldIDs)) @@ -1299,27 +1294,25 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo // the follow code is a swap function for rules of two partitions // though partitions has exchanged their ID, swap still take effect - if d.infoHandle != nil && d.infoHandle.IsValid() { - bundles := make([]*placement.Bundle, 0, 2) - ptBundle, ptOK := d.infoHandle.Get().BundleByName(placement.GroupID(partDef.ID)) - ptOK = ptOK && !ptBundle.IsEmpty() - ntBundle, ntOK := d.infoHandle.Get().BundleByName(placement.GroupID(nt.ID)) - ntOK = ntOK && !ntBundle.IsEmpty() - if ptOK && ntOK { - bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) - } else if ptOK { - bundles = append(bundles, placement.BuildPlacementDropBundle(partDef.ID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) - } else if ntOK { - bundles = append(bundles, placement.BuildPlacementDropBundle(nt.ID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) - } - err = infosync.PutRuleBundles(context.TODO(), bundles) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Wrapf(err, "failed to notify PD the placement rules") - } + bundles := make([]*placement.Bundle, 0, 2) + ptBundle, ptOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(partDef.ID)) + ptOK = ptOK && !ptBundle.IsEmpty() + ntBundle, ntOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(nt.ID)) + ntOK = ntOK && !ntBundle.IsEmpty() + if ptOK && ntOK { + bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) + } else if ptOK { + bundles = append(bundles, placement.BuildPlacementDropBundle(partDef.ID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) + } else if ntOK { + bundles = append(bundles, placement.BuildPlacementDropBundle(nt.ID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) + } + err = infosync.PutRuleBundles(context.TODO(), bundles) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } ver, err = updateSchemaVersion(t, job) diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go index 18dd9a975fceb..4c28540e7ad3b 100644 --- a/ddl/reorg_test.go +++ b/ddl/reorg_test.go @@ -217,7 +217,7 @@ func (s *testDDLSuite) TestReorgOwner(c *C) { c.Assert(err, IsNil) }() - dbInfo := testSchemaInfo(c, d1, "test") + dbInfo := testSchemaInfo(c, d1, "test_reorg") testCreateSchema(c, ctx, d1, dbInfo) tblInfo := testTableInfo(c, d1, "t", 3) diff --git a/ddl/restart_test.go b/ddl/restart_test.go index b587d54b80cc8..b7791ef7679bd 100644 --- a/ddl/restart_test.go +++ b/ddl/restart_test.go @@ -120,7 +120,7 @@ func (s *testSchemaSuite) TestSchemaResume(c *C) { testCheckOwner(c, d1, true) - dbInfo := testSchemaInfo(c, d1, "test") + dbInfo := testSchemaInfo(c, d1, "test_restart") job := &model.Job{ SchemaID: dbInfo.ID, Type: model.ActionCreateSchema, @@ -157,7 +157,7 @@ func (s *testStatSuite) TestStat(c *C) { c.Assert(err, IsNil) }() - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_restart") testCreateSchema(c, testNewContext(d), d, dbInfo) // TODO: Get this information from etcd. diff --git a/ddl/schema.go b/ddl/schema.go index 823e12a551900..a4b14a49bdbc3 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -68,16 +68,12 @@ func onCreateSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error } func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error { - // d.infoHandle maybe nil in some test. - if d.infoHandle == nil { - return checkSchemaNotExistsFromStore(t, schemaID, dbInfo) - } // Try to use memory schema info to check first. currVer, err := t.GetSchemaVersion() if err != nil { return err } - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() if is.SchemaMetaVersion() == currVer { return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo) } @@ -169,7 +165,7 @@ func onDropSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) oldIDs := getIDs(tables) bundles := make([]*placement.Bundle, 0, len(oldIDs)+1) for _, ID := range append(oldIDs, dbInfo.ID) { - oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(ID)) + oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(ID)) if ok && !oldBundle.IsEmpty() { bundles = append(bundles, placement.BuildPlacementDropBundle(ID)) } diff --git a/ddl/schema_test.go b/ddl/schema_test.go index c70a0b793bb35..b4c8efee7b089 100644 --- a/ddl/schema_test.go +++ b/ddl/schema_test.go @@ -139,7 +139,7 @@ func (s *testSchemaSuite) TestSchema(c *C) { c.Assert(err, IsNil) }() ctx := testNewContext(d) - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_schema") // create a database. job := testCreateSchema(c, ctx, d, dbInfo) @@ -228,7 +228,7 @@ func (s *testSchemaSuite) TestSchemaWaitJob(c *C) { // d2 must not be owner. d2.ownerManager.RetireOwner() - dbInfo := testSchemaInfo(c, d2, "test") + dbInfo := testSchemaInfo(c, d2, "test_schema") testCreateSchema(c, ctx, d2, dbInfo) testCheckSchemaState(c, d2, dbInfo, model.StatePublic) diff --git a/ddl/stat_test.go b/ddl/stat_test.go index fe562a0ae0fb8..1ed3cbfe4c7fc 100644 --- a/ddl/stat_test.go +++ b/ddl/stat_test.go @@ -61,7 +61,7 @@ func (s *testSerialStatSuite) TestDDLStatsInfo(c *C) { c.Assert(err, IsNil) }() - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_stat") testCreateSchema(c, testNewContext(d), d, dbInfo) tblInfo := testTableInfo(c, d, "t", 2) ctx := testNewContext(d) diff --git a/ddl/table.go b/ddl/table.go index 668de3ac41c05..424dd040a0de9 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -487,34 +487,32 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } } - if d.infoHandle != nil && d.infoHandle.IsValid() { - is := d.infoHandle.Get() - - bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1) - if oldBundle, ok := is.BundleByName(placement.GroupID(tableID)); ok { - bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newTableID)) - } - - if pi := tblInfo.GetPartitionInfo(); pi != nil { - oldIDs := make([]int64, 0, len(oldPartitionIDs)) - newIDs := make([]int64, 0, len(oldPartitionIDs)) - newDefs := pi.Definitions - for i := range oldPartitionIDs { - newID := newDefs[i].ID - if oldBundle, ok := is.BundleByName(placement.GroupID(oldPartitionIDs[i])); ok && !oldBundle.IsEmpty() { - oldIDs = append(oldIDs, oldPartitionIDs[i]) - newIDs = append(newIDs, newID) - bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newID)) - } + is := d.infoCache.GetLatest() + + bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1) + if oldBundle, ok := is.BundleByName(placement.GroupID(tableID)); ok { + bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newTableID)) + } + + if pi := tblInfo.GetPartitionInfo(); pi != nil { + oldIDs := make([]int64, 0, len(oldPartitionIDs)) + newIDs := make([]int64, 0, len(oldPartitionIDs)) + newDefs := pi.Definitions + for i := range oldPartitionIDs { + newID := newDefs[i].ID + if oldBundle, ok := is.BundleByName(placement.GroupID(oldPartitionIDs[i])); ok && !oldBundle.IsEmpty() { + oldIDs = append(oldIDs, oldPartitionIDs[i]) + newIDs = append(newIDs, newID) + bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newID)) } - job.CtxVars = []interface{}{oldIDs, newIDs} } + job.CtxVars = []interface{}{oldIDs, newIDs} + } - err = infosync.PutRuleBundles(context.TODO(), bundles) - if err != nil { - job.State = model.JobStateCancelled - return 0, errors.Wrapf(err, "failed to notify PD the placement rules") - } + err = infosync.PutRuleBundles(context.TODO(), bundles) + if err != nil { + job.State = model.JobStateCancelled + return 0, errors.Wrapf(err, "failed to notify PD the placement rules") } // Clear the tiflash replica available status. @@ -967,16 +965,12 @@ func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ erro } func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error { - // d.infoHandle maybe nil in some test. - if d.infoHandle == nil || !d.infoHandle.IsValid() { - return checkTableNotExistsFromStore(t, schemaID, tableName) - } // Try to use memory schema info to check first. currVer, err := t.GetSchemaVersion() if err != nil { return err } - is := d.infoHandle.Get() + is := d.infoCache.GetLatest() if is.SchemaMetaVersion() == currVer { return checkTableNotExistsFromInfoSchema(is, schemaID, tableName) } diff --git a/ddl/table_test.go b/ddl/table_test.go index 5760fc2b152b5..10927908f5289 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -355,7 +355,7 @@ func (s *testTableSuite) SetUpSuite(c *C) { WithLease(testLease), ) - s.dbInfo = testSchemaInfo(c, s.d, "test") + s.dbInfo = testSchemaInfo(c, s.d, "test_table") testCreateSchema(c, testNewContext(s.d), s.d, s.dbInfo) } diff --git a/ddl/util/syncer_test.go b/ddl/util/syncer_test.go index b552488ad49de..5a9d41d47e3b8 100644 --- a/ddl/util/syncer_test.go +++ b/ddl/util/syncer_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/parser/terror" . "github.com/pingcap/tidb/ddl" . "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/store/mockstore" "go.etcd.io/etcd/clientv3" @@ -69,11 +70,14 @@ func TestSyncerSimple(t *testing.T) { defer clus.Terminate(t) cli := clus.RandClient() ctx := goctx.Background() + ic := infoschema.NewCache(2) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d := NewDDL( ctx, WithEtcdClient(cli), WithStore(store), WithLease(testLease), + WithInfoCache(ic), ) err = d.Start(nil) if err != nil { @@ -110,11 +114,14 @@ func TestSyncerSimple(t *testing.T) { t.Fatalf("client get global version result not match, err %v", err) } + ic2 := infoschema.NewCache(2) + ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d1 := NewDDL( ctx, WithEtcdClient(cli), WithStore(store), WithLease(testLease), + WithInfoCache(ic2), ) err = d1.Start(nil) if err != nil { diff --git a/domain/domain.go b/domain/domain.go index f4b0ac8900f24..e6ea3d1e2d949 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -67,7 +67,7 @@ import ( // Multiple domains can be used in parallel without synchronization. type Domain struct { store kv.Storage - infoHandle *infoschema.Handle + infoCache *infoschema.InfoCache privHandle *privileges.Handle bindHandle *bindinfo.BindHandle statsHandle unsafe.Pointer @@ -92,78 +92,75 @@ type Domain struct { isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. } -// loadInfoSchema loads infoschema at startTS into handle, usedSchemaVersion is the currently used -// infoschema version, if it is the same as the schema version at startTS, we don't need to reload again. -// It returns the latest schema version, the changed table IDs, whether it's a full load and an error. -func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion int64, - startTS uint64) (neededSchemaVersion int64, change *tikv.RelatedSchemaChange, fullLoad bool, err error) { +// loadInfoSchema loads infoschema at startTS. +// It returns: +// 1. the needed infoschema +// 2. cache hit indicator +// 3. currentSchemaVersion(before loading) +// 4. the changed table IDs if it is not full load +// 5. an error if any +func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, int64, *tikv.RelatedSchemaChange, error) { snapshot := do.store.GetSnapshot(kv.NewVersion(startTS)) m := meta.NewSnapshotMeta(snapshot) - neededSchemaVersion, err = m.GetSchemaVersion() + neededSchemaVersion, err := m.GetSchemaVersion() if err != nil { - return 0, nil, fullLoad, err - } - if usedSchemaVersion != 0 && usedSchemaVersion == neededSchemaVersion { - return neededSchemaVersion, nil, fullLoad, nil + return nil, false, 0, nil, err } - // Update self schema version to etcd. - defer func() { - // There are two possibilities for not updating the self schema version to etcd. - // 1. Failed to loading schema information. - // 2. When users use history read feature, the neededSchemaVersion isn't the latest schema version. - if err != nil || neededSchemaVersion < do.InfoSchema().SchemaMetaVersion() { - logutil.BgLogger().Info("do not update self schema version to etcd", - zap.Int64("usedSchemaVersion", usedSchemaVersion), - zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Error(err)) - return - } + if is := do.infoCache.GetByVersion(neededSchemaVersion); is != nil { + return is, true, 0, nil, nil + } - err = do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), neededSchemaVersion) - if err != nil { - logutil.BgLogger().Info("update self version failed", - zap.Int64("usedSchemaVersion", usedSchemaVersion), - zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Error(err)) - } - }() + currentSchemaVersion := int64(0) + if oldInfoSchema := do.infoCache.GetLatest(); oldInfoSchema != nil { + currentSchemaVersion = oldInfoSchema.SchemaMetaVersion() + } + // TODO: tryLoadSchemaDiffs has potential risks of failure. And it becomes worse in history reading cases. + // It is only kept because there is no alternative diff/partial loading solution. + // And it is only used to diff upgrading the current latest infoschema, if: + // 1. Not first time bootstrap loading, which needs a full load. + // 2. It is newer than the current one, so it will be "the current one" after this function call. + // 3. There are less 100 diffs. startTime := time.Now() - ok, relatedChanges, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, neededSchemaVersion) - if err != nil { + if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 { + is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) + if err == nil { + do.infoCache.Insert(is) + logutil.BgLogger().Info("diff load InfoSchema success", + zap.Int64("currentSchemaVersion", currentSchemaVersion), + zap.Int64("neededSchemaVersion", neededSchemaVersion), + zap.Duration("start time", time.Since(startTime)), + zap.Int64s("phyTblIDs", relatedChanges.PhyTblIDS), + zap.Uint64s("actionTypes", relatedChanges.ActionTypes)) + return is, false, currentSchemaVersion, relatedChanges, nil + } // We can fall back to full load, don't need to return the error. logutil.BgLogger().Error("failed to load schema diff", zap.Error(err)) } - if ok { - logutil.BgLogger().Info("diff load InfoSchema success", - zap.Int64("usedSchemaVersion", usedSchemaVersion), - zap.Int64("neededSchemaVersion", neededSchemaVersion), - zap.Duration("start time", time.Since(startTime)), - zap.Int64s("phyTblIDs", relatedChanges.PhyTblIDS), - zap.Uint64s("actionTypes", relatedChanges.ActionTypes)) - return neededSchemaVersion, relatedChanges, fullLoad, nil - } - fullLoad = true schemas, err := do.fetchAllSchemasWithTables(m) if err != nil { - return 0, nil, fullLoad, err + return nil, false, currentSchemaVersion, nil, err } bundles, err := infosync.GetAllRuleBundles(context.TODO()) if err != nil { - return 0, nil, fullLoad, err + return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, bundles, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do.Store()).InitWithDBInfos(schemas, bundles, neededSchemaVersion) if err != nil { - return 0, nil, fullLoad, err + return nil, false, currentSchemaVersion, nil, err } logutil.BgLogger().Info("full load InfoSchema success", - zap.Int64("usedSchemaVersion", usedSchemaVersion), + zap.Int64("currentSchemaVersion", currentSchemaVersion), zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Duration("start time", time.Since(startTime))) - newISBuilder.Build() - return neededSchemaVersion, nil, fullLoad, nil + + is := newISBuilder.Build() + do.infoCache.Insert(is) + return is, false, currentSchemaVersion, nil, nil } func (do *Domain) fetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, error) { @@ -238,48 +235,31 @@ func (do *Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, done <- nil } -const ( - initialVersion = 0 - maxNumberOfDiffsToLoad = 100 -) - -func isTooOldSchema(usedVersion, newVersion int64) bool { - if usedVersion == initialVersion || newVersion-usedVersion > maxNumberOfDiffsToLoad { - return true - } - return false -} - // tryLoadSchemaDiffs tries to only load latest schema changes. // Return true if the schema is loaded successfully. // Return false if the schema can not be loaded by schema diff, then we need to do full load. // The second returned value is the delta updated table and partition IDs. -func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (bool, *tikv.RelatedSchemaChange, error) { - // If there isn't any used version, or used version is too old, we do full load. - // And when users use history read feature, we will set usedVersion to initialVersion, then full load is needed. - if isTooOldSchema(usedVersion, newVersion) { - return false, nil, nil - } +func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *tikv.RelatedSchemaChange, error) { var diffs []*model.SchemaDiff for usedVersion < newVersion { usedVersion++ diff, err := m.GetSchemaDiff(usedVersion) if err != nil { - return false, nil, err + return nil, nil, err } if diff == nil { // If diff is missing for any version between used and new version, we fall back to full reload. - return false, nil, nil + return nil, nil, fmt.Errorf("failed to get schemadiff") } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.infoHandle).InitWithOldInfoSchema() + builder := infoschema.NewBuilder(do.Store()).InitWithOldInfoSchema(do.infoCache.GetLatest()) phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { IDs, err := builder.ApplyDiff(m, diff) if err != nil { - return false, nil, err + return nil, nil, err } if canSkipSchemaCheckerDDL(diff.Type) { continue @@ -289,11 +269,11 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 actions = append(actions, uint64(1< staleVer { - return errors.New("schema version changed after the staleness startTS") - } // With START TRANSACTION, autocommit remains disabled until you end // the transaction with COMMIT or ROLLBACK. The autocommit mode then diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 493bda06c5de2..7cf235bd3c0f7 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -18,7 +18,6 @@ import ( "time" . "github.com/pingcap/check" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/store/tikv/oracle" @@ -26,12 +25,6 @@ import ( ) func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer func() { - err := failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") - c.Assert(err, IsNil) - }() - testcases := []struct { name string preSQL string @@ -117,8 +110,6 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { } func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -155,7 +146,7 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { failpoint.Enable("github.com/pingcap/tidb/config/injectTxnScope", fmt.Sprintf(`return("%v")`, testcase.zone)) failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStoreLabels", fmt.Sprintf(`return("%v_%v")`, placement.DCLabelKey, testcase.txnScope)) failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag", `return(true)`) - tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`) + tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) tk.MustQuery(testcase.sql) tk.MustExec(`commit`) failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") @@ -165,12 +156,6 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { } func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer func() { - err := failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") - c.Assert(err, IsNil) - }() - tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. @@ -193,62 +178,7 @@ func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) { tk.MustExec("commit") } -func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { - testcases := []struct { - name string - sql string - expectErr error - }{ - { - name: "ddl change before stale txn", - sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:03'`, - expectErr: errors.New("schema version changed after the staleness startTS"), - }, - { - name: "ddl change before stale txn", - sql: fmt.Sprintf("START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '%v'", - time.Now().Truncate(3*time.Second).Format("2006-01-02 15:04:05")), - expectErr: errors.New(".*schema version changed after the staleness startTS.*"), - }, - { - name: "ddl change before stale txn", - sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:03'`, - expectErr: nil, - }, - } - tk := testkit.NewTestKitWithInit(c, s.store) - for _, testcase := range testcases { - check := func() { - if testcase.expectErr != nil { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(true)"), IsNil) - defer func() { - err := failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") - c.Assert(err, IsNil) - }() - - } else { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer func() { - err := failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") - c.Assert(err, IsNil) - }() - - } - _, err := tk.Exec(testcase.sql) - if testcase.expectErr != nil { - c.Assert(err, NotNil) - c.Assert(err.Error(), Matches, testcase.expectErr.Error()) - } else { - c.Assert(err, IsNil) - } - } - check() - } -} - func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -318,3 +248,22 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS") } } + +func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int primary key);") + + schemaVer1 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + time.Sleep(time.Second) + tk.MustExec("drop table if exists t") + schemaVer2 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + // confirm schema changed + c.Assert(schemaVer1, Less, schemaVer2) + + tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:01'`) + schemaVer3 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + // got an old infoSchema + c.Assert(schemaVer3, Equals, schemaVer1) +} diff --git a/infoschema/builder.go b/infoschema/builder.go index 28591d8679baf..88e8b71add319 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/table" @@ -35,8 +36,10 @@ import ( // Builder builds a new InfoSchema. type Builder struct { - is *infoSchema - handle *Handle + is *infoSchema + // TODO: store is only used by autoid allocators + // detach allocators from storage, use passed transaction in the feature + store kv.Storage } // ApplyDiff applies SchemaDiff to the new InfoSchema. @@ -352,14 +355,14 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i ConvertOldVersionUTF8ToUTF8MB4IfNeed(tblInfo) if len(allocs) == 0 { - allocs = autoid.NewAllocatorsFromTblInfo(b.handle.store, dbInfo.ID, tblInfo) + allocs = autoid.NewAllocatorsFromTblInfo(b.store, dbInfo.ID, tblInfo) } else { switch tp { case model.ActionRebaseAutoID, model.ActionModifyTableAutoIdCache: - newAlloc := autoid.NewAllocator(b.handle.store, dbInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType) + newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType) allocs = append(allocs, newAlloc) case model.ActionRebaseAutoRandomBase: - newAlloc := autoid.NewAllocator(b.handle.store, dbInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType) + newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType) allocs = append(allocs, newAlloc) case model.ActionModifyColumn: // Change column attribute from auto_increment to auto_random. @@ -368,7 +371,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i allocs = allocs.Filter(func(a autoid.Allocator) bool { return a.GetType() != autoid.AutoIncrementType && a.GetType() != autoid.RowIDAllocType }) - newAlloc := autoid.NewAllocator(b.handle.store, dbInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType) + newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType) allocs = append(allocs, newAlloc) } } @@ -470,9 +473,14 @@ func (b *Builder) applyPlacementUpdate(id string) error { return nil } +// Build builds and returns the built infoschema. +func (b *Builder) Build() InfoSchema { + return b.is +} + // InitWithOldInfoSchema initializes an empty new InfoSchema by copies all the data from old InfoSchema. -func (b *Builder) InitWithOldInfoSchema() *Builder { - oldIS := b.handle.Get().(*infoSchema) +func (b *Builder) InitWithOldInfoSchema(oldSchema InfoSchema) *Builder { + oldIS := oldSchema.(*infoSchema) b.is.schemaMetaVersion = oldIS.schemaMetaVersion b.copySchemasMap(oldIS) b.copyBundlesMap(oldIS) @@ -549,7 +557,7 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF b.is.schemaMap[di.Name.L] = schTbls for _, t := range di.Tables { - allocs := autoid.NewAllocatorsFromTblInfo(b.handle.store, di.ID, t) + allocs := autoid.NewAllocatorsFromTblInfo(b.store, di.ID, t) var tbl table.Table tbl, err := tableFromMeta(allocs, t) if err != nil { @@ -574,21 +582,16 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) drivers = append(drivers, &virtualTableDriver{dbInfo, tableFromMeta}) } -// Build sets new InfoSchema to the handle in the Builder. -func (b *Builder) Build() { - b.handle.value.Store(b.is) -} - // NewBuilder creates a new Builder with a Handle. -func NewBuilder(handle *Handle) *Builder { - b := new(Builder) - b.handle = handle - b.is = &infoSchema{ - schemaMap: map[string]*schemaTables{}, - ruleBundleMap: map[string]*placement.Bundle{}, - sortedTablesBuckets: make([]sortedTables, bucketCount), +func NewBuilder(store kv.Storage) *Builder { + return &Builder{ + store: store, + is: &infoSchema{ + schemaMap: map[string]*schemaTables{}, + ruleBundleMap: map[string]*placement.Bundle{}, + sortedTablesBuckets: make([]sortedTables, bucketCount), + }, } - return b } func tableBucketIdx(tableID int64) int { diff --git a/infoschema/cache.go b/infoschema/cache.go new file mode 100644 index 0000000000000..4c3371b1bc354 --- /dev/null +++ b/infoschema/cache.go @@ -0,0 +1,95 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package infoschema + +import ( + "sort" + "sync" + + "github.com/pingcap/tidb/metrics" +) + +// InfoCache handles information schema, including getting and setting. +// The cache behavior, however, is transparent and under automatic management. +// It only promised to cache the infoschema, if it is newer than all the cached. +type InfoCache struct { + mu sync.RWMutex + // cache is sorted by SchemaVersion in descending order + cache []InfoSchema +} + +// NewCache creates a new InfoCache. +func NewCache(capcity int) *InfoCache { + return &InfoCache{cache: make([]InfoSchema, 0, capcity)} +} + +// GetLatest gets the newest information schema. +func (h *InfoCache) GetLatest() InfoSchema { + h.mu.RLock() + defer h.mu.RUnlock() + metrics.InfoCacheCounters.WithLabelValues("get").Inc() + if len(h.cache) > 0 { + metrics.InfoCacheCounters.WithLabelValues("hit").Inc() + return h.cache[0] + } + return nil +} + +// GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded. +func (h *InfoCache) GetByVersion(version int64) InfoSchema { + h.mu.RLock() + defer h.mu.RUnlock() + metrics.InfoCacheCounters.WithLabelValues("get").Inc() + i := sort.Search(len(h.cache), func(i int) bool { + return h.cache[i].SchemaMetaVersion() <= version + }) + if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { + metrics.InfoCacheCounters.WithLabelValues("hit").Inc() + return h.cache[i] + } + return nil +} + +// Insert will **TRY** to insert the infoschema into the cache. +// It only promised to cache the newest infoschema. +// It returns 'true' if it is cached, 'false' otherwise. +func (h *InfoCache) Insert(is InfoSchema) bool { + h.mu.Lock() + defer h.mu.Unlock() + + version := is.SchemaMetaVersion() + i := sort.Search(len(h.cache), func(i int) bool { + return h.cache[i].SchemaMetaVersion() <= version + }) + + // cached entry + if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { + return true + } + + if len(h.cache) < cap(h.cache) { + // has free space, grown the slice + h.cache = h.cache[:len(h.cache)+1] + copy(h.cache[i+1:], h.cache[i:]) + h.cache[i] = is + return true + } else if i < len(h.cache) { + // drop older schema + copy(h.cache[i+1:], h.cache[i:]) + h.cache[i] = is + return true + } + // older than all cached schemas, refuse to cache it + return false +} diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go new file mode 100644 index 0000000000000..a8e9ddcc0df5a --- /dev/null +++ b/infoschema/cache_test.go @@ -0,0 +1,119 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package infoschema_test + +import ( + . "github.com/pingcap/check" + "github.com/pingcap/tidb/infoschema" +) + +var _ = Suite(&testInfoCacheSuite{}) + +type testInfoCacheSuite struct { +} + +func (s *testInfoCacheSuite) TestNewCache(c *C) { + ic := infoschema.NewCache(16) + c.Assert(ic, NotNil) +} + +func (s *testInfoCacheSuite) TestInsert(c *C) { + ic := infoschema.NewCache(3) + c.Assert(ic, NotNil) + + is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) + ic.Insert(is2) + c.Assert(ic.GetByVersion(2), NotNil) + + // newer + is5 := infoschema.MockInfoSchemaWithSchemaVer(nil, 5) + ic.Insert(is5) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(2), NotNil) + + // older + is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) + ic.Insert(is0) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(2), NotNil) + c.Assert(ic.GetByVersion(0), NotNil) + + // replace 5, drop 0 + is6 := infoschema.MockInfoSchemaWithSchemaVer(nil, 6) + ic.Insert(is6) + c.Assert(ic.GetByVersion(6), NotNil) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(2), NotNil) + c.Assert(ic.GetByVersion(0), IsNil) + + // replace 2, drop 2 + is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) + ic.Insert(is3) + c.Assert(ic.GetByVersion(6), NotNil) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(3), NotNil) + c.Assert(ic.GetByVersion(2), IsNil) + c.Assert(ic.GetByVersion(0), IsNil) + + // insert 2, but failed silently + ic.Insert(is2) + c.Assert(ic.GetByVersion(6), NotNil) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(3), NotNil) + c.Assert(ic.GetByVersion(2), IsNil) + c.Assert(ic.GetByVersion(0), IsNil) + + // insert 5, but it is already in + ic.Insert(is5) + c.Assert(ic.GetByVersion(6), NotNil) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(3), NotNil) + c.Assert(ic.GetByVersion(2), IsNil) + c.Assert(ic.GetByVersion(0), IsNil) +} + +func (s *testInfoCacheSuite) TestGetByVersion(c *C) { + ic := infoschema.NewCache(2) + c.Assert(ic, NotNil) + is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) + ic.Insert(is1) + is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) + ic.Insert(is3) + + c.Assert(ic.GetByVersion(1), Equals, is1) + c.Assert(ic.GetByVersion(3), Equals, is3) + c.Assert(ic.GetByVersion(0), IsNil, Commentf("index == 0, but not found")) + c.Assert(ic.GetByVersion(2), IsNil, Commentf("index in the middle, but not found")) + c.Assert(ic.GetByVersion(4), IsNil, Commentf("index == length, but not found")) +} + +func (s *testInfoCacheSuite) TestGetLatest(c *C) { + ic := infoschema.NewCache(16) + c.Assert(ic, NotNil) + c.Assert(ic.GetLatest(), IsNil) + + is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) + ic.Insert(is1) + c.Assert(ic.GetLatest(), Equals, is1) + + // newer change the newest + is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) + ic.Insert(is2) + c.Assert(ic.GetLatest(), Equals, is2) + + // older schema doesn't change the newest + is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) + ic.Insert(is0) + c.Assert(ic.GetLatest(), Equals, is2) +} diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index ac8afd14605f1..2494e89b4d57f 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -17,12 +17,10 @@ import ( "fmt" "sort" "sync" - "sync/atomic" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/ddl/placement" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util" @@ -312,40 +310,6 @@ func (is *infoSchema) SequenceByName(schema, sequence model.CIStr) (util.Sequenc return tbl.(util.SequenceTable), nil } -// Handle handles information schema, including getting and setting. -type Handle struct { - value atomic.Value - store kv.Storage -} - -// NewHandle creates a new Handle. -func NewHandle(store kv.Storage) *Handle { - h := &Handle{ - store: store, - } - return h -} - -// Get gets information schema from Handle. -func (h *Handle) Get() InfoSchema { - v := h.value.Load() - schema, _ := v.(InfoSchema) - return schema -} - -// IsValid uses to check whether handle value is valid. -func (h *Handle) IsValid() bool { - return h.value.Load() != nil -} - -// EmptyClone creates a new Handle with the same store and memSchema, but the value is not set. -func (h *Handle) EmptyClone() *Handle { - newHandle := &Handle{ - store: h.store, - } - return newHandle -} - func init() { // Initialize the information shema database and register the driver to `drivers` dbID := autoid.InformationSchemaDBID diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 6aa0c5526f467..87276ef1452b9 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -15,7 +15,6 @@ package infoschema_test import ( "context" - "sync" "testing" . "github.com/pingcap/check" @@ -57,7 +56,6 @@ func (*testSuite) TestT(c *C) { c.Assert(err, IsNil) defer dom.Close() - handle := infoschema.NewHandle(store) dbName := model.NewCIStr("Test") tbName := model.NewCIStr("T") colName := model.NewCIStr("A") @@ -116,7 +114,7 @@ func (*testSuite) TestT(c *C) { }) c.Assert(err, IsNil) - builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(dbInfos, nil, 1) + builder, err := infoschema.NewBuilder(dom.Store()).InitWithDBInfos(dbInfos, nil, 1) c.Assert(err, IsNil) txn, err := store.Begin() @@ -126,8 +124,7 @@ func (*testSuite) TestT(c *C) { err = txn.Rollback() c.Assert(err, IsNil) - builder.Build() - is := handle.Get() + is := builder.Build() schemaNames := is.AllSchemaNames() c.Assert(schemaNames, HasLen, 4) @@ -213,14 +210,10 @@ func (*testSuite) TestT(c *C) { c.Assert(err, IsNil) err = txn.Rollback() c.Assert(err, IsNil) - builder.Build() - is = handle.Get() + is = builder.Build() schema, ok = is.SchemaByID(dbID) c.Assert(ok, IsTrue) c.Assert(len(schema.Tables), Equals, 1) - - emptyHandle := handle.EmptyClone() - c.Assert(emptyHandle.Get(), IsNil) } func (testSuite) TestMockInfoSchema(c *C) { @@ -258,32 +251,6 @@ func checkApplyCreateNonExistsTableDoesNotPanic(c *C, txn kv.Transaction, builde c.Assert(infoschema.ErrTableNotExists.Equal(err), IsTrue) } -// TestConcurrent makes sure it is safe to concurrently create handle on multiple stores. -func (testSuite) TestConcurrent(c *C) { - defer testleak.AfterTest(c)() - storeCount := 5 - stores := make([]kv.Storage, storeCount) - for i := 0; i < storeCount; i++ { - store, err := mockstore.NewMockStore() - c.Assert(err, IsNil) - stores[i] = store - } - defer func() { - for _, store := range stores { - store.Close() - } - }() - var wg sync.WaitGroup - wg.Add(storeCount) - for _, store := range stores { - go func(s kv.Storage) { - defer wg.Done() - _ = infoschema.NewHandle(s) - }(store) - } - wg.Wait() -} - // TestInfoTables makes sure that all tables of information_schema could be found in infoschema handle. func (*testSuite) TestInfoTables(c *C) { defer testleak.AfterTest(c)() @@ -293,12 +260,10 @@ func (*testSuite) TestInfoTables(c *C) { err := store.Close() c.Assert(err, IsNil) }() - handle := infoschema.NewHandle(store) - builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(nil, nil, 0) + + builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, 0) c.Assert(err, IsNil) - builder.Build() - is := handle.Get() - c.Assert(is, NotNil) + is := builder.Build() infoTables := []string{ "SCHEMATA", @@ -360,12 +325,9 @@ func (*testSuite) TestGetBundle(c *C) { c.Assert(err, IsNil) }() - handle := infoschema.NewHandle(store) - builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(nil, nil, 0) + builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, 0) c.Assert(err, IsNil) - builder.Build() - - is := handle.Get() + is := builder.Build() bundle := &placement.Bundle{ ID: placement.PDBundleID, diff --git a/metrics/domain.go b/metrics/domain.go index dd3912555d59c..f30dbd59e5d32 100644 --- a/metrics/domain.go +++ b/metrics/domain.go @@ -38,6 +38,19 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s }) + // InfoCacheCounters are the counters of get/hit. + InfoCacheCounters = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "domain", + Name: "infocache_counters", + Help: "Counters of infoCache: get/hit.", + }, []string{LblType}) + // InfoCacheCounterGet is the total number of getting entry. + InfoCacheCounterGet = "get" + // InfoCacheCounterHit is the cache hit numbers for get. + InfoCacheCounterHit = "hit" + // LoadPrivilegeCounter records the counter of load privilege. LoadPrivilegeCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ diff --git a/metrics/metrics.go b/metrics/metrics.go index ff2ac3b1aa08d..4a879b5d5423c 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -98,6 +98,7 @@ func RegisterMetrics() { prometheus.MustRegister(JobsGauge) prometheus.MustRegister(KeepAliveCounter) prometheus.MustRegister(LoadPrivilegeCounter) + prometheus.MustRegister(InfoCacheCounters) prometheus.MustRegister(LoadSchemaCounter) prometheus.MustRegister(LoadSchemaDuration) prometheus.MustRegister(MetaHistogram) diff --git a/owner/manager_test.go b/owner/manager_test.go index e25b204e6bbb4..e239419057291 100644 --- a/owner/manager_test.go +++ b/owner/manager_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/terror" . "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/logutil" @@ -72,11 +73,14 @@ func TestSingle(t *testing.T) { defer clus.Terminate(t) cli := clus.RandClient() ctx := goctx.Background() + ic := infoschema.NewCache(2) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d := NewDDL( ctx, WithEtcdClient(cli), WithStore(store), WithLease(testLease), + WithInfoCache(ic), ) err = d.Start(nil) if err != nil { @@ -142,11 +146,14 @@ func TestCluster(t *testing.T) { defer clus.Terminate(t) cli := clus.Client(0) + ic := infoschema.NewCache(2) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d := NewDDL( goctx.Background(), WithEtcdClient(cli), WithStore(store), WithLease(testLease), + WithInfoCache(ic), ) err = d.Start(nil) if err != nil { @@ -157,11 +164,14 @@ func TestCluster(t *testing.T) { t.Fatalf("expect true, got isOwner:%v", isOwner) } cli1 := clus.Client(1) + ic2 := infoschema.NewCache(2) + ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d1 := NewDDL( goctx.Background(), WithEtcdClient(cli1), WithStore(store), WithLease(testLease), + WithInfoCache(ic2), ) err = d1.Start(nil) if err != nil { @@ -189,11 +199,14 @@ func TestCluster(t *testing.T) { // d3 (not owner) stop cli3 := clus.Client(3) + ic3 := infoschema.NewCache(2) + ic3.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d3 := NewDDL( goctx.Background(), WithEtcdClient(cli3), WithStore(store), WithLease(testLease), + WithInfoCache(ic3), ) err = d3.Start(nil) if err != nil { diff --git a/session/session.go b/session/session.go index af3f41c863dc0..84ff7e4eec424 100644 --- a/session/session.go +++ b/session/session.go @@ -1935,7 +1935,7 @@ func (s *session) isTxnRetryable() bool { func (s *session) NewTxn(ctx context.Context) error { if s.txn.Valid() { - txnID := s.txn.StartTS() + txnStartTS := s.txn.StartTS() txnScope := s.GetSessionVars().TxnCtx.TxnScope err := s.CommitTxn(ctx) if err != nil { @@ -1944,7 +1944,7 @@ func (s *session) NewTxn(ctx context.Context) error { vars := s.GetSessionVars() logutil.Logger(ctx).Info("NewTxn() inside a transaction auto commit", zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()), - zap.Uint64("txnStartTS", txnID), + zap.Uint64("txnStartTS", txnStartTS), zap.String("txnScope", txnScope)) } @@ -2805,7 +2805,10 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc txn.SetOption(kv.IsStalenessReadOnly, true) txn.SetOption(kv.TxnScope, txnScope) s.txn.changeInvalidToValid(txn) - is := domain.GetDomain(s).InfoSchema() + is, err := domain.GetDomain(s).GetSnapshotInfoSchema(txn.StartTS()) + if err != nil { + return errors.Trace(err) + } s.sessionVars.TxnCtx = &variable.TransactionContext{ InfoSchema: is, CreateTime: time.Now(), diff --git a/session/session_test.go b/session/session_test.go index 4870215f33c9e..a897cb7db07f3 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3903,9 +3903,7 @@ func (s *testSessionSerialSuite) TestIssue21943(c *C) { c.Assert(err.Error(), Equals, "[variable:1238]Variable 'last_plan_from_cache' is a read only variable") } -func (s *testSessionSuite) TestValidateReadOnlyInStalenessTransaction(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") +func (s *testSessionSerialSuite) TestValidateReadOnlyInStalenessTransaction(c *C) { testcases := []struct { name string sql string @@ -4036,7 +4034,7 @@ func (s *testSessionSuite) TestValidateReadOnlyInStalenessTransaction(c *C) { tk.MustExec(`set @@tidb_enable_noop_functions=1;`) for _, testcase := range testcases { c.Log(testcase.name) - tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`) + tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) if testcase.isValidate { _, err := tk.Exec(testcase.sql) c.Assert(err, IsNil) @@ -4050,8 +4048,6 @@ func (s *testSessionSuite) TestValidateReadOnlyInStalenessTransaction(c *C) { } func (s *testSessionSerialSuite) TestSpecialSQLInStalenessTxn(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") testcases := []struct { @@ -4098,7 +4094,7 @@ func (s *testSessionSerialSuite) TestSpecialSQLInStalenessTxn(c *C) { tk.MustExec("CREATE USER 'newuser' IDENTIFIED BY 'mypassword';") for _, testcase := range testcases { comment := Commentf(testcase.name) - tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`) + tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, true, comment) tk.MustExec(testcase.sql) c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.sameSession, comment)