From b1e74eff3ad648b089ded58335e330648ef0700b Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 22 Apr 2021 12:30:20 +0800 Subject: [PATCH 01/23] *: compatibility with staleread Signed-off-by: xhe --- ddl/ddl.go | 10 +- ddl/ddl_api.go | 30 +++--- ddl/options.go | 14 +-- ddl/options_test.go | 4 +- ddl/partition.go | 14 +-- ddl/schema.go | 6 +- ddl/table.go | 8 +- domain/domain.go | 180 +++++++++++++++------------------- domain/domain_test.go | 2 +- infoschema/builder.go | 47 ++++----- infoschema/cache.go | 77 +++++++++++++++ infoschema/cache_test.go | 103 +++++++++++++++++++ infoschema/infoschema.go | 36 ------- infoschema/infoschema_test.go | 42 ++------ session/session.go | 9 +- 15 files changed, 340 insertions(+), 242 deletions(-) create mode 100644 infoschema/cache.go create mode 100644 infoschema/cache_test.go diff --git a/ddl/ddl.go b/ddl/ddl.go index 6f20fe25ccc07..ffb93411b6183 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -202,7 +202,7 @@ type ddlCtx struct { ddlEventCh chan<- *util.Event lease time.Duration // lease is schema lease. binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog. - infoHandle *infoschema.Handle + infoCache *infoschema.InfoCache statsHandle *handle.Handle tableLockCkr util.DeadTableLockChecker etcdCli *clientv3.Client @@ -290,7 +290,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl { ownerManager: manager, schemaSyncer: syncer, binlogCli: binloginfo.GetPumpsClient(), - infoHandle: opt.InfoHandle, + infoCache: opt.InfoCache, tableLockCkr: deadLockCkr, etcdCli: opt.EtcdCli, } @@ -411,7 +411,7 @@ func (d *ddl) GetLease() time.Duration { // Please don't use this function, it is used by TestParallelDDLBeforeRunDDLJob to intercept the calling of d.infoHandle.Get(), use d.infoHandle.Get() instead. // Otherwise, the TestParallelDDLBeforeRunDDLJob will hang up forever. func (d *ddl) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema { - is := d.infoHandle.Get() + is := d.infoCache.Get() d.mu.RLock() defer d.mu.RUnlock() @@ -649,10 +649,10 @@ func (d *ddl) startCleanDeadTableLock() { if !d.ownerManager.IsOwner() { continue } - if d.infoHandle == nil || !d.infoHandle.IsValid() { + if d.infoCache == nil || d.infoCache.Get() == nil { continue } - deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoHandle.Get().AllSchemas()) + deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoCache.Get().AllSchemas()) if err != nil { logutil.BgLogger().Info("[ddl] get dead table lock failed.", zap.Error(err)) continue diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index e6d77c9e674e9..7323c49f241fd 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2367,7 +2367,7 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A return errors.Trace(err) } - is := d.infoHandle.Get() + is := d.infoCache.Get() if is.TableIsView(ident.Schema, ident.Name) || is.TableIsSequence(ident.Schema, ident.Name) { return ErrWrongObject.GenWithStackByArgs(ident.Schema, ident.Name, "BASE TABLE") } @@ -2898,7 +2898,7 @@ func (d *ddl) AddColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.Alte // AddTablePartitions will add a new partition to the table. func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -2959,7 +2959,7 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec * // CoalescePartitions coalesce partitions can be used with a table that is partitioned by hash or key to reduce the number of partitions by number. func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -2991,7 +2991,7 @@ func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec * } func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -3039,7 +3039,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp } func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -3752,7 +3752,7 @@ func processAndCheckDefaultValueAndColumn(ctx sessionctx.Context, col *table.Col func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, originalColName model.CIStr, spec *ast.AlterTableSpec) (*model.Job, error) { specNewColumn := spec.NewColumns[0] - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ident.Schema) if !ok { return nil, errors.Trace(infoschema.ErrDatabaseNotExists) @@ -4203,7 +4203,7 @@ func (d *ddl) ModifyColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { specNewColumn := spec.NewColumns[0] - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name) @@ -4257,7 +4257,7 @@ func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt // AlterTableComment updates the table comment information. func (d *ddl) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema) @@ -4310,7 +4310,7 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden return ErrUnknownCharacterSet.GenWithStackByArgs(toCharset) } - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema) @@ -4471,7 +4471,7 @@ func (d *ddl) AlterTableDropStatistics(ctx sessionctx.Context, ident ast.Ident, // UpdateTableReplicaInfo updates the table flash replica infos. func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, available bool) error { - is := d.infoHandle.Get() + is := d.infoCache.Get() tb, ok := is.TableByID(physicalID) if !ok { tb, _, _ = is.FindTableByPartitionID(physicalID) @@ -4574,7 +4574,7 @@ func checkAlterTableCharset(tblInfo *model.TableInfo, dbInfo *model.DBInfo, toCh // In TiDB, indexes are case-insensitive (so index 'a' and 'A" are considered the same index), // but index names are case-sensitive (we can rename index 'a' to 'A') func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema) @@ -5232,7 +5232,7 @@ func buildFKInfo(fkName model.CIStr, keys []*ast.IndexPartSpecification, refer * } func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.CIStr, keys []*ast.IndexPartSpecification, refer *ast.ReferenceDef) error { - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ti.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema) @@ -5264,7 +5264,7 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode } func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.CIStr) error { - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ti.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema) @@ -5290,7 +5290,7 @@ func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model. } func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error { - is := d.infoHandle.Get() + is := d.infoCache.Get() schema, ok := is.SchemaByName(ti.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists) @@ -6036,7 +6036,7 @@ func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident, return errors.Trace(err) } - oldBundle := infoschema.GetBundle(d.infoHandle.Get(), []int64{partitionID, meta.ID, schema.ID}) + oldBundle := infoschema.GetBundle(d.infoCache.Get(), []int64{partitionID, meta.ID, schema.ID}) oldBundle.ID = placement.GroupID(partitionID) diff --git a/ddl/options.go b/ddl/options.go index 8613a8e9affa9..5a8448a772af0 100644 --- a/ddl/options.go +++ b/ddl/options.go @@ -26,11 +26,11 @@ type Option func(*Options) // Options represents all the options of the DDL module needs type Options struct { - EtcdCli *clientv3.Client - Store kv.Storage - InfoHandle *infoschema.Handle - Hook Callback - Lease time.Duration + EtcdCli *clientv3.Client + Store kv.Storage + InfoCache *infoschema.InfoCache + Hook Callback + Lease time.Duration } // WithEtcdClient specifies the `clientv3.Client` of DDL used to request the etcd service @@ -48,9 +48,9 @@ func WithStore(store kv.Storage) Option { } // WithInfoHandle specifies the `infoschema.Handle` -func WithInfoHandle(ih *infoschema.Handle) Option { +func WithInfoHandle(ic *infoschema.InfoCache) Option { return func(options *Options) { - options.InfoHandle = ih + options.InfoCache = ic } } diff --git a/ddl/options_test.go b/ddl/options_test.go index 294d68731e4c3..d177fd2cd0842 100644 --- a/ddl/options_test.go +++ b/ddl/options_test.go @@ -33,7 +33,7 @@ func (s *ddlOptionsSuite) TestOptions(c *C) { callback := &ddl.BaseCallback{} lease := time.Second * 3 store := &mock.Store{} - infoHandle := infoschema.NewHandle(store) + infoHandle := infoschema.NewCache(16) options := []ddl.Option{ ddl.WithEtcdClient(client), @@ -52,5 +52,5 @@ func (s *ddlOptionsSuite) TestOptions(c *C) { c.Assert(opt.Hook, Equals, callback) c.Assert(opt.Lease, Equals, lease) c.Assert(opt.Store, Equals, store) - c.Assert(opt.InfoHandle, Equals, infoHandle) + c.Assert(opt.InfoCache, Equals, infoHandle) } diff --git a/ddl/partition.go b/ddl/partition.go index 0cafa9d2ff525..da1d27503f61e 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -911,10 +911,10 @@ func getTableInfoWithDroppingPartitions(t *model.TableInfo) *model.TableInfo { } func dropRuleBundles(d *ddlCtx, physicalTableIDs []int64) error { - if d.infoHandle != nil && d.infoHandle.IsValid() { + if d.infoCache != nil && d.infoCache.Get() != nil { bundles := make([]*placement.Bundle, 0, len(physicalTableIDs)) for _, ID := range physicalTableIDs { - oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(ID)) + oldBundle, ok := d.infoCache.Get().BundleByName(placement.GroupID(ID)) if ok && !oldBundle.IsEmpty() { bundles = append(bundles, placement.BuildPlacementDropBundle(ID)) } @@ -1095,11 +1095,11 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e } } - if d.infoHandle != nil && d.infoHandle.IsValid() { + if d.infoCache != nil && d.infoCache.Get() != nil { bundles := make([]*placement.Bundle, 0, len(oldIDs)) for i, oldID := range oldIDs { - oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(oldID)) + oldBundle, ok := d.infoCache.Get().BundleByName(placement.GroupID(oldID)) if ok && !oldBundle.IsEmpty() { bundles = append(bundles, placement.BuildPlacementDropBundle(oldID)) bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newPartitions[i].ID)) @@ -1299,11 +1299,11 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo // the follow code is a swap function for rules of two partitions // though partitions has exchanged their ID, swap still take effect - if d.infoHandle != nil && d.infoHandle.IsValid() { + if d.infoCache != nil && d.infoCache.Get() != nil { bundles := make([]*placement.Bundle, 0, 2) - ptBundle, ptOK := d.infoHandle.Get().BundleByName(placement.GroupID(partDef.ID)) + ptBundle, ptOK := d.infoCache.Get().BundleByName(placement.GroupID(partDef.ID)) ptOK = ptOK && !ptBundle.IsEmpty() - ntBundle, ntOK := d.infoHandle.Get().BundleByName(placement.GroupID(nt.ID)) + ntBundle, ntOK := d.infoCache.Get().BundleByName(placement.GroupID(nt.ID)) ntOK = ntOK && !ntBundle.IsEmpty() if ptOK && ntOK { bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) diff --git a/ddl/schema.go b/ddl/schema.go index 823e12a551900..8854b397fb964 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -69,7 +69,7 @@ func onCreateSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error { // d.infoHandle maybe nil in some test. - if d.infoHandle == nil { + if d.infoCache == nil { return checkSchemaNotExistsFromStore(t, schemaID, dbInfo) } // Try to use memory schema info to check first. @@ -77,7 +77,7 @@ func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model if err != nil { return err } - is := d.infoHandle.Get() + is := d.infoCache.Get() if is.SchemaMetaVersion() == currVer { return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo) } @@ -169,7 +169,7 @@ func onDropSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) oldIDs := getIDs(tables) bundles := make([]*placement.Bundle, 0, len(oldIDs)+1) for _, ID := range append(oldIDs, dbInfo.ID) { - oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(ID)) + oldBundle, ok := d.infoCache.Get().BundleByName(placement.GroupID(ID)) if ok && !oldBundle.IsEmpty() { bundles = append(bundles, placement.BuildPlacementDropBundle(ID)) } diff --git a/ddl/table.go b/ddl/table.go index 668de3ac41c05..77c64dcd9625c 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -487,8 +487,8 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } } - if d.infoHandle != nil && d.infoHandle.IsValid() { - is := d.infoHandle.Get() + if d.infoCache != nil && d.infoCache.Get() != nil { + is := d.infoCache.Get() bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1) if oldBundle, ok := is.BundleByName(placement.GroupID(tableID)); ok { @@ -968,7 +968,7 @@ func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ erro func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error { // d.infoHandle maybe nil in some test. - if d.infoHandle == nil || !d.infoHandle.IsValid() { + if d.infoCache == nil || d.infoCache.Get() == nil { return checkTableNotExistsFromStore(t, schemaID, tableName) } // Try to use memory schema info to check first. @@ -976,7 +976,7 @@ func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName stri if err != nil { return err } - is := d.infoHandle.Get() + is := d.infoCache.Get() if is.SchemaMetaVersion() == currVer { return checkTableNotExistsFromInfoSchema(is, schemaID, tableName) } diff --git a/domain/domain.go b/domain/domain.go index f4b0ac8900f24..5934fd8d43441 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -67,7 +67,7 @@ import ( // Multiple domains can be used in parallel without synchronization. type Domain struct { store kv.Storage - infoHandle *infoschema.Handle + infoCache *infoschema.InfoCache privHandle *privileges.Handle bindHandle *bindinfo.BindHandle statsHandle unsafe.Pointer @@ -92,78 +92,75 @@ type Domain struct { isLostConnectionToPD sync2.AtomicInt32 // !0: true, 0: false. } -// loadInfoSchema loads infoschema at startTS into handle, usedSchemaVersion is the currently used -// infoschema version, if it is the same as the schema version at startTS, we don't need to reload again. -// It returns the latest schema version, the changed table IDs, whether it's a full load and an error. -func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion int64, - startTS uint64) (neededSchemaVersion int64, change *tikv.RelatedSchemaChange, fullLoad bool, err error) { +// loadInfoSchema loads infoschema at startTS. +// It returns: +// 1. the needed infoschema +// 2. cache hit indicator +// 3. currentSchemaVersion(before loading) +// 4. the changed table IDs if it is not full load +// 5. an error if any +func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, int64, *tikv.RelatedSchemaChange, error) { snapshot := do.store.GetSnapshot(kv.NewVersion(startTS)) m := meta.NewSnapshotMeta(snapshot) - neededSchemaVersion, err = m.GetSchemaVersion() + neededSchemaVersion, err := m.GetSchemaVersion() if err != nil { - return 0, nil, fullLoad, err - } - if usedSchemaVersion != 0 && usedSchemaVersion == neededSchemaVersion { - return neededSchemaVersion, nil, fullLoad, nil + return nil, false, 0, nil, err } - // Update self schema version to etcd. - defer func() { - // There are two possibilities for not updating the self schema version to etcd. - // 1. Failed to loading schema information. - // 2. When users use history read feature, the neededSchemaVersion isn't the latest schema version. - if err != nil || neededSchemaVersion < do.InfoSchema().SchemaMetaVersion() { - logutil.BgLogger().Info("do not update self schema version to etcd", - zap.Int64("usedSchemaVersion", usedSchemaVersion), - zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Error(err)) - return - } + if is := do.infoCache.GetVersion(neededSchemaVersion); is != nil { + return is, true, 0, nil, nil + } - err = do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), neededSchemaVersion) - if err != nil { - logutil.BgLogger().Info("update self version failed", - zap.Int64("usedSchemaVersion", usedSchemaVersion), - zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Error(err)) - } - }() + currentSchemaVersion := int64(0) + if oldInfoSchema := do.infoCache.Get(); oldInfoSchema != nil { + currentSchemaVersion = oldInfoSchema.SchemaMetaVersion() + } + // TODO: tryLoadSchemaDiffs has potential risks of failure. And it becomes worse in history reading cases. + // It is only kept because there is no alternative diff/partial loading solution. + // And it is only used to diff upgrading the current latest infoschema, if: + // 1. Not first time bootstrap loading, which needs a full load. + // 2. It is newer than the current one, so it will be "the current one" after this function call. + // 2. There are less 100 diffs. startTime := time.Now() - ok, relatedChanges, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, neededSchemaVersion) - if err != nil { + if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 { + is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) + if err == nil { + do.infoCache.Insert(is) + logutil.BgLogger().Info("diff load InfoSchema success", + zap.Int64("currentSchemaVersion", currentSchemaVersion), + zap.Int64("neededSchemaVersion", neededSchemaVersion), + zap.Duration("start time", time.Since(startTime)), + zap.Int64s("phyTblIDs", relatedChanges.PhyTblIDS), + zap.Uint64s("actionTypes", relatedChanges.ActionTypes)) + return is, false, currentSchemaVersion, relatedChanges, nil + } // We can fall back to full load, don't need to return the error. logutil.BgLogger().Error("failed to load schema diff", zap.Error(err)) } - if ok { - logutil.BgLogger().Info("diff load InfoSchema success", - zap.Int64("usedSchemaVersion", usedSchemaVersion), - zap.Int64("neededSchemaVersion", neededSchemaVersion), - zap.Duration("start time", time.Since(startTime)), - zap.Int64s("phyTblIDs", relatedChanges.PhyTblIDS), - zap.Uint64s("actionTypes", relatedChanges.ActionTypes)) - return neededSchemaVersion, relatedChanges, fullLoad, nil - } - fullLoad = true schemas, err := do.fetchAllSchemasWithTables(m) if err != nil { - return 0, nil, fullLoad, err + return nil, false, currentSchemaVersion, nil, err } bundles, err := infosync.GetAllRuleBundles(context.TODO()) if err != nil { - return 0, nil, fullLoad, err + return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, bundles, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do.Store()).InitWithDBInfos(schemas, bundles, neededSchemaVersion) if err != nil { - return 0, nil, fullLoad, err + return nil, false, currentSchemaVersion, nil, err } logutil.BgLogger().Info("full load InfoSchema success", - zap.Int64("usedSchemaVersion", usedSchemaVersion), + zap.Int64("currentSchemaVersion", currentSchemaVersion), zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Duration("start time", time.Since(startTime))) - newISBuilder.Build() - return neededSchemaVersion, nil, fullLoad, nil + + is := newISBuilder.Build() + do.infoCache.Insert(is) + return is, false, currentSchemaVersion, nil, nil } func (do *Domain) fetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, error) { @@ -238,48 +235,31 @@ func (do *Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, done <- nil } -const ( - initialVersion = 0 - maxNumberOfDiffsToLoad = 100 -) - -func isTooOldSchema(usedVersion, newVersion int64) bool { - if usedVersion == initialVersion || newVersion-usedVersion > maxNumberOfDiffsToLoad { - return true - } - return false -} - // tryLoadSchemaDiffs tries to only load latest schema changes. // Return true if the schema is loaded successfully. // Return false if the schema can not be loaded by schema diff, then we need to do full load. // The second returned value is the delta updated table and partition IDs. -func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (bool, *tikv.RelatedSchemaChange, error) { - // If there isn't any used version, or used version is too old, we do full load. - // And when users use history read feature, we will set usedVersion to initialVersion, then full load is needed. - if isTooOldSchema(usedVersion, newVersion) { - return false, nil, nil - } +func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *tikv.RelatedSchemaChange, error) { var diffs []*model.SchemaDiff for usedVersion < newVersion { usedVersion++ diff, err := m.GetSchemaDiff(usedVersion) if err != nil { - return false, nil, err + return nil, nil, err } if diff == nil { // If diff is missing for any version between used and new version, we fall back to full reload. - return false, nil, nil + return nil, nil, fmt.Errorf("failed to get schemadiff") } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.infoHandle).InitWithOldInfoSchema() + builder := infoschema.NewBuilder(do.Store()).InitWithOldInfoSchema(do.infoCache.Get()) phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { IDs, err := builder.ApplyDiff(m, diff) if err != nil { - return false, nil, err + return nil, nil, err } if canSkipSchemaCheckerDDL(diff.Type) { continue @@ -289,11 +269,11 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 actions = append(actions, uint64(1< 0 { + return h.cache[0] + } + return nil +} + +// GetVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded. +func (h *InfoCache) GetVersion(version int64) InfoSchema { + h.mu.Lock() + defer h.mu.Unlock() + i := sort.Search(len(h.cache), func(i int) bool { + return h.cache[i].SchemaMetaVersion() <= version + }) + if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { + return h.cache[i] + } + return nil +} + +// Insert will **TRY** to insert the infoschema into the cache. It works by always keeping newers. +// But YOU SHOULD NOT RELY THIS BEHAVIOR. +func (h *InfoCache) Insert(is InfoSchema) { + h.mu.Lock() + defer h.mu.Unlock() + version := is.SchemaMetaVersion() + i := sort.Search(len(h.cache), func(i int) bool { + return h.cache[i].SchemaMetaVersion() <= version + }) + if len(h.cache) < cap(h.cache) { + // has free space + h.cache = append(h.cache, nil) + copy(h.cache[i+1:], h.cache[i:]) + h.cache[i] = is + } else if i < len(h.cache) { + // drop older schema + copy(h.cache[i+1:], h.cache[i:]) + h.cache[i] = is + } + // older than all cached schemas, refuse to cache it +} diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go new file mode 100644 index 0000000000000..1481243145080 --- /dev/null +++ b/infoschema/cache_test.go @@ -0,0 +1,103 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package infoschema_test + +import ( + . "github.com/pingcap/check" + "github.com/pingcap/tidb/infoschema" +) + +var _ = Suite(&testInfoCacheSuite{}) + +type testInfoCacheSuite struct { +} + +func (s *testInfoCacheSuite) TestNewCache(c *C) { + ic := infoschema.NewCache(16) + c.Assert(ic, NotNil) +} + +func (s *testInfoCacheSuite) TestInsert(c *C) { + ic := infoschema.NewCache(3) + c.Assert(ic, NotNil) + + is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) + ic.Insert(is2) + c.Assert(ic.GetVersion(2), NotNil) + + // newer + is5 := infoschema.MockInfoSchemaWithSchemaVer(nil, 5) + ic.Insert(is5) + c.Assert(ic.GetVersion(5), NotNil) + c.Assert(ic.GetVersion(2), NotNil) + + // older + is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) + ic.Insert(is0) + c.Assert(ic.GetVersion(5), NotNil) + c.Assert(ic.GetVersion(2), NotNil) + c.Assert(ic.GetVersion(0), NotNil) + + // replace 5, drop 0 + is6 := infoschema.MockInfoSchemaWithSchemaVer(nil, 6) + ic.Insert(is6) + c.Assert(ic.GetVersion(6), NotNil) + c.Assert(ic.GetVersion(5), NotNil) + c.Assert(ic.GetVersion(2), NotNil) + c.Assert(ic.GetVersion(0), IsNil) + + // replace 2, drop 2 + is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) + ic.Insert(is3) + c.Assert(ic.GetVersion(6), NotNil) + c.Assert(ic.GetVersion(5), NotNil) + c.Assert(ic.GetVersion(3), NotNil) + c.Assert(ic.GetVersion(2), IsNil) + c.Assert(ic.GetVersion(0), IsNil) +} + +func (s *testInfoCacheSuite) TestGetVersion(c *C) { + ic := infoschema.NewCache(2) + c.Assert(ic, NotNil) + is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) + ic.Insert(is1) + is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) + ic.Insert(is3) + + c.Assert(ic.GetVersion(1), Equals, is1) + c.Assert(ic.GetVersion(3), Equals, is3) + c.Assert(ic.GetVersion(0), IsNil, Commentf("index == 0, but not found")) + c.Assert(ic.GetVersion(2), IsNil, Commentf("index in the middle, but not found")) + c.Assert(ic.GetVersion(4), IsNil, Commentf("index == length, but not found")) +} + +func (s *testInfoCacheSuite) TestGet(c *C) { + ic := infoschema.NewCache(16) + c.Assert(ic, NotNil) + c.Assert(ic.Get(), IsNil) + + is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) + ic.Insert(is1) + c.Assert(ic.Get(), Equals, is1) + + // newer change the newest + is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) + ic.Insert(is2) + c.Assert(ic.Get(), Equals, is2) + + // older schema doesn't change the newest + is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) + ic.Insert(is0) + c.Assert(ic.Get(), Equals, is2) +} diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index ac8afd14605f1..2494e89b4d57f 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -17,12 +17,10 @@ import ( "fmt" "sort" "sync" - "sync/atomic" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/ddl/placement" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util" @@ -312,40 +310,6 @@ func (is *infoSchema) SequenceByName(schema, sequence model.CIStr) (util.Sequenc return tbl.(util.SequenceTable), nil } -// Handle handles information schema, including getting and setting. -type Handle struct { - value atomic.Value - store kv.Storage -} - -// NewHandle creates a new Handle. -func NewHandle(store kv.Storage) *Handle { - h := &Handle{ - store: store, - } - return h -} - -// Get gets information schema from Handle. -func (h *Handle) Get() InfoSchema { - v := h.value.Load() - schema, _ := v.(InfoSchema) - return schema -} - -// IsValid uses to check whether handle value is valid. -func (h *Handle) IsValid() bool { - return h.value.Load() != nil -} - -// EmptyClone creates a new Handle with the same store and memSchema, but the value is not set. -func (h *Handle) EmptyClone() *Handle { - newHandle := &Handle{ - store: h.store, - } - return newHandle -} - func init() { // Initialize the information shema database and register the driver to `drivers` dbID := autoid.InformationSchemaDBID diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 6aa0c5526f467..4fcea2f15085d 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -15,7 +15,6 @@ package infoschema_test import ( "context" - "sync" "testing" . "github.com/pingcap/check" @@ -57,7 +56,7 @@ func (*testSuite) TestT(c *C) { c.Assert(err, IsNil) defer dom.Close() - handle := infoschema.NewHandle(store) + handle := infoschema.NewCache(16) dbName := model.NewCIStr("Test") tbName := model.NewCIStr("T") colName := model.NewCIStr("A") @@ -116,7 +115,7 @@ func (*testSuite) TestT(c *C) { }) c.Assert(err, IsNil) - builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(dbInfos, nil, 1) + builder, err := infoschema.NewBuilder(dom.Store()).InitWithDBInfos(dbInfos, nil, 1) c.Assert(err, IsNil) txn, err := store.Begin() @@ -218,9 +217,6 @@ func (*testSuite) TestT(c *C) { schema, ok = is.SchemaByID(dbID) c.Assert(ok, IsTrue) c.Assert(len(schema.Tables), Equals, 1) - - emptyHandle := handle.EmptyClone() - c.Assert(emptyHandle.Get(), IsNil) } func (testSuite) TestMockInfoSchema(c *C) { @@ -258,32 +254,6 @@ func checkApplyCreateNonExistsTableDoesNotPanic(c *C, txn kv.Transaction, builde c.Assert(infoschema.ErrTableNotExists.Equal(err), IsTrue) } -// TestConcurrent makes sure it is safe to concurrently create handle on multiple stores. -func (testSuite) TestConcurrent(c *C) { - defer testleak.AfterTest(c)() - storeCount := 5 - stores := make([]kv.Storage, storeCount) - for i := 0; i < storeCount; i++ { - store, err := mockstore.NewMockStore() - c.Assert(err, IsNil) - stores[i] = store - } - defer func() { - for _, store := range stores { - store.Close() - } - }() - var wg sync.WaitGroup - wg.Add(storeCount) - for _, store := range stores { - go func(s kv.Storage) { - defer wg.Done() - _ = infoschema.NewHandle(s) - }(store) - } - wg.Wait() -} - // TestInfoTables makes sure that all tables of information_schema could be found in infoschema handle. func (*testSuite) TestInfoTables(c *C) { defer testleak.AfterTest(c)() @@ -293,8 +263,8 @@ func (*testSuite) TestInfoTables(c *C) { err := store.Close() c.Assert(err, IsNil) }() - handle := infoschema.NewHandle(store) - builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(nil, nil, 0) + handle := infoschema.NewCache(16) + builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, 0) c.Assert(err, IsNil) builder.Build() is := handle.Get() @@ -360,8 +330,8 @@ func (*testSuite) TestGetBundle(c *C) { c.Assert(err, IsNil) }() - handle := infoschema.NewHandle(store) - builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(nil, nil, 0) + handle := infoschema.NewCache(16) + builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, 0) c.Assert(err, IsNil) builder.Build() diff --git a/session/session.go b/session/session.go index 19312e5fc391e..20e8481bd2433 100644 --- a/session/session.go +++ b/session/session.go @@ -1936,7 +1936,7 @@ func (s *session) isTxnRetryable() bool { func (s *session) NewTxn(ctx context.Context) error { if s.txn.Valid() { - txnID := s.txn.StartTS() + txnStartTS := s.txn.StartTS() txnScope := s.GetSessionVars().TxnCtx.TxnScope err := s.CommitTxn(ctx) if err != nil { @@ -1945,7 +1945,7 @@ func (s *session) NewTxn(ctx context.Context) error { vars := s.GetSessionVars() logutil.Logger(ctx).Info("NewTxn() inside a transaction auto commit", zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()), - zap.Uint64("txnStartTS", txnID), + zap.Uint64("txnStartTS", txnStartTS), zap.String("txnScope", txnScope)) } @@ -2806,7 +2806,10 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc txn.SetOption(tikvstore.IsStalenessReadOnly, true) txn.SetOption(tikvstore.TxnScope, txnScope) s.txn.changeInvalidToValid(txn) - is := domain.GetDomain(s).InfoSchema() + is, err := domain.GetDomain(s).GetSnapshotInfoSchema(txn.StartTS()) + if err != nil { + return errors.Trace(err) + } s.sessionVars.TxnCtx = &variable.TransactionContext{ InfoSchema: is, CreateTime: time.Now(), From b8d54adb8640b40fb277b68af453f3f785ce8314 Mon Sep 17 00:00:00 2001 From: xhe Date: Mon, 26 Apr 2021 19:05:56 +0800 Subject: [PATCH 02/23] *: remove old compatibility check, and fix tests --- executor/simple.go | 21 --------------------- infoschema/infoschema_test.go | 18 +++++------------- session/session_test.go | 10 +++------- 3 files changed, 8 insertions(+), 41 deletions(-) diff --git a/executor/simple.go b/executor/simple.go index 24cb857aec3d5..d45f26adf27f3 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -22,7 +22,6 @@ import ( "github.com/ngaut/pools" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/model" @@ -675,26 +674,6 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte if err != nil { return err } - dom := domain.GetDomain(e.ctx) - m, err := dom.GetSnapshotMeta(e.ctx.GetSessionVars().TxnCtx.StartTS) - if err != nil { - return err - } - staleVer, err := m.GetSchemaVersion() - if err != nil { - return err - } - failpoint.Inject("mockStalenessTxnSchemaVer", func(val failpoint.Value) { - if val.(bool) { - staleVer = e.ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion() - 1 - } else { - staleVer = e.ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion() - } - }) - // TODO: currently we directly check the schema version. In future, we can cache the stale infoschema instead. - if e.ctx.GetSessionVars().GetInfoSchema().SchemaMetaVersion() > staleVer { - return errors.New("schema version changed after the staleness startTS") - } // With START TRANSACTION, autocommit remains disabled until you end // the transaction with COMMIT or ROLLBACK. The autocommit mode then diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 4fcea2f15085d..87276ef1452b9 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -56,7 +56,6 @@ func (*testSuite) TestT(c *C) { c.Assert(err, IsNil) defer dom.Close() - handle := infoschema.NewCache(16) dbName := model.NewCIStr("Test") tbName := model.NewCIStr("T") colName := model.NewCIStr("A") @@ -125,8 +124,7 @@ func (*testSuite) TestT(c *C) { err = txn.Rollback() c.Assert(err, IsNil) - builder.Build() - is := handle.Get() + is := builder.Build() schemaNames := is.AllSchemaNames() c.Assert(schemaNames, HasLen, 4) @@ -212,8 +210,7 @@ func (*testSuite) TestT(c *C) { c.Assert(err, IsNil) err = txn.Rollback() c.Assert(err, IsNil) - builder.Build() - is = handle.Get() + is = builder.Build() schema, ok = is.SchemaByID(dbID) c.Assert(ok, IsTrue) c.Assert(len(schema.Tables), Equals, 1) @@ -263,12 +260,10 @@ func (*testSuite) TestInfoTables(c *C) { err := store.Close() c.Assert(err, IsNil) }() - handle := infoschema.NewCache(16) + builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, 0) c.Assert(err, IsNil) - builder.Build() - is := handle.Get() - c.Assert(is, NotNil) + is := builder.Build() infoTables := []string{ "SCHEMATA", @@ -330,12 +325,9 @@ func (*testSuite) TestGetBundle(c *C) { c.Assert(err, IsNil) }() - handle := infoschema.NewCache(16) builder, err := infoschema.NewBuilder(store).InitWithDBInfos(nil, nil, 0) c.Assert(err, IsNil) - builder.Build() - - is := handle.Get() + is := builder.Build() bundle := &placement.Bundle{ ID: placement.PDBundleID, diff --git a/session/session_test.go b/session/session_test.go index 3baee4f0ef6f1..5aa0512c9640c 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3904,9 +3904,7 @@ func (s *testSessionSerialSuite) TestIssue21943(c *C) { c.Assert(err.Error(), Equals, "[variable:1238]Variable 'last_plan_from_cache' is a read only variable") } -func (s *testSessionSuite) TestValidateReadOnlyInStalenessTransaction(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") +func (s *testSessionSerialSuite) TestValidateReadOnlyInStalenessTransaction(c *C) { testcases := []struct { name string sql string @@ -4037,7 +4035,7 @@ func (s *testSessionSuite) TestValidateReadOnlyInStalenessTransaction(c *C) { tk.MustExec(`set @@tidb_enable_noop_functions=1;`) for _, testcase := range testcases { c.Log(testcase.name) - tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`) + tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) if testcase.isValidate { _, err := tk.Exec(testcase.sql) c.Assert(err, IsNil) @@ -4051,8 +4049,6 @@ func (s *testSessionSuite) TestValidateReadOnlyInStalenessTransaction(c *C) { } func (s *testSessionSerialSuite) TestSpecialSQLInStalenessTxn(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") testcases := []struct { @@ -4099,7 +4095,7 @@ func (s *testSessionSerialSuite) TestSpecialSQLInStalenessTxn(c *C) { tk.MustExec("CREATE USER 'newuser' IDENTIFIED BY 'mypassword';") for _, testcase := range testcases { comment := Commentf(testcase.name) - tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`) + tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, true, comment) tk.MustExec(testcase.sql) c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.sameSession, comment) From c57ea971bfe7eb09a987a44a50748a4d53766a41 Mon Sep 17 00:00:00 2001 From: xhe Date: Tue, 27 Apr 2021 14:08:52 +0800 Subject: [PATCH 03/23] *: typo Co-authored-by: xiongjiwei --- domain/domain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/domain/domain.go b/domain/domain.go index 5934fd8d43441..7cc05b52ed12e 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -349,7 +349,7 @@ func (do *Domain) Reload() error { } metrics.LoadSchemaCounter.WithLabelValues("succ").Inc() - // only updateif it is not from cache + // only update if it is not from cache if !hitCache { // loaded newer schema if oldSchemaVersion < is.SchemaMetaVersion() { From f78e80f4b1c79c53754ab653322e9a9ad23c9de7 Mon Sep 17 00:00:00 2001 From: xhe Date: Tue, 27 Apr 2021 22:56:20 +0800 Subject: [PATCH 04/23] *: typo Co-authored-by: djshow832 --- infoschema/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infoschema/cache.go b/infoschema/cache.go index 3d2a8abcc4e02..5798c31afcb71 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -26,7 +26,7 @@ type InfoCache struct { cache []InfoSchema } -// NewCache creates a new Handle. +// NewCache creates a new InfoCache. func NewCache(capcity int) *InfoCache { return &InfoCache{cache: make([]InfoSchema, 0, capcity)} } From 62034074f8b1db3f8e945a923defe94981dcba6d Mon Sep 17 00:00:00 2001 From: xhe Date: Tue, 27 Apr 2021 23:18:11 +0800 Subject: [PATCH 05/23] infoschema: cache tweak Signed-off-by: xhe --- infoschema/cache.go | 22 ++++++++++++++-------- infoschema/cache_test.go | 16 ++++++++++++++++ 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/infoschema/cache.go b/infoschema/cache.go index 5798c31afcb71..a202a807ae176 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -20,8 +20,7 @@ import ( // InfoCache handles information schema, including getting and setting. type InfoCache struct { - // mu can not be RWMutex, because we access infoschema in the cache slice - mu sync.Mutex + mu sync.RWMutex // cache is sorted by SchemaVersion in descending order cache []InfoSchema } @@ -33,8 +32,8 @@ func NewCache(capcity int) *InfoCache { // Get gets the newest information schema. func (h *InfoCache) Get() InfoSchema { - h.mu.Lock() - defer h.mu.Unlock() + h.mu.RLock() + defer h.mu.RUnlock() if len(h.cache) > 0 { return h.cache[0] } @@ -43,8 +42,8 @@ func (h *InfoCache) Get() InfoSchema { // GetVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded. func (h *InfoCache) GetVersion(version int64) InfoSchema { - h.mu.Lock() - defer h.mu.Unlock() + h.mu.RLock() + defer h.mu.RUnlock() i := sort.Search(len(h.cache), func(i int) bool { return h.cache[i].SchemaMetaVersion() <= version }) @@ -59,13 +58,20 @@ func (h *InfoCache) GetVersion(version int64) InfoSchema { func (h *InfoCache) Insert(is InfoSchema) { h.mu.Lock() defer h.mu.Unlock() + version := is.SchemaMetaVersion() i := sort.Search(len(h.cache), func(i int) bool { return h.cache[i].SchemaMetaVersion() <= version }) + + // cached entry + if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { + return + } + if len(h.cache) < cap(h.cache) { - // has free space - h.cache = append(h.cache, nil) + // has free space, grown the slice + h.cache = h.cache[:len(h.cache)+1] copy(h.cache[i+1:], h.cache[i:]) h.cache[i] = is } else if i < len(h.cache) { diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 1481243145080..3c501f35441d2 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -65,6 +65,22 @@ func (s *testInfoCacheSuite) TestInsert(c *C) { c.Assert(ic.GetVersion(3), NotNil) c.Assert(ic.GetVersion(2), IsNil) c.Assert(ic.GetVersion(0), IsNil) + + // insert 2, but failed silently + ic.Insert(is2) + c.Assert(ic.GetVersion(6), NotNil) + c.Assert(ic.GetVersion(5), NotNil) + c.Assert(ic.GetVersion(3), NotNil) + c.Assert(ic.GetVersion(2), IsNil) + c.Assert(ic.GetVersion(0), IsNil) + + // insert 5, but it is already in + ic.Insert(is5) + c.Assert(ic.GetVersion(6), NotNil) + c.Assert(ic.GetVersion(5), NotNil) + c.Assert(ic.GetVersion(3), NotNil) + c.Assert(ic.GetVersion(2), IsNil) + c.Assert(ic.GetVersion(0), IsNil) } func (s *testInfoCacheSuite) TestGetVersion(c *C) { From b3dbd674d3f687123d8fc904ecf6efecb8c3657b Mon Sep 17 00:00:00 2001 From: xhe Date: Tue, 27 Apr 2021 23:19:41 +0800 Subject: [PATCH 06/23] infoschema: rename Signed-off-by: xhe --- ddl/ddl.go | 6 ++-- ddl/ddl_api.go | 30 ++++++++--------- ddl/partition.go | 14 ++++---- ddl/schema.go | 4 +-- ddl/table.go | 8 ++--- domain/domain.go | 8 ++--- infoschema/cache.go | 8 ++--- infoschema/cache_test.go | 72 ++++++++++++++++++++-------------------- 8 files changed, 75 insertions(+), 75 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index ffb93411b6183..7183c9c4cce74 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -411,7 +411,7 @@ func (d *ddl) GetLease() time.Duration { // Please don't use this function, it is used by TestParallelDDLBeforeRunDDLJob to intercept the calling of d.infoHandle.Get(), use d.infoHandle.Get() instead. // Otherwise, the TestParallelDDLBeforeRunDDLJob will hang up forever. func (d *ddl) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() d.mu.RLock() defer d.mu.RUnlock() @@ -649,10 +649,10 @@ func (d *ddl) startCleanDeadTableLock() { if !d.ownerManager.IsOwner() { continue } - if d.infoCache == nil || d.infoCache.Get() == nil { + if d.infoCache == nil || d.infoCache.GetLatest() == nil { continue } - deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoCache.Get().AllSchemas()) + deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoCache.GetLatest().AllSchemas()) if err != nil { logutil.BgLogger().Info("[ddl] get dead table lock failed.", zap.Error(err)) continue diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 7323c49f241fd..8556b79efc558 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2367,7 +2367,7 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A return errors.Trace(err) } - is := d.infoCache.Get() + is := d.infoCache.GetLatest() if is.TableIsView(ident.Schema, ident.Name) || is.TableIsSequence(ident.Schema, ident.Name) { return ErrWrongObject.GenWithStackByArgs(ident.Schema, ident.Name, "BASE TABLE") } @@ -2898,7 +2898,7 @@ func (d *ddl) AddColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.Alte // AddTablePartitions will add a new partition to the table. func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -2959,7 +2959,7 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec * // CoalescePartitions coalesce partitions can be used with a table that is partitioned by hash or key to reduce the number of partitions by number. func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -2991,7 +2991,7 @@ func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec * } func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -3039,7 +3039,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp } func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema)) @@ -3752,7 +3752,7 @@ func processAndCheckDefaultValueAndColumn(ctx sessionctx.Context, col *table.Col func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, originalColName model.CIStr, spec *ast.AlterTableSpec) (*model.Job, error) { specNewColumn := spec.NewColumns[0] - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return nil, errors.Trace(infoschema.ErrDatabaseNotExists) @@ -4203,7 +4203,7 @@ func (d *ddl) ModifyColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { specNewColumn := spec.NewColumns[0] - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name) @@ -4257,7 +4257,7 @@ func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt // AlterTableComment updates the table comment information. func (d *ddl) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema) @@ -4310,7 +4310,7 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden return ErrUnknownCharacterSet.GenWithStackByArgs(toCharset) } - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema) @@ -4471,7 +4471,7 @@ func (d *ddl) AlterTableDropStatistics(ctx sessionctx.Context, ident ast.Ident, // UpdateTableReplicaInfo updates the table flash replica infos. func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, available bool) error { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() tb, ok := is.TableByID(physicalID) if !ok { tb, _, _ = is.FindTableByPartitionID(physicalID) @@ -4574,7 +4574,7 @@ func checkAlterTableCharset(tblInfo *model.TableInfo, dbInfo *model.DBInfo, toCh // In TiDB, indexes are case-insensitive (so index 'a' and 'A" are considered the same index), // but index names are case-sensitive (we can rename index 'a' to 'A') func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ident.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema) @@ -5232,7 +5232,7 @@ func buildFKInfo(fkName model.CIStr, keys []*ast.IndexPartSpecification, refer * } func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.CIStr, keys []*ast.IndexPartSpecification, refer *ast.ReferenceDef) error { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ti.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema) @@ -5264,7 +5264,7 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode } func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.CIStr) error { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ti.Schema) if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema) @@ -5290,7 +5290,7 @@ func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model. } func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error { - is := d.infoCache.Get() + is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ti.Schema) if !ok { return errors.Trace(infoschema.ErrDatabaseNotExists) @@ -6036,7 +6036,7 @@ func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident, return errors.Trace(err) } - oldBundle := infoschema.GetBundle(d.infoCache.Get(), []int64{partitionID, meta.ID, schema.ID}) + oldBundle := infoschema.GetBundle(d.infoCache.GetLatest(), []int64{partitionID, meta.ID, schema.ID}) oldBundle.ID = placement.GroupID(partitionID) diff --git a/ddl/partition.go b/ddl/partition.go index da1d27503f61e..bdd6bf71a2480 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -911,10 +911,10 @@ func getTableInfoWithDroppingPartitions(t *model.TableInfo) *model.TableInfo { } func dropRuleBundles(d *ddlCtx, physicalTableIDs []int64) error { - if d.infoCache != nil && d.infoCache.Get() != nil { + if d.infoCache != nil && d.infoCache.GetLatest() != nil { bundles := make([]*placement.Bundle, 0, len(physicalTableIDs)) for _, ID := range physicalTableIDs { - oldBundle, ok := d.infoCache.Get().BundleByName(placement.GroupID(ID)) + oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(ID)) if ok && !oldBundle.IsEmpty() { bundles = append(bundles, placement.BuildPlacementDropBundle(ID)) } @@ -1095,11 +1095,11 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e } } - if d.infoCache != nil && d.infoCache.Get() != nil { + if d.infoCache != nil && d.infoCache.GetLatest() != nil { bundles := make([]*placement.Bundle, 0, len(oldIDs)) for i, oldID := range oldIDs { - oldBundle, ok := d.infoCache.Get().BundleByName(placement.GroupID(oldID)) + oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(oldID)) if ok && !oldBundle.IsEmpty() { bundles = append(bundles, placement.BuildPlacementDropBundle(oldID)) bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newPartitions[i].ID)) @@ -1299,11 +1299,11 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo // the follow code is a swap function for rules of two partitions // though partitions has exchanged their ID, swap still take effect - if d.infoCache != nil && d.infoCache.Get() != nil { + if d.infoCache != nil && d.infoCache.GetLatest() != nil { bundles := make([]*placement.Bundle, 0, 2) - ptBundle, ptOK := d.infoCache.Get().BundleByName(placement.GroupID(partDef.ID)) + ptBundle, ptOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(partDef.ID)) ptOK = ptOK && !ptBundle.IsEmpty() - ntBundle, ntOK := d.infoCache.Get().BundleByName(placement.GroupID(nt.ID)) + ntBundle, ntOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(nt.ID)) ntOK = ntOK && !ntBundle.IsEmpty() if ptOK && ntOK { bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) diff --git a/ddl/schema.go b/ddl/schema.go index 8854b397fb964..ddb48079e94d3 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -77,7 +77,7 @@ func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model if err != nil { return err } - is := d.infoCache.Get() + is := d.infoCache.GetLatest() if is.SchemaMetaVersion() == currVer { return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo) } @@ -169,7 +169,7 @@ func onDropSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) oldIDs := getIDs(tables) bundles := make([]*placement.Bundle, 0, len(oldIDs)+1) for _, ID := range append(oldIDs, dbInfo.ID) { - oldBundle, ok := d.infoCache.Get().BundleByName(placement.GroupID(ID)) + oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(ID)) if ok && !oldBundle.IsEmpty() { bundles = append(bundles, placement.BuildPlacementDropBundle(ID)) } diff --git a/ddl/table.go b/ddl/table.go index 77c64dcd9625c..1f4538d4df5a1 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -487,8 +487,8 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } } - if d.infoCache != nil && d.infoCache.Get() != nil { - is := d.infoCache.Get() + if d.infoCache != nil && d.infoCache.GetLatest() != nil { + is := d.infoCache.GetLatest() bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1) if oldBundle, ok := is.BundleByName(placement.GroupID(tableID)); ok { @@ -968,7 +968,7 @@ func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ erro func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error { // d.infoHandle maybe nil in some test. - if d.infoCache == nil || d.infoCache.Get() == nil { + if d.infoCache == nil || d.infoCache.GetLatest() == nil { return checkTableNotExistsFromStore(t, schemaID, tableName) } // Try to use memory schema info to check first. @@ -976,7 +976,7 @@ func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName stri if err != nil { return err } - is := d.infoCache.Get() + is := d.infoCache.GetLatest() if is.SchemaMetaVersion() == currVer { return checkTableNotExistsFromInfoSchema(is, schemaID, tableName) } diff --git a/domain/domain.go b/domain/domain.go index 7cc05b52ed12e..9bfd4049ab74f 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -107,12 +107,12 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, 0, nil, err } - if is := do.infoCache.GetVersion(neededSchemaVersion); is != nil { + if is := do.infoCache.GetByVersion(neededSchemaVersion); is != nil { return is, true, 0, nil, nil } currentSchemaVersion := int64(0) - if oldInfoSchema := do.infoCache.Get(); oldInfoSchema != nil { + if oldInfoSchema := do.infoCache.GetLatest(); oldInfoSchema != nil { currentSchemaVersion = oldInfoSchema.SchemaMetaVersion() } @@ -253,7 +253,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.Store()).InitWithOldInfoSchema(do.infoCache.Get()) + builder := infoschema.NewBuilder(do.Store()).InitWithOldInfoSchema(do.infoCache.GetLatest()) phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) for _, diff := range diffs { @@ -286,7 +286,7 @@ func canSkipSchemaCheckerDDL(tp model.ActionType) bool { // InfoSchema gets information schema from domain. func (do *Domain) InfoSchema() infoschema.InfoSchema { - return do.infoCache.Get() + return do.infoCache.GetLatest() } // GetSnapshotInfoSchema gets a snapshot information schema. diff --git a/infoschema/cache.go b/infoschema/cache.go index a202a807ae176..c1a5976e094b8 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -30,8 +30,8 @@ func NewCache(capcity int) *InfoCache { return &InfoCache{cache: make([]InfoSchema, 0, capcity)} } -// Get gets the newest information schema. -func (h *InfoCache) Get() InfoSchema { +// GetLatest gets the newest information schema. +func (h *InfoCache) GetLatest() InfoSchema { h.mu.RLock() defer h.mu.RUnlock() if len(h.cache) > 0 { @@ -40,8 +40,8 @@ func (h *InfoCache) Get() InfoSchema { return nil } -// GetVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded. -func (h *InfoCache) GetVersion(version int64) InfoSchema { +// GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded. +func (h *InfoCache) GetByVersion(version int64) InfoSchema { h.mu.RLock() defer h.mu.RUnlock() i := sort.Search(len(h.cache), func(i int) bool { diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 3c501f35441d2..a8e9ddcc0df5a 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -34,56 +34,56 @@ func (s *testInfoCacheSuite) TestInsert(c *C) { is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) ic.Insert(is2) - c.Assert(ic.GetVersion(2), NotNil) + c.Assert(ic.GetByVersion(2), NotNil) // newer is5 := infoschema.MockInfoSchemaWithSchemaVer(nil, 5) ic.Insert(is5) - c.Assert(ic.GetVersion(5), NotNil) - c.Assert(ic.GetVersion(2), NotNil) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(2), NotNil) // older is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) ic.Insert(is0) - c.Assert(ic.GetVersion(5), NotNil) - c.Assert(ic.GetVersion(2), NotNil) - c.Assert(ic.GetVersion(0), NotNil) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(2), NotNil) + c.Assert(ic.GetByVersion(0), NotNil) // replace 5, drop 0 is6 := infoschema.MockInfoSchemaWithSchemaVer(nil, 6) ic.Insert(is6) - c.Assert(ic.GetVersion(6), NotNil) - c.Assert(ic.GetVersion(5), NotNil) - c.Assert(ic.GetVersion(2), NotNil) - c.Assert(ic.GetVersion(0), IsNil) + c.Assert(ic.GetByVersion(6), NotNil) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(2), NotNil) + c.Assert(ic.GetByVersion(0), IsNil) // replace 2, drop 2 is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) ic.Insert(is3) - c.Assert(ic.GetVersion(6), NotNil) - c.Assert(ic.GetVersion(5), NotNil) - c.Assert(ic.GetVersion(3), NotNil) - c.Assert(ic.GetVersion(2), IsNil) - c.Assert(ic.GetVersion(0), IsNil) + c.Assert(ic.GetByVersion(6), NotNil) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(3), NotNil) + c.Assert(ic.GetByVersion(2), IsNil) + c.Assert(ic.GetByVersion(0), IsNil) // insert 2, but failed silently ic.Insert(is2) - c.Assert(ic.GetVersion(6), NotNil) - c.Assert(ic.GetVersion(5), NotNil) - c.Assert(ic.GetVersion(3), NotNil) - c.Assert(ic.GetVersion(2), IsNil) - c.Assert(ic.GetVersion(0), IsNil) + c.Assert(ic.GetByVersion(6), NotNil) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(3), NotNil) + c.Assert(ic.GetByVersion(2), IsNil) + c.Assert(ic.GetByVersion(0), IsNil) // insert 5, but it is already in ic.Insert(is5) - c.Assert(ic.GetVersion(6), NotNil) - c.Assert(ic.GetVersion(5), NotNil) - c.Assert(ic.GetVersion(3), NotNil) - c.Assert(ic.GetVersion(2), IsNil) - c.Assert(ic.GetVersion(0), IsNil) + c.Assert(ic.GetByVersion(6), NotNil) + c.Assert(ic.GetByVersion(5), NotNil) + c.Assert(ic.GetByVersion(3), NotNil) + c.Assert(ic.GetByVersion(2), IsNil) + c.Assert(ic.GetByVersion(0), IsNil) } -func (s *testInfoCacheSuite) TestGetVersion(c *C) { +func (s *testInfoCacheSuite) TestGetByVersion(c *C) { ic := infoschema.NewCache(2) c.Assert(ic, NotNil) is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) @@ -91,29 +91,29 @@ func (s *testInfoCacheSuite) TestGetVersion(c *C) { is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3) ic.Insert(is3) - c.Assert(ic.GetVersion(1), Equals, is1) - c.Assert(ic.GetVersion(3), Equals, is3) - c.Assert(ic.GetVersion(0), IsNil, Commentf("index == 0, but not found")) - c.Assert(ic.GetVersion(2), IsNil, Commentf("index in the middle, but not found")) - c.Assert(ic.GetVersion(4), IsNil, Commentf("index == length, but not found")) + c.Assert(ic.GetByVersion(1), Equals, is1) + c.Assert(ic.GetByVersion(3), Equals, is3) + c.Assert(ic.GetByVersion(0), IsNil, Commentf("index == 0, but not found")) + c.Assert(ic.GetByVersion(2), IsNil, Commentf("index in the middle, but not found")) + c.Assert(ic.GetByVersion(4), IsNil, Commentf("index == length, but not found")) } -func (s *testInfoCacheSuite) TestGet(c *C) { +func (s *testInfoCacheSuite) TestGetLatest(c *C) { ic := infoschema.NewCache(16) c.Assert(ic, NotNil) - c.Assert(ic.Get(), IsNil) + c.Assert(ic.GetLatest(), IsNil) is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) ic.Insert(is1) - c.Assert(ic.Get(), Equals, is1) + c.Assert(ic.GetLatest(), Equals, is1) // newer change the newest is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) ic.Insert(is2) - c.Assert(ic.Get(), Equals, is2) + c.Assert(ic.GetLatest(), Equals, is2) // older schema doesn't change the newest is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) ic.Insert(is0) - c.Assert(ic.Get(), Equals, is2) + c.Assert(ic.GetLatest(), Equals, is2) } From 0a2ff1c85c8369c08ec1b65ac096c412f2670759 Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 28 Apr 2021 11:17:43 +0800 Subject: [PATCH 07/23] *: rebase and adapt the newest staleness PR Signed-off-by: xhe --- executor/stale_txn_test.go | 72 +------------------------------------- 1 file changed, 1 insertion(+), 71 deletions(-) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index ce5202ae58a75..5c762da6b087e 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -18,7 +18,6 @@ import ( "time" . "github.com/pingcap/check" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/store/tikv/oracle" @@ -26,12 +25,6 @@ import ( ) func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer func() { - err := failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") - c.Assert(err, IsNil) - }() - testcases := []struct { name string preSQL string @@ -117,8 +110,6 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { } func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -155,7 +146,7 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { failpoint.Enable("github.com/pingcap/tidb/config/injectTxnScope", fmt.Sprintf(`return("%v")`, testcase.zone)) failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStoreLabels", fmt.Sprintf(`return("%v_%v")`, placement.DCLabelKey, testcase.txnScope)) failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag", `return(true)`) - tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`) + tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) tk.MustQuery(testcase.sql) tk.MustExec(`commit`) failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") @@ -165,12 +156,6 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { } func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer func() { - err := failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") - c.Assert(err, IsNil) - }() - tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. @@ -193,62 +178,7 @@ func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) { tk.MustExec("commit") } -func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { - testcases := []struct { - name string - sql string - expectErr error - }{ - { - name: "ddl change before stale txn", - sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:03'`, - expectErr: errors.New("schema version changed after the staleness startTS"), - }, - { - name: "ddl change before stale txn", - sql: fmt.Sprintf("START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '%v'", - time.Now().Truncate(3*time.Second).Format("2006-01-02 15:04:05")), - expectErr: errors.New(".*schema version changed after the staleness startTS.*"), - }, - { - name: "ddl change before stale txn", - sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:03'`, - expectErr: nil, - }, - } - tk := testkit.NewTestKitWithInit(c, s.store) - for _, testcase := range testcases { - check := func() { - if testcase.expectErr != nil { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(true)"), IsNil) - defer func() { - err := failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") - c.Assert(err, IsNil) - }() - - } else { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer func() { - err := failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") - c.Assert(err, IsNil) - }() - - } - _, err := tk.Exec(testcase.sql) - if testcase.expectErr != nil { - c.Assert(err, NotNil) - c.Assert(err.Error(), Matches, testcase.expectErr.Error()) - } else { - c.Assert(err, IsNil) - } - } - check() - } -} - func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil) - defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer") tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") From 8b756c567d5d9d216f34ec31a0ead9772abc27aa Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 28 Apr 2021 12:55:58 +0800 Subject: [PATCH 08/23] ddl: rename Signed-off-by: xhe --- ddl/options.go | 4 ++-- ddl/options_test.go | 2 +- domain/domain.go | 2 +- domain/domain_test.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ddl/options.go b/ddl/options.go index 5a8448a772af0..9238a7c8542ff 100644 --- a/ddl/options.go +++ b/ddl/options.go @@ -47,8 +47,8 @@ func WithStore(store kv.Storage) Option { } } -// WithInfoHandle specifies the `infoschema.Handle` -func WithInfoHandle(ic *infoschema.InfoCache) Option { +// WithInfoCache specifies the `infoschema.InfoCache` +func WithInfoCache(ic *infoschema.InfoCache) Option { return func(options *Options) { options.InfoCache = ic } diff --git a/ddl/options_test.go b/ddl/options_test.go index d177fd2cd0842..22a451d622c71 100644 --- a/ddl/options_test.go +++ b/ddl/options_test.go @@ -40,7 +40,7 @@ func (s *ddlOptionsSuite) TestOptions(c *C) { ddl.WithHook(callback), ddl.WithLease(lease), ddl.WithStore(store), - ddl.WithInfoHandle(infoHandle), + ddl.WithInfoCache(infoHandle), } opt := &ddl.Options{} diff --git a/domain/domain.go b/domain/domain.go index 9bfd4049ab74f..f468d168bfd4e 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -707,7 +707,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R ctx, ddl.WithEtcdClient(do.etcdClient), ddl.WithStore(do.store), - ddl.WithInfoHandle(do.infoCache), + ddl.WithInfoCache(do.infoCache), ddl.WithHook(callback), ddl.WithLease(ddlLease), ) diff --git a/domain/domain_test.go b/domain/domain_test.go index 724d95d0d511c..82a583866aad3 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -128,7 +128,7 @@ func TestInfo(t *testing.T) { goCtx, ddl.WithEtcdClient(dom.GetEtcdClient()), ddl.WithStore(s), - ddl.WithInfoHandle(dom.infoCache), + ddl.WithInfoCache(dom.infoCache), ddl.WithLease(ddlLease), ) err = dom.ddl.Start(nil) From a4da6db3cc19e8d68800abe1833ed3cb434da9ca Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 28 Apr 2021 12:56:49 +0800 Subject: [PATCH 09/23] domain: typo Signed-off-by: xhe --- domain/domain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/domain/domain.go b/domain/domain.go index f468d168bfd4e..41681e8e60cee 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -121,7 +121,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i // And it is only used to diff upgrading the current latest infoschema, if: // 1. Not first time bootstrap loading, which needs a full load. // 2. It is newer than the current one, so it will be "the current one" after this function call. - // 2. There are less 100 diffs. + // 3. There are less 100 diffs. startTime := time.Now() if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 { is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) From 05a22ae85c3c5d3821fff4794eb58901509279b8 Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 28 Apr 2021 12:59:39 +0800 Subject: [PATCH 10/23] infoschema: update description Signed-off-by: xhe --- infoschema/cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/infoschema/cache.go b/infoschema/cache.go index c1a5976e094b8..e5779b415a393 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -53,8 +53,8 @@ func (h *InfoCache) GetByVersion(version int64) InfoSchema { return nil } -// Insert will **TRY** to insert the infoschema into the cache. It works by always keeping newers. -// But YOU SHOULD NOT RELY THIS BEHAVIOR. +// Insert will **TRY** to insert the infoschema into the cache. +// It only promised to cache the infoschema, if it is newer than all the cached. func (h *InfoCache) Insert(is InfoSchema) { h.mu.Lock() defer h.mu.Unlock() From d517fee27c03920123889574c5d5dd7666127459 Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 28 Apr 2021 13:17:27 +0800 Subject: [PATCH 11/23] infoschema: reword Signed-off-by: xhe --- infoschema/cache.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/infoschema/cache.go b/infoschema/cache.go index e5779b415a393..3365e42461aba 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -19,6 +19,8 @@ import ( ) // InfoCache handles information schema, including getting and setting. +// The cache behavior, however, is transprent and under automatic management. +// It only promised to cache the infoschema, if it is newer than all the cached. type InfoCache struct { mu sync.RWMutex // cache is sorted by SchemaVersion in descending order @@ -54,7 +56,7 @@ func (h *InfoCache) GetByVersion(version int64) InfoSchema { } // Insert will **TRY** to insert the infoschema into the cache. -// It only promised to cache the infoschema, if it is newer than all the cached. +// It only promised to cache the newest infoschema. func (h *InfoCache) Insert(is InfoSchema) { h.mu.Lock() defer h.mu.Unlock() From 2259113591bbc8f2f22959f860cc167d57ad5de4 Mon Sep 17 00:00:00 2001 From: xhe Date: Mon, 10 May 2021 15:08:21 +0800 Subject: [PATCH 12/23] *: reword Co-authored-by: JmPotato --- domain/domain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/domain/domain.go b/domain/domain.go index 41681e8e60cee..e6ea3d1e2d949 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -284,7 +284,7 @@ func canSkipSchemaCheckerDDL(tp model.ActionType) bool { return false } -// InfoSchema gets information schema from domain. +// InfoSchema gets the latest information schema from domain. func (do *Domain) InfoSchema() infoschema.InfoSchema { return do.infoCache.GetLatest() } From 64cfce56d4b33df322bde906f2ab3f5c7cb4f26b Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 13 May 2021 14:45:37 +0800 Subject: [PATCH 13/23] *: ensure infoCache is non-nil Signed-off-by: xhe --- ddl/column_change_test.go | 21 +++++++++------- ddl/column_test.go | 3 +-- ddl/ddl.go | 12 +++++++--- ddl/ddl_test.go | 5 ++++ ddl/ddl_worker_test.go | 10 ++++---- ddl/index_change_test.go | 22 +++++++++-------- ddl/partition.go | 17 ++++++------- ddl/reorg_test.go | 2 +- ddl/restart_test.go | 4 ++-- ddl/schema.go | 4 ---- ddl/schema_test.go | 4 ++-- ddl/stat_test.go | 2 +- ddl/table.go | 50 +++++++++++++++++---------------------- ddl/table_test.go | 2 +- 14 files changed, 80 insertions(+), 78 deletions(-) diff --git a/ddl/column_change_test.go b/ddl/column_change_test.go index 94e8787a2bdc4..6bd5a94f7235e 100644 --- a/ddl/column_change_test.go +++ b/ddl/column_change_test.go @@ -47,15 +47,18 @@ type testColumnChangeSuite struct { func (s *testColumnChangeSuite) SetUpSuite(c *C) { SetWaitTimeWhenErrorOccurred(1 * time.Microsecond) s.store = testCreateStore(c, "test_column_change") - s.dbInfo = &model.DBInfo{ - Name: model.NewCIStr("test_column_change"), - ID: 1, - } - err := kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error { - t := meta.NewMeta(txn) - return errors.Trace(t.CreateDatabase(s.dbInfo)) - }) - c.Check(err, IsNil) + d := testNewDDLAndStart( + context.Background(), + c, + WithStore(s.store), + WithLease(testLease), + ) + defer func() { + err := d.Stop() + c.Assert(err, IsNil) + }() + s.dbInfo = testSchemaInfo(c, d, "test_index_change") + testCreateSchema(c, testNewContext(d), d, s.dbInfo) } func (s *testColumnChangeSuite) TearDownSuite(c *C) { diff --git a/ddl/column_test.go b/ddl/column_test.go index 862fb4aa04c59..f3eaa26d22385 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -54,8 +54,7 @@ func (s *testColumnSuite) SetUpSuite(c *C) { s.dbInfo = testSchemaInfo(c, d, "test_column") testCreateSchema(c, testNewContext(d), d, s.dbInfo) - err := d.Stop() - c.Assert(err, IsNil) + c.Assert(d.Stop(), IsNil) } func (s *testColumnSuite) TearDownSuite(c *C) { diff --git a/ddl/ddl.go b/ddl/ddl.go index 7183c9c4cce74..0b45a67c6826f 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -282,6 +282,15 @@ func newDDL(ctx context.Context, options ...Option) *ddl { deadLockCkr = util.NewDeadTableLockChecker(etcdCli) } + // TODO: make store and infoCache explicit arguments + // these two should be ensured to exist + if (opt.Store == nil) { + panic("store should not be nil") + } + if (opt.InfoCache == nil) { + panic("infoCache should not be nil") + } + ddlCtx := &ddlCtx{ uuid: id, store: opt.Store, @@ -649,9 +658,6 @@ func (d *ddl) startCleanDeadTableLock() { if !d.ownerManager.IsOwner() { continue } - if d.infoCache == nil || d.infoCache.GetLatest() == nil { - continue - } deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoCache.GetLatest().AllSchemas()) if err != nil { logutil.BgLogger().Info("[ddl] get dead table lock failed.", zap.Error(err)) diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index b77c3300c2700..79635bfc0933b 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" @@ -86,6 +87,10 @@ func TestT(t *testing.T) { } func testNewDDLAndStart(ctx context.Context, c *C, options ...Option) *ddl { + // init infoCache and a stub infoSchema + ic := infoschema.NewCache(2) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) + options = append(options, WithInfoCache(ic)) d := newDDL(ctx, options...) err := d.Start(nil) c.Assert(err, IsNil) diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 6e745820b04b9..72fef7c96c19e 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -247,7 +247,7 @@ func (s *testDDLSuite) TestTableError(c *C) { // Schema ID is wrong, so dropping table is failed. doDDLJobErr(c, -1, 1, model.ActionDropTable, nil, ctx, d) // Table ID is wrong, so dropping table is failed. - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_ddl") testCreateSchema(c, testNewContext(d), d, dbInfo) job := doDDLJobErr(c, dbInfo.ID, -1, model.ActionDropTable, nil, ctx, d) @@ -295,7 +295,7 @@ func (s *testDDLSuite) TestViewError(c *C) { c.Assert(err, IsNil) }() ctx := testNewContext(d) - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_ddl") testCreateSchema(c, testNewContext(d), d, dbInfo) // Table ID or schema ID is wrong, so getting table is failed. @@ -363,7 +363,7 @@ func (s *testDDLSuite) TestForeignKeyError(c *C) { doDDLJobErr(c, -1, 1, model.ActionAddForeignKey, nil, ctx, d) doDDLJobErr(c, -1, 1, model.ActionDropForeignKey, nil, ctx, d) - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_ddl") tblInfo := testTableInfo(c, d, "t", 3) testCreateSchema(c, ctx, d, dbInfo) testCreateTable(c, ctx, d, dbInfo, tblInfo) @@ -393,7 +393,7 @@ func (s *testDDLSuite) TestIndexError(c *C) { doDDLJobErr(c, -1, 1, model.ActionAddIndex, nil, ctx, d) doDDLJobErr(c, -1, 1, model.ActionDropIndex, nil, ctx, d) - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_ddl") tblInfo := testTableInfo(c, d, "t", 3) testCreateSchema(c, ctx, d, dbInfo) testCreateTable(c, ctx, d, dbInfo, tblInfo) @@ -435,7 +435,7 @@ func (s *testDDLSuite) TestColumnError(c *C) { }() ctx := testNewContext(d) - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_ddl") tblInfo := testTableInfo(c, d, "t", 3) testCreateSchema(c, ctx, d, dbInfo) testCreateTable(c, ctx, d, dbInfo, tblInfo) diff --git a/ddl/index_change_test.go b/ddl/index_change_test.go index 0a54b6b25e694..b273887dd00ac 100644 --- a/ddl/index_change_test.go +++ b/ddl/index_change_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -38,15 +37,18 @@ type testIndexChangeSuite struct { func (s *testIndexChangeSuite) SetUpSuite(c *C) { s.store = testCreateStore(c, "test_index_change") - s.dbInfo = &model.DBInfo{ - Name: model.NewCIStr("test_index_change"), - ID: 1, - } - err := kv.RunInNewTxn(context.Background(), s.store, true, func(ctx context.Context, txn kv.Transaction) error { - t := meta.NewMeta(txn) - return errors.Trace(t.CreateDatabase(s.dbInfo)) - }) - c.Check(err, IsNil, Commentf("err %v", errors.ErrorStack(err))) + d := testNewDDLAndStart( + context.Background(), + c, + WithStore(s.store), + WithLease(testLease), + ) + defer func() { + err := d.Stop() + c.Assert(err, IsNil) + }() + s.dbInfo = testSchemaInfo(c, d, "test_index_change") + testCreateSchema(c, testNewContext(d), d, s.dbInfo) } func (s *testIndexChangeSuite) TearDownSuite(c *C) { diff --git a/ddl/partition.go b/ddl/partition.go index bdd6bf71a2480..ecf9d3dc5604d 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -911,18 +911,15 @@ func getTableInfoWithDroppingPartitions(t *model.TableInfo) *model.TableInfo { } func dropRuleBundles(d *ddlCtx, physicalTableIDs []int64) error { - if d.infoCache != nil && d.infoCache.GetLatest() != nil { - bundles := make([]*placement.Bundle, 0, len(physicalTableIDs)) - for _, ID := range physicalTableIDs { - oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(ID)) - if ok && !oldBundle.IsEmpty() { - bundles = append(bundles, placement.BuildPlacementDropBundle(ID)) - } + bundles := make([]*placement.Bundle, 0, len(physicalTableIDs)) + for _, ID := range physicalTableIDs { + oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(ID)) + if ok && !oldBundle.IsEmpty() { + bundles = append(bundles, placement.BuildPlacementDropBundle(ID)) } - err := infosync.PutRuleBundles(context.TODO(), bundles) - return err } - return nil + err := infosync.PutRuleBundles(context.TODO(), bundles) + return err } // onDropTablePartition deletes old partition meta. diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go index 18dd9a975fceb..4c28540e7ad3b 100644 --- a/ddl/reorg_test.go +++ b/ddl/reorg_test.go @@ -217,7 +217,7 @@ func (s *testDDLSuite) TestReorgOwner(c *C) { c.Assert(err, IsNil) }() - dbInfo := testSchemaInfo(c, d1, "test") + dbInfo := testSchemaInfo(c, d1, "test_reorg") testCreateSchema(c, ctx, d1, dbInfo) tblInfo := testTableInfo(c, d1, "t", 3) diff --git a/ddl/restart_test.go b/ddl/restart_test.go index b587d54b80cc8..b7791ef7679bd 100644 --- a/ddl/restart_test.go +++ b/ddl/restart_test.go @@ -120,7 +120,7 @@ func (s *testSchemaSuite) TestSchemaResume(c *C) { testCheckOwner(c, d1, true) - dbInfo := testSchemaInfo(c, d1, "test") + dbInfo := testSchemaInfo(c, d1, "test_restart") job := &model.Job{ SchemaID: dbInfo.ID, Type: model.ActionCreateSchema, @@ -157,7 +157,7 @@ func (s *testStatSuite) TestStat(c *C) { c.Assert(err, IsNil) }() - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_restart") testCreateSchema(c, testNewContext(d), d, dbInfo) // TODO: Get this information from etcd. diff --git a/ddl/schema.go b/ddl/schema.go index ddb48079e94d3..a4b14a49bdbc3 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -68,10 +68,6 @@ func onCreateSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error } func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error { - // d.infoHandle maybe nil in some test. - if d.infoCache == nil { - return checkSchemaNotExistsFromStore(t, schemaID, dbInfo) - } // Try to use memory schema info to check first. currVer, err := t.GetSchemaVersion() if err != nil { diff --git a/ddl/schema_test.go b/ddl/schema_test.go index c70a0b793bb35..b4c8efee7b089 100644 --- a/ddl/schema_test.go +++ b/ddl/schema_test.go @@ -139,7 +139,7 @@ func (s *testSchemaSuite) TestSchema(c *C) { c.Assert(err, IsNil) }() ctx := testNewContext(d) - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_schema") // create a database. job := testCreateSchema(c, ctx, d, dbInfo) @@ -228,7 +228,7 @@ func (s *testSchemaSuite) TestSchemaWaitJob(c *C) { // d2 must not be owner. d2.ownerManager.RetireOwner() - dbInfo := testSchemaInfo(c, d2, "test") + dbInfo := testSchemaInfo(c, d2, "test_schema") testCreateSchema(c, ctx, d2, dbInfo) testCheckSchemaState(c, d2, dbInfo, model.StatePublic) diff --git a/ddl/stat_test.go b/ddl/stat_test.go index fe562a0ae0fb8..1ed3cbfe4c7fc 100644 --- a/ddl/stat_test.go +++ b/ddl/stat_test.go @@ -61,7 +61,7 @@ func (s *testSerialStatSuite) TestDDLStatsInfo(c *C) { c.Assert(err, IsNil) }() - dbInfo := testSchemaInfo(c, d, "test") + dbInfo := testSchemaInfo(c, d, "test_stat") testCreateSchema(c, testNewContext(d), d, dbInfo) tblInfo := testTableInfo(c, d, "t", 2) ctx := testNewContext(d) diff --git a/ddl/table.go b/ddl/table.go index 1f4538d4df5a1..424dd040a0de9 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -487,34 +487,32 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } } - if d.infoCache != nil && d.infoCache.GetLatest() != nil { - is := d.infoCache.GetLatest() - - bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1) - if oldBundle, ok := is.BundleByName(placement.GroupID(tableID)); ok { - bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newTableID)) - } + is := d.infoCache.GetLatest() - if pi := tblInfo.GetPartitionInfo(); pi != nil { - oldIDs := make([]int64, 0, len(oldPartitionIDs)) - newIDs := make([]int64, 0, len(oldPartitionIDs)) - newDefs := pi.Definitions - for i := range oldPartitionIDs { - newID := newDefs[i].ID - if oldBundle, ok := is.BundleByName(placement.GroupID(oldPartitionIDs[i])); ok && !oldBundle.IsEmpty() { - oldIDs = append(oldIDs, oldPartitionIDs[i]) - newIDs = append(newIDs, newID) - bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newID)) - } + bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1) + if oldBundle, ok := is.BundleByName(placement.GroupID(tableID)); ok { + bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newTableID)) + } + + if pi := tblInfo.GetPartitionInfo(); pi != nil { + oldIDs := make([]int64, 0, len(oldPartitionIDs)) + newIDs := make([]int64, 0, len(oldPartitionIDs)) + newDefs := pi.Definitions + for i := range oldPartitionIDs { + newID := newDefs[i].ID + if oldBundle, ok := is.BundleByName(placement.GroupID(oldPartitionIDs[i])); ok && !oldBundle.IsEmpty() { + oldIDs = append(oldIDs, oldPartitionIDs[i]) + newIDs = append(newIDs, newID) + bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newID)) } - job.CtxVars = []interface{}{oldIDs, newIDs} } + job.CtxVars = []interface{}{oldIDs, newIDs} + } - err = infosync.PutRuleBundles(context.TODO(), bundles) - if err != nil { - job.State = model.JobStateCancelled - return 0, errors.Wrapf(err, "failed to notify PD the placement rules") - } + err = infosync.PutRuleBundles(context.TODO(), bundles) + if err != nil { + job.State = model.JobStateCancelled + return 0, errors.Wrapf(err, "failed to notify PD the placement rules") } // Clear the tiflash replica available status. @@ -967,10 +965,6 @@ func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ erro } func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error { - // d.infoHandle maybe nil in some test. - if d.infoCache == nil || d.infoCache.GetLatest() == nil { - return checkTableNotExistsFromStore(t, schemaID, tableName) - } // Try to use memory schema info to check first. currVer, err := t.GetSchemaVersion() if err != nil { diff --git a/ddl/table_test.go b/ddl/table_test.go index 5760fc2b152b5..10927908f5289 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -355,7 +355,7 @@ func (s *testTableSuite) SetUpSuite(c *C) { WithLease(testLease), ) - s.dbInfo = testSchemaInfo(c, s.d, "test") + s.dbInfo = testSchemaInfo(c, s.d, "test_table") testCreateSchema(c, testNewContext(s.d), s.d, s.dbInfo) } From 12482eb877140bc582a8f7e65df92dcc52852995 Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 13 May 2021 14:49:54 +0800 Subject: [PATCH 14/23] *: remove another two useless checks Signed-off-by: xhe --- ddl/partition.go | 66 +++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 35 deletions(-) diff --git a/ddl/partition.go b/ddl/partition.go index ecf9d3dc5604d..4e55ec1779e21 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1092,24 +1092,22 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e } } - if d.infoCache != nil && d.infoCache.GetLatest() != nil { - bundles := make([]*placement.Bundle, 0, len(oldIDs)) - - for i, oldID := range oldIDs { - oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(oldID)) - if ok && !oldBundle.IsEmpty() { - bundles = append(bundles, placement.BuildPlacementDropBundle(oldID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newPartitions[i].ID)) - } - } + bundles := make([]*placement.Bundle, 0, len(oldIDs)) - err = infosync.PutRuleBundles(context.TODO(), bundles) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Wrapf(err, "failed to notify PD the placement rules") + for i, oldID := range oldIDs { + oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(oldID)) + if ok && !oldBundle.IsEmpty() { + bundles = append(bundles, placement.BuildPlacementDropBundle(oldID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newPartitions[i].ID)) } } + err = infosync.PutRuleBundles(context.TODO(), bundles) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Wrapf(err, "failed to notify PD the placement rules") + } + newIDs := make([]int64, len(oldIDs)) for i := range oldIDs { newIDs[i] = newPartitions[i].ID @@ -1296,27 +1294,25 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo // the follow code is a swap function for rules of two partitions // though partitions has exchanged their ID, swap still take effect - if d.infoCache != nil && d.infoCache.GetLatest() != nil { - bundles := make([]*placement.Bundle, 0, 2) - ptBundle, ptOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(partDef.ID)) - ptOK = ptOK && !ptBundle.IsEmpty() - ntBundle, ntOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(nt.ID)) - ntOK = ntOK && !ntBundle.IsEmpty() - if ptOK && ntOK { - bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) - } else if ptOK { - bundles = append(bundles, placement.BuildPlacementDropBundle(partDef.ID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) - } else if ntOK { - bundles = append(bundles, placement.BuildPlacementDropBundle(nt.ID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) - } - err = infosync.PutRuleBundles(context.TODO(), bundles) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Wrapf(err, "failed to notify PD the placement rules") - } + bundles := make([]*placement.Bundle, 0, 2) + ptBundle, ptOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(partDef.ID)) + ptOK = ptOK && !ptBundle.IsEmpty() + ntBundle, ntOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(nt.ID)) + ntOK = ntOK && !ntBundle.IsEmpty() + if ptOK && ntOK { + bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) + } else if ptOK { + bundles = append(bundles, placement.BuildPlacementDropBundle(partDef.ID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) + } else if ntOK { + bundles = append(bundles, placement.BuildPlacementDropBundle(nt.ID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) + } + err = infosync.PutRuleBundles(context.TODO(), bundles) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } ver, err = updateSchemaVersion(t, job) From 4322cc09d55ce71add6135c8500c112b4065c02c Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 13 May 2021 16:35:18 +0800 Subject: [PATCH 15/23] *: typo Co-authored-by: Song Gao --- infoschema/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infoschema/cache.go b/infoschema/cache.go index 3365e42461aba..f82c8bfb16749 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -19,7 +19,7 @@ import ( ) // InfoCache handles information schema, including getting and setting. -// The cache behavior, however, is transprent and under automatic management. +// The cache behavior, however, is transparent and under automatic management. // It only promised to cache the infoschema, if it is newer than all the cached. type InfoCache struct { mu sync.RWMutex From d4addd31a710ad487037f3a791a67919754ff554 Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 13 May 2021 16:39:16 +0800 Subject: [PATCH 16/23] *: add return value for Insert Signed-off-by: xhe --- infoschema/cache.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/infoschema/cache.go b/infoschema/cache.go index f82c8bfb16749..f2f5a39e11cfe 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -57,7 +57,8 @@ func (h *InfoCache) GetByVersion(version int64) InfoSchema { // Insert will **TRY** to insert the infoschema into the cache. // It only promised to cache the newest infoschema. -func (h *InfoCache) Insert(is InfoSchema) { +// It returns 'true' if it is cached, 'false' otherwise. +func (h *InfoCache) Insert(is InfoSchema) bool { h.mu.Lock() defer h.mu.Unlock() @@ -68,7 +69,7 @@ func (h *InfoCache) Insert(is InfoSchema) { // cached entry if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { - return + return true } if len(h.cache) < cap(h.cache) { @@ -76,10 +77,13 @@ func (h *InfoCache) Insert(is InfoSchema) { h.cache = h.cache[:len(h.cache)+1] copy(h.cache[i+1:], h.cache[i:]) h.cache[i] = is + return true } else if i < len(h.cache) { // drop older schema copy(h.cache[i+1:], h.cache[i:]) h.cache[i] = is + return true } // older than all cached schemas, refuse to cache it + return false } From d19a778f64efaa69aa2bb6a71d293ad3775ee259 Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 13 May 2021 16:57:15 +0800 Subject: [PATCH 17/23] *: add TestStalenessTransactionSchemaVer back Signed-off-by: xhe --- executor/stale_txn_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 5c762da6b087e..9e67d2514c6ef 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -248,3 +248,21 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS") } } + +func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int primary key);") + + schemaVer1 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + tk.MustExec("drop table if exists t") + schemaVer2 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + // confirm schema changed + c.Assert(schemaVer1, Less, schemaVer2) + + tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:01'`) + schemaVer3 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + // got an old infoSchema + c.Assert(schemaVer3, Less, schemaVer2) +} From 2a682eb755c1dfacc68f15783a8596245fb0fa60 Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 13 May 2021 16:58:11 +0800 Subject: [PATCH 18/23] *: fix fmt Signed-off-by: xhe --- ddl/ddl.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index 0b45a67c6826f..9eb05b86741ed 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -284,10 +284,10 @@ func newDDL(ctx context.Context, options ...Option) *ddl { // TODO: make store and infoCache explicit arguments // these two should be ensured to exist - if (opt.Store == nil) { + if opt.Store == nil { panic("store should not be nil") } - if (opt.InfoCache == nil) { + if opt.InfoCache == nil { panic("infoCache should not be nil") } From 2f7ae2f04f6ee6df298cfe001a8e8a156efb3194 Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 13 May 2021 17:51:40 +0800 Subject: [PATCH 19/23] *: accurate test Signed-off-by: xhe --- executor/stale_txn_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 9e67d2514c6ef..5ca7bbffa17ed 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -256,13 +256,12 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { tk.MustExec("create table t (id int primary key);") schemaVer1 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + time.Sleep(time.Second) tk.MustExec("drop table if exists t") schemaVer2 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() - // confirm schema changed c.Assert(schemaVer1, Less, schemaVer2) tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:01'`) schemaVer3 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() - // got an old infoSchema - c.Assert(schemaVer3, Less, schemaVer2) + c.Assert(schemaVer3, Equals, schemaVer1) } From e00f63e3ee9d85c60b0c608224f10efc6f24cc67 Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 13 May 2021 17:52:59 +0800 Subject: [PATCH 20/23] *: mistakenly deleted comment Signed-off-by: xhe --- executor/stale_txn_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 5ca7bbffa17ed..e7d3326ca1206 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -259,9 +259,11 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { time.Sleep(time.Second) tk.MustExec("drop table if exists t") schemaVer2 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + // confirm schema changed c.Assert(schemaVer1, Less, schemaVer2) tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:01'`) schemaVer3 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + // got an old infoSchema c.Assert(schemaVer3, Equals, schemaVer1) } From 287038dc7da0dcd8aafe9e40ace063bb336cba25 Mon Sep 17 00:00:00 2001 From: xhe Date: Mon, 17 May 2021 14:40:13 +0800 Subject: [PATCH 21/23] *: add metrics Signed-off-by: xhe --- infoschema/cache.go | 6 ++++++ metrics/domain.go | 13 +++++++++++++ metrics/metrics.go | 1 + 3 files changed, 20 insertions(+) diff --git a/infoschema/cache.go b/infoschema/cache.go index f2f5a39e11cfe..4c3371b1bc354 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -16,6 +16,8 @@ package infoschema import ( "sort" "sync" + + "github.com/pingcap/tidb/metrics" ) // InfoCache handles information schema, including getting and setting. @@ -36,7 +38,9 @@ func NewCache(capcity int) *InfoCache { func (h *InfoCache) GetLatest() InfoSchema { h.mu.RLock() defer h.mu.RUnlock() + metrics.InfoCacheCounters.WithLabelValues("get").Inc() if len(h.cache) > 0 { + metrics.InfoCacheCounters.WithLabelValues("hit").Inc() return h.cache[0] } return nil @@ -46,10 +50,12 @@ func (h *InfoCache) GetLatest() InfoSchema { func (h *InfoCache) GetByVersion(version int64) InfoSchema { h.mu.RLock() defer h.mu.RUnlock() + metrics.InfoCacheCounters.WithLabelValues("get").Inc() i := sort.Search(len(h.cache), func(i int) bool { return h.cache[i].SchemaMetaVersion() <= version }) if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { + metrics.InfoCacheCounters.WithLabelValues("hit").Inc() return h.cache[i] } return nil diff --git a/metrics/domain.go b/metrics/domain.go index dd3912555d59c..f30dbd59e5d32 100644 --- a/metrics/domain.go +++ b/metrics/domain.go @@ -38,6 +38,19 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s }) + // InfoCacheCounters are the counters of get/hit. + InfoCacheCounters = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "domain", + Name: "infocache_counters", + Help: "Counters of infoCache: get/hit.", + }, []string{LblType}) + // InfoCacheCounterGet is the total number of getting entry. + InfoCacheCounterGet = "get" + // InfoCacheCounterHit is the cache hit numbers for get. + InfoCacheCounterHit = "hit" + // LoadPrivilegeCounter records the counter of load privilege. LoadPrivilegeCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ diff --git a/metrics/metrics.go b/metrics/metrics.go index ff2ac3b1aa08d..4a879b5d5423c 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -98,6 +98,7 @@ func RegisterMetrics() { prometheus.MustRegister(JobsGauge) prometheus.MustRegister(KeepAliveCounter) prometheus.MustRegister(LoadPrivilegeCounter) + prometheus.MustRegister(InfoCacheCounters) prometheus.MustRegister(LoadSchemaCounter) prometheus.MustRegister(LoadSchemaDuration) prometheus.MustRegister(MetaHistogram) From 28c6e2b176955b415271ba51a1bb7931aa565043 Mon Sep 17 00:00:00 2001 From: xhe Date: Mon, 17 May 2021 18:57:14 +0800 Subject: [PATCH 22/23] *: fix panic Signed-off-by: xhe --- ddl/util/syncer_test.go | 7 +++++++ owner/manager_test.go | 4 ++++ 2 files changed, 11 insertions(+) diff --git a/ddl/util/syncer_test.go b/ddl/util/syncer_test.go index b552488ad49de..5a9d41d47e3b8 100644 --- a/ddl/util/syncer_test.go +++ b/ddl/util/syncer_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/parser/terror" . "github.com/pingcap/tidb/ddl" . "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/store/mockstore" "go.etcd.io/etcd/clientv3" @@ -69,11 +70,14 @@ func TestSyncerSimple(t *testing.T) { defer clus.Terminate(t) cli := clus.RandClient() ctx := goctx.Background() + ic := infoschema.NewCache(2) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d := NewDDL( ctx, WithEtcdClient(cli), WithStore(store), WithLease(testLease), + WithInfoCache(ic), ) err = d.Start(nil) if err != nil { @@ -110,11 +114,14 @@ func TestSyncerSimple(t *testing.T) { t.Fatalf("client get global version result not match, err %v", err) } + ic2 := infoschema.NewCache(2) + ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d1 := NewDDL( ctx, WithEtcdClient(cli), WithStore(store), WithLease(testLease), + WithInfoCache(ic2), ) err = d1.Start(nil) if err != nil { diff --git a/owner/manager_test.go b/owner/manager_test.go index e25b204e6bbb4..fe060cb7fb2ee 100644 --- a/owner/manager_test.go +++ b/owner/manager_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/terror" . "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/logutil" @@ -72,11 +73,14 @@ func TestSingle(t *testing.T) { defer clus.Terminate(t) cli := clus.RandClient() ctx := goctx.Background() + ic := infoschema.NewCache(2) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d := NewDDL( ctx, WithEtcdClient(cli), WithStore(store), WithLease(testLease), + WithInfoCache(ic), ) err = d.Start(nil) if err != nil { From 3fa0aed95427d87caceea4523f5e90da5bddd52f Mon Sep 17 00:00:00 2001 From: xhe Date: Mon, 17 May 2021 19:24:28 +0800 Subject: [PATCH 23/23] *: fix panic2 Signed-off-by: xhe --- owner/manager_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/owner/manager_test.go b/owner/manager_test.go index fe060cb7fb2ee..e239419057291 100644 --- a/owner/manager_test.go +++ b/owner/manager_test.go @@ -146,11 +146,14 @@ func TestCluster(t *testing.T) { defer clus.Terminate(t) cli := clus.Client(0) + ic := infoschema.NewCache(2) + ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d := NewDDL( goctx.Background(), WithEtcdClient(cli), WithStore(store), WithLease(testLease), + WithInfoCache(ic), ) err = d.Start(nil) if err != nil { @@ -161,11 +164,14 @@ func TestCluster(t *testing.T) { t.Fatalf("expect true, got isOwner:%v", isOwner) } cli1 := clus.Client(1) + ic2 := infoschema.NewCache(2) + ic2.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d1 := NewDDL( goctx.Background(), WithEtcdClient(cli1), WithStore(store), WithLease(testLease), + WithInfoCache(ic2), ) err = d1.Start(nil) if err != nil { @@ -193,11 +199,14 @@ func TestCluster(t *testing.T) { // d3 (not owner) stop cli3 := clus.Client(3) + ic3 := infoschema.NewCache(2) + ic3.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0)) d3 := NewDDL( goctx.Background(), WithEtcdClient(cli3), WithStore(store), WithLease(testLease), + WithInfoCache(ic3), ) err = d3.Start(nil) if err != nil {