diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 44b370a799cde..519e81ed03175 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -23,7 +23,6 @@ go_library( "//br/pkg/lightning/log", "//br/pkg/utils", "//errno", - "//kv", "//meta/autoid", "//parser/model", "//sessionctx/variable", @@ -102,7 +101,7 @@ go_test( ], embed = [":common"], flaky = True, - shard_count = 20, + shard_count = 21, deps = [ "//br/pkg/errors", "//br/pkg/lightning/log", diff --git a/br/pkg/lightning/common/common.go b/br/pkg/lightning/common/common.go index 4b2f6698d80f0..fdfc493220914 100644 --- a/br/pkg/lightning/common/common.go +++ b/br/pkg/lightning/common/common.go @@ -18,7 +18,6 @@ import ( "context" "github.com/pingcap/errors" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" ) @@ -50,9 +49,9 @@ var DefaultImportVariablesTiDB = map[string]string{ } // AllocGlobalAutoID allocs N consecutive autoIDs from TiDB. -func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, +func AllocGlobalAutoID(ctx context.Context, n int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) { - allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo) + allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo) if err != nil { return 0, 0, err } @@ -70,9 +69,9 @@ func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int6 } // RebaseGlobalAutoID rebase the autoID base to newBase. -func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64, +func RebaseGlobalAutoID(ctx context.Context, newBase int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) error { - allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo) + allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo) if err != nil { return err } @@ -85,10 +84,34 @@ func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, db return nil } +// RebaseTableAllocators rebases the allocators of a table. +// This function only rebases a table allocator when its new base is given in +// the `bases` param; otherwise the allocator is skipped. +// base is the max id that has been used by the table; the next usable id will +// be base + 1, see Allocator.Alloc. +func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]int64, r autoid.Requirement, dbID int64, + tblInfo *model.TableInfo) error { + allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo) + if err != nil { + return err + } + for _, alloc := range allocators { + base, ok := bases[alloc.GetType()] + if !ok { + continue + } + err = alloc.Rebase(ctx, base, false) + if err != nil { + return err + } + } + return nil +} + // GetGlobalAutoIDAlloc returns the autoID allocators for a table. // export it for testing. 
-func GetGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) { - if store == nil { +func GetGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) { + if r == nil || r.Store() == nil { return nil, errors.New("internal error: kv store should not be nil") } if dbID == 0 { @@ -120,15 +143,15 @@ func GetGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo case hasRowID || hasAutoIncID: allocators := make([]autoid.Allocator, 0, 2) if tblInfo.SepAutoInc() && hasAutoIncID { - allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), + allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.AutoIncrementType, noCache, tblVer)) } // this allocator is NOT used when SepAutoInc=true and auto increment column is clustered. - allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), + allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, noCache, tblVer)) return allocators, nil case hasAutoRandID: - return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), + return []autoid.Allocator{autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, noCache, tblVer)}, nil default: return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name) diff --git a/br/pkg/lightning/common/common_test.go b/br/pkg/lightning/common/common_test.go index ef9b74ab7486f..0fb69f58b203a 100644 --- a/br/pkg/lightning/common/common_test.go +++ b/br/pkg/lightning/common/common_test.go @@ -134,11 +134,11 @@ func TestAllocGlobalAutoID(t *testing.T) { ctx := context.Background() for _, c := range cases { ti := newTableInfo(t, 1, c.tableID, c.createTableSQL, kvStore) - allocators, err := common.GetGlobalAutoIDAlloc(kvStore, 1, ti) + allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti) if c.expectErrStr == "" { require.NoError(t, err, c.tableID) - require.NoError(t, common.RebaseGlobalAutoID(ctx, 123, kvStore, 1, ti)) - base, idMax, err := common.AllocGlobalAutoID(ctx, 100, kvStore, 1, ti) + require.NoError(t, common.RebaseGlobalAutoID(ctx, 123, mockRequirement{kvStore}, 1, ti)) + base, idMax, err := common.AllocGlobalAutoID(ctx, 100, mockRequirement{kvStore}, 1, ti) require.NoError(t, err, c.tableID) require.Equal(t, int64(123), base, c.tableID) require.Equal(t, int64(223), idMax, c.tableID) @@ -159,3 +159,70 @@ func TestAllocGlobalAutoID(t *testing.T) { require.Equal(t, c.expectAllocatorTypes, allocatorTypes, c.tableID) } } + +type mockRequirement struct { + kv.Storage +} + +func (r mockRequirement) Store() kv.Storage { + return r.Storage +} + +func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover { + return nil +} + +func TestRebaseTableAllocators(t *testing.T) { + storePath := t.TempDir() + kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath)) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, kvStore.Close()) + }) + ti := newTableInfo(t, 1, 42, + "create table t42 (a int primary key nonclustered auto_increment) AUTO_ID_CACHE 1", kvStore) + allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti) + require.NoError(t, err) + require.Len(t, allocators, 2) + for _, alloc := range 
allocators { + id, err := alloc.NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(1), id) + } + ctx := context.Background() + allocatorTypes := make([]autoid.AllocatorType, 0, len(allocators)) + // rebase to 123 + for _, alloc := range allocators { + require.NoError(t, alloc.Rebase(ctx, 123, false)) + allocatorTypes = append(allocatorTypes, alloc.GetType()) + } + require.Equal(t, []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType}, allocatorTypes) + // this call does nothing + require.NoError(t, common.RebaseTableAllocators(ctx, nil, mockRequirement{kvStore}, 1, ti)) + for _, alloc := range allocators { + nextID, err := alloc.NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(124), nextID) + } + // this call rebases the AutoIncrementType allocator to 223 + require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{ + autoid.AutoIncrementType: 223, + }, mockRequirement{kvStore}, 1, ti)) + next, err := allocators[0].NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(224), next) + next, err = allocators[1].NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(124), next) + // this call rebases the AutoIncrementType allocator to 323, the RowIDAllocType allocator to 423 + require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{ + autoid.AutoIncrementType: 323, + autoid.RowIDAllocType: 423, + }, mockRequirement{kvStore}, 1, ti)) + next, err = allocators[0].NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(324), next) + next, err = allocators[1].NextGlobalAutoID() + require.NoError(t, err) + require.Equal(t, int64(424), next) +} diff --git a/br/pkg/lightning/importer/BUILD.bazel b/br/pkg/lightning/importer/BUILD.bazel index a7f6a63212b7d..61d8117fdf324 100644 --- a/br/pkg/lightning/importer/BUILD.bazel +++ b/br/pkg/lightning/importer/BUILD.bazel @@ -64,6 +64,7 @@ go_library( "//util/collate", "//util/dbterror", "//util/engine", + "//util/etcd", "//util/mathutil", "//util/mock", "//util/regexpr-router", diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index 52e0db68bffee..0fbb7f74205c4 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -54,6 +54,7 @@ import ( "github.com/pingcap/tidb/br/pkg/version/build" tidbconfig "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/keyspace" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser" @@ -62,12 +63,14 @@ import ( "github.com/pingcap/tidb/store/driver" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/collate" + "github.com/pingcap/tidb/util/etcd" "github.com/pingcap/tidb/util/mathutil" regexprrouter "github.com/pingcap/tidb/util/regexpr-router" "github.com/pingcap/tidb/util/set" "github.com/prometheus/client_golang/prometheus" tikvconfig "github.com/tikv/client-go/v2/config" pd "github.com/tikv/pd/client" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/multierr" "go.uber.org/zap" @@ -1517,6 +1520,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { cleanup := false postProgress := func() error { return nil } var kvStore tidbkv.Storage + var etcdCli *clientv3.Client if isLocalBackend(rc.cfg) { var ( @@ -1570,6 +1574,16 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { if err != nil { return errors.Trace(err) } + etcdCli, err = 
clientv3.New(clientv3.Config{ + Endpoints: []string{rc.cfg.TiDB.PdAddr}, + AutoSyncInterval: 30 * time.Second, + TLS: rc.tls.TLSConfig(), + }) + if err != nil { + return errors.Trace(err) + } + etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(kvStore.GetCodec())) + manager, err := NewChecksumManager(ctx, rc, kvStore) if err != nil { return errors.Trace(err) @@ -1637,6 +1651,11 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { logTask.Warn("failed to close kv store", zap.Error(err)) } } + if etcdCli != nil { + if err := etcdCli.Close(); err != nil { + logTask.Warn("failed to close etcd client", zap.Error(err)) + } + } }() taskCh := make(chan task, rc.cfg.App.IndexConcurrency) @@ -1686,7 +1705,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { if err != nil { return errors.Trace(err) } - tr, err := NewTableImporter(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, log.FromContext(ctx)) + tr, err := NewTableImporter(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, etcdCli, log.FromContext(ctx)) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/lightning/importer/import_test.go b/br/pkg/lightning/importer/import_test.go index 042b07c6071d9..c24e929753a70 100644 --- a/br/pkg/lightning/importer/import_test.go +++ b/br/pkg/lightning/importer/import_test.go @@ -72,7 +72,7 @@ func TestNewTableRestore(t *testing.T) { for _, tc := range testCases { tableInfo := dbInfo.Tables[tc.name] tableName := common.UniqueTable("mockdb", tableInfo.Name) - tr, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L()) + tr, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L()) require.NotNil(t, tr) require.NoError(t, err) } @@ -89,7 +89,7 @@ func TestNewTableRestoreFailure(t *testing.T) { }} tableName := common.UniqueTable("mockdb", "failure") - _, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L()) + _, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L()) require.Regexp(t, `failed to tables\.TableFromMeta.*`, err.Error()) } diff --git a/br/pkg/lightning/importer/meta_manager.go b/br/pkg/lightning/importer/meta_manager.go index fe128638a1318..fff80ddb09718 100644 --- a/br/pkg/lightning/importer/meta_manager.go +++ b/br/pkg/lightning/importer/meta_manager.go @@ -252,10 +252,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 if curStatus == metaStatusInitial { if needAutoID { // maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first. 
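The call sites above all go through the new autoid.Requirement parameter instead of a bare kv.Storage. A minimal sketch of how a caller outside Lightning could satisfy it with an existing kv.Storage plus an etcd client, assuming only the API visible in this patch (exampleRequirement and allocIDs are invented names, not part of the change):

package example // illustrative only, not part of this patch

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta/autoid"
	"github.com/pingcap/tidb/parser/model"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// exampleRequirement bundles a kv store with an autoid service client,
// mirroring what TableImporter now does internally.
type exampleRequirement struct {
	store     kv.Storage
	autoidCli *autoid.ClientDiscover
}

func (r exampleRequirement) Store() kv.Storage                    { return r.store }
func (r exampleRequirement) AutoIDClient() *autoid.ClientDiscover { return r.autoidCli }

// allocIDs reserves n consecutive auto IDs for tblInfo through the new API.
func allocIDs(ctx context.Context, store kv.Storage, etcdCli *clientv3.Client,
	dbID int64, tblInfo *model.TableInfo, n int64) (base, idMax int64, err error) {
	r := exampleRequirement{store: store, autoidCli: autoid.NewClientDiscover(etcdCli)}
	return common.AllocGlobalAutoID(ctx, n, r, dbID, tblInfo)
}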
- if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil { + if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil { return errors.Trace(err) } - newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core) + newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/lightning/importer/meta_manager_test.go b/br/pkg/lightning/importer/meta_manager_test.go index bbccb3ec5aead..db5850abb8d81 100644 --- a/br/pkg/lightning/importer/meta_manager_test.go +++ b/br/pkg/lightning/importer/meta_manager_test.go @@ -325,7 +325,7 @@ func (s *metaMgrSuite) prepareMockInner(rowsVal [][]driver.Value, nextRowID *int WillReturnRows(rows) if nextRowID != nil { - allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.kvStore, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core) + allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core) alloc := allocs.Get(autoid.RowIDAllocType) alloc.ForceRebase(*nextRowID - 1) } diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go index c66961665dbd4..c1da1972b8c1f 100644 --- a/br/pkg/lightning/importer/table_import.go +++ b/br/pkg/lightning/importer/table_import.go @@ -51,6 +51,7 @@ import ( "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/mathutil" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -69,6 +70,8 @@ type TableImporter struct { alloc autoid.Allocators logger log.Logger kvStore tidbkv.Storage + etcdCli *clientv3.Client + autoidCli *autoid.ClientDiscover ignoreColumns map[string]struct{} } @@ -82,6 +85,7 @@ func NewTableImporter( cp *checkpoints.TableCheckpoint, ignoreColumns map[string]struct{}, kvStore tidbkv.Storage, + etcdCli *clientv3.Client, logger log.Logger, ) (*TableImporter, error) { idAlloc := kv.NewPanickingAllocators(cp.AllocBase) @@ -89,6 +93,7 @@ func NewTableImporter( if err != nil { return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName) } + autoidCli := autoid.NewClientDiscover(etcdCli) return &TableImporter{ tableName: tableName, @@ -98,6 +103,8 @@ func NewTableImporter( encTable: tbl, alloc: idAlloc, kvStore: kvStore, + etcdCli: etcdCli, + autoidCli: autoidCli, logger: logger.With(zap.String("table", tableName)), ignoreColumns: ignoreColumns, }, nil @@ -268,6 +275,19 @@ func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp return err } +// AutoIDRequirement implements autoid.Requirement. +var _ autoid.Requirement = &TableImporter{} + +// Store implements the autoid.Requirement interface. +func (tr *TableImporter) Store() tidbkv.Storage { + return tr.kvStore +} + +// AutoIDClient implements the autoid.Requirement interface. +func (tr *TableImporter) AutoIDClient() *autoid.ClientDiscover { + return tr.autoidCli +} + // RebaseChunkRowIDs rebase the row id of the chunks. func (*TableImporter) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) { if rowIDBase == 0 { @@ -898,7 +918,7 @@ func (tr *TableImporter) postProcess( // And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column, // not for allocator of _tidb_rowid. 
// So we need to rebase IDs for those 2 allocators explicitly. - err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core) + err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core) } } rc.alterTableLock.Unlock() diff --git a/br/pkg/lightning/importer/table_import_test.go b/br/pkg/lightning/importer/table_import_test.go index fbce8d1e8fee9..fa0a24fde8f8d 100644 --- a/br/pkg/lightning/importer/table_import_test.go +++ b/br/pkg/lightning/importer/table_import_test.go @@ -211,7 +211,7 @@ func (s *tableRestoreSuiteBase) setupSuite(t *testing.T) { func (s *tableRestoreSuiteBase) setupTest(t *testing.T) { // Collect into the test TableImporter structure var err error - s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L()) + s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L()) require.NoError(t, err) s.cfg = config.NewConfig() @@ -516,7 +516,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { cfg.Mydumper.StrictFormat = true rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store} - tr, err := NewTableImporter("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L()) + tr, err := NewTableImporter("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L()) require.NoError(s.T(), err) require.NoError(s.T(), tr.populateChunks(context.Background(), rc, cp)) @@ -767,7 +767,7 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated() { require.NoError(s.T(), err) core.State = model.StatePublic tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core} - s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L()) + s.tr, err = NewTableImporter("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L()) require.NoError(s.T(), err) ccp := &checkpoints.ChunkCheckpoint{} diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 3a5416501e4df..8129ee4c513cc 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -78,7 +78,7 @@ func TestRestoreAutoIncID(t *testing.T) { DB: dbInfo, } // Get the next AutoIncID - idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType) + idAlloc := autoid.NewAllocator(s.mock.Domain, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType) globalAutoID, err := idAlloc.NextGlobalAutoID() require.NoErrorf(t, err, "Error allocate next auto id") require.Equal(t, uint64(globalAutoID), autoIncID) @@ -376,7 +376,7 @@ func TestGetExistedUserDBs(t *testing.T) { dbs := restore.GetExistedUserDBs(dom) require.Equal(t, 0, len(dbs)) - builder, err := infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos( + builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos( []*model.DBInfo{ {Name: model.NewCIStr("mysql")}, {Name: model.NewCIStr("test")}, @@ -387,7 +387,7 @@ func TestGetExistedUserDBs(t *testing.T) { dbs = restore.GetExistedUserDBs(dom) require.Equal(t, 0, len(dbs)) - builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos( + builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos( []*model.DBInfo{ {Name: model.NewCIStr("mysql")}, {Name: model.NewCIStr("test")}, 
@@ -399,7 +399,7 @@ func TestGetExistedUserDBs(t *testing.T) { dbs = restore.GetExistedUserDBs(dom) require.Equal(t, 1, len(dbs)) - builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos( + builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos( []*model.DBInfo{ {Name: model.NewCIStr("mysql")}, {Name: model.NewCIStr("d1")}, diff --git a/ddl/column.go b/ddl/column.go index e3f15eb6423ca..f6d77f432951e 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -754,7 +754,7 @@ func (w *worker) doModifyColumnTypeWithData( job.SnapshotVer = 0 job.SchemaState = model.StateWriteReorganization case model.StateWriteReorganization: - tbl, err := getTable(d.store, dbInfo.ID, tblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo) if err != nil { return ver, errors.Trace(err) } @@ -1681,6 +1681,18 @@ func checkNewAutoRandomBits(idAccessors meta.AutoIDAccessors, oldCol *model.Colu return nil } +type asAutoIDRequirement ddlCtx + +var _ autoid.Requirement = &asAutoIDRequirement{} + +func (r *asAutoIDRequirement) Store() kv.Storage { + return r.store +} + +func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover { + return r.autoidCli +} + // applyNewAutoRandomBits set auto_random bits to TableInfo and // migrate auto_increment ID to auto_random ID if possible. func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo, @@ -1690,7 +1702,7 @@ func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo, if !needMigrateFromAutoIncToAutoRand { return nil } - autoRandAlloc := autoid.NewAllocatorsFromTblInfo(d.store, dbInfo.ID, tblInfo).Get(autoid.AutoRandomType) + autoRandAlloc := autoid.NewAllocatorsFromTblInfo((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo).Get(autoid.AutoRandomType) if autoRandAlloc == nil { errMsg := fmt.Sprintf(autoid.AutoRandomAllocatorNotFound, dbInfo.Name.O, tblInfo.Name.O) return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg) diff --git a/ddl/ddl.go b/ddl/ddl.go index cf9919893a4ea..e02d62bc12c8f 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/ast" @@ -355,6 +356,7 @@ type ddlCtx struct { statsHandle *handle.Handle tableLockCkr util.DeadTableLockChecker etcdCli *clientv3.Client + autoidCli *autoid.ClientDiscover // backfillJobCh gets notification if any backfill jobs coming. backfillJobCh chan struct{} @@ -679,6 +681,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl { infoCache: opt.InfoCache, tableLockCkr: deadLockCkr, etcdCli: opt.EtcdCli, + autoidCli: opt.AutoIDClient, schemaVersionManager: newSchemaVersionManager(), waitSchemaSyncedController: newWaitSchemaSyncedController(), runningJobIDs: make([]string, 0, jobRecordCapacity), diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index dc871d4586815..692ff5754884b 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -3020,7 +3020,7 @@ func checkCharsetAndCollation(cs string, co string) error { // handleAutoIncID handles auto_increment option in DDL. It creates a ID counter for the table and initiates the counter to a proper value. // For example if the option sets auto_increment to 10. The counter will be set to 9. So the next allocated ID will be 10. 
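asAutoIDRequirement above relies on Go's conversion rule for defined types that share an underlying type: converting *ddlCtx to *asAutoIDRequirement adds no fields and no allocation. A reduced sketch of the same pattern, with invented names (ctxLike, asRequirement, requirementOf), in case the trick is unfamiliar:

package example // illustrative only; ctxLike/asRequirement are invented names

import (
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta/autoid"
)

type ctxLike struct {
	store     kv.Storage
	autoidCli *autoid.ClientDiscover
}

// asRequirement has the same underlying type as ctxLike, so a plain pointer
// conversion is enough to expose the autoid.Requirement methods.
type asRequirement ctxLike

func (r *asRequirement) Store() kv.Storage                    { return r.store }
func (r *asRequirement) AutoIDClient() *autoid.ClientDiscover { return r.autoidCli }

func requirementOf(c *ctxLike) autoid.Requirement {
	return (*asRequirement)(c) // no copy, no allocation
}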
func (d *ddl) handleAutoIncID(tbInfo *model.TableInfo, schemaID int64, newEnd int64, tp autoid.AllocatorType) error { - allocs := autoid.NewAllocatorsFromTblInfo(d.store, schemaID, tbInfo) + allocs := autoid.NewAllocatorsFromTblInfo((*asAutoIDRequirement)(d.ddlCtx), schemaID, tbInfo) if alloc := allocs.Get(tp); alloc != nil { err := alloc.Rebase(context.Background(), newEnd, false) if err != nil { diff --git a/ddl/disttask_flow.go b/ddl/disttask_flow.go index bb783e9baa8bc..215f2ad3be473 100644 --- a/ddl/disttask_flow.go +++ b/ddl/disttask_flow.go @@ -84,7 +84,7 @@ func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatche return nil, nil default: } - tbl, err := getTable(d.store, job.SchemaID, tblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, tblInfo) if err != nil { return nil, err } diff --git a/ddl/foreign_key_test.go b/ddl/foreign_key_test.go index a50fb2de5af99..5f661e8d3c429 100644 --- a/ddl/foreign_key_test.go +++ b/ddl/foreign_key_test.go @@ -134,7 +134,7 @@ func TestForeignKey(t *testing.T) { mu.Lock() defer mu.Unlock() var t table.Table - t, err = testGetTableWithError(store, dbInfo.ID, tblInfo.ID) + t, err = testGetTableWithError(dom, dbInfo.ID, tblInfo.ID) if err != nil { hookErr = errors.Trace(err) return @@ -176,7 +176,7 @@ func TestForeignKey(t *testing.T) { mu.Lock() defer mu.Unlock() var t table.Table - t, err = testGetTableWithError(store, dbInfo.ID, tblInfo.ID) + t, err = testGetTableWithError(dom, dbInfo.ID, tblInfo.ID) if err != nil { hookErr = errors.Trace(err) return diff --git a/ddl/index.go b/ddl/index.go index ca7cedb1a5264..fc51e9f774a01 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -671,7 +671,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo job.SchemaState = model.StateWriteReorganization case model.StateWriteReorganization: // reorganization -> public - tbl, err := getTable(d.store, schemaID, tblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), schemaID, tblInfo) if err != nil { return ver, errors.Trace(err) } diff --git a/ddl/job_table.go b/ddl/job_table.go index 353fe508a0aeb..5d1556eca692f 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/ddl/syncer" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/model" @@ -463,10 +464,10 @@ func (*ddl) markJobProcessing(se *sess.Session, job *model.Job) error { return errors.Trace(err) } -func (d *ddl) getTableByTxn(store kv.Storage, schemaID, tableID int64) (*model.DBInfo, table.Table, error) { +func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*model.DBInfo, table.Table, error) { var tbl table.Table var dbInfo *model.DBInfo - err := kv.RunInNewTxn(d.ctx, store, false, func(ctx context.Context, txn kv.Transaction) error { + err := kv.RunInNewTxn(d.ctx, r.Store(), false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error dbInfo, err1 = t.GetDatabase(schemaID) @@ -477,7 +478,7 @@ func (d *ddl) getTableByTxn(store kv.Storage, schemaID, tableID int64) (*model.D if err1 != nil { return errors.Trace(err1) } - tbl, err1 = getTable(store, schemaID, tblInfo) + tbl, err1 = getTable(r, schemaID, tblInfo) return errors.Trace(err1) }) return dbInfo, tbl, err diff --git a/ddl/options.go b/ddl/options.go index e4c59e5d3b5b7..1355808e831a2 100644 --- a/ddl/options.go +++ 
b/ddl/options.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -27,11 +28,12 @@ type Option func(*Options) // Options represents all the options of the DDL module needs type Options struct { - EtcdCli *clientv3.Client - Store kv.Storage - InfoCache *infoschema.InfoCache - Hook Callback - Lease time.Duration + EtcdCli *clientv3.Client + Store kv.Storage + AutoIDClient *autoid.ClientDiscover + InfoCache *infoschema.InfoCache + Hook Callback + Lease time.Duration } // WithEtcdClient specifies the `clientv3.Client` of DDL used to request the etcd service @@ -55,6 +57,13 @@ func WithInfoCache(ic *infoschema.InfoCache) Option { } } +// WithAutoIDClient specifies the autoid client used by the autoid service for those AUTO_ID_CACHE=1 tables. +func WithAutoIDClient(cli *autoid.ClientDiscover) Option { + return func(options *Options) { + options.AutoIDClient = cli + } +} + // WithHook specifies the `Callback` of DDL used to notify the outer module when events are triggered func WithHook(callback Callback) Option { return func(options *Options) { diff --git a/ddl/partition.go b/ddl/partition.go index 5e4522ed843ce..3c503de5012ef 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1899,7 +1899,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( case model.StateDeleteReorganization: oldTblInfo := getTableInfoWithDroppingPartitions(tblInfo) physicalTableIDs = getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) - tbl, err := getTable(d.store, job.SchemaID, oldTblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), job.SchemaID, oldTblInfo) if err != nil { return ver, errors.Trace(err) } @@ -2598,7 +2598,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState) case model.StateWriteReorganization: physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) - tbl, err2 := getTable(d.store, job.SchemaID, tblInfo) + tbl, err2 := getTable((*asAutoIDRequirement)(d), job.SchemaID, tblInfo) if err2 != nil { return ver, errors.Trace(err2) } diff --git a/ddl/placement_policy_ddl_test.go b/ddl/placement_policy_ddl_test.go index 0c0793e711d8e..cb03fe0c2a7f7 100644 --- a/ddl/placement_policy_ddl_test.go +++ b/ddl/placement_policy_ddl_test.go @@ -118,7 +118,7 @@ func TestPlacementPolicyInUse(t *testing.T) { t4.State = model.StatePublic db1.Tables = append(db1.Tables, t4) - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos( + builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos( []*model.DBInfo{db1, db2, dbP}, []*model.PolicyInfo{p1, p2, p3, p4, p5}, nil, diff --git a/ddl/scheduler.go b/ddl/scheduler.go index aa4d04e7cb24e..a3ae9ca385224 100644 --- a/ddl/scheduler.go +++ b/ddl/scheduler.go @@ -93,7 +93,7 @@ func NewBackfillSchedulerHandle(taskMeta []byte, d *ddl, stepForImport bool) (sc jobMeta := &bgm.Job bh.job = jobMeta - db, tbl, err := d.getTableByTxn(d.store, jobMeta.SchemaID, jobMeta.TableID) + db, tbl, err := d.getTableByTxn((*asAutoIDRequirement)(d.ddlCtx), jobMeta.SchemaID, jobMeta.TableID) if err != nil { return nil, err } diff --git a/ddl/table.go b/ddl/table.go index 8147e57e0ac1a..bbc2599ceae81 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -621,8 +621,8 @@ func checkSafePoint(w *worker, snapshotTS uint64) error { return 
gcutil.ValidateSnapshot(ctx, snapshotTS) } -func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) { - allocs := autoid.NewAllocatorsFromTblInfo(store, schemaID, tblInfo) +func getTable(r autoid.Requirement, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) { + allocs := autoid.NewAllocatorsFromTblInfo(r, schemaID, tblInfo) tbl, err := table.TableFromMeta(allocs, tblInfo) return tbl, errors.Trace(err) } @@ -834,7 +834,7 @@ func onRebaseAutoRandomType(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, return onRebaseAutoID(d, d.store, t, job, autoid.AutoRandomType) } -func onRebaseAutoID(d *ddlCtx, store kv.Storage, t *meta.Meta, job *model.Job, tp autoid.AllocatorType) (ver int64, _ error) { +func onRebaseAutoID(d *ddlCtx, _ kv.Storage, t *meta.Meta, job *model.Job, tp autoid.AllocatorType) (ver int64, _ error) { schemaID := job.SchemaID var ( newBase int64 @@ -857,7 +857,7 @@ func onRebaseAutoID(d *ddlCtx, store kv.Storage, t *meta.Meta, job *model.Job, t return ver, errors.Trace(err) } - tbl, err := getTable(store, schemaID, tblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), schemaID, tblInfo) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -939,7 +939,7 @@ func (w *worker) onShardRowID(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int6 if shardRowIDBits < tblInfo.ShardRowIDBits { tblInfo.ShardRowIDBits = shardRowIDBits } else { - tbl, err := getTable(d.store, job.SchemaID, tblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), job.SchemaID, tblInfo) if err != nil { return ver, errors.Trace(err) } diff --git a/ddl/table_test.go b/ddl/table_test.go index dce990b07828c..7d150cbc86d9f 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -143,10 +143,10 @@ func testTruncateTable(t *testing.T, ctx sessionctx.Context, store kv.Storage, d return job } -func testGetTableWithError(store kv.Storage, schemaID, tableID int64) (table.Table, error) { +func testGetTableWithError(r autoid.Requirement, schemaID, tableID int64) (table.Table, error) { var tblInfo *model.TableInfo ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) - err := kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error { + err := kv.RunInNewTxn(ctx, r.Store(), false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error tblInfo, err1 = t.GetTable(schemaID, tableID) @@ -161,7 +161,7 @@ func testGetTableWithError(store kv.Storage, schemaID, tableID int64) (table.Tab if tblInfo == nil { return nil, errors.New("table not found") } - alloc := autoid.NewAllocator(store, schemaID, tblInfo.ID, false, autoid.RowIDAllocType) + alloc := autoid.NewAllocator(r, schemaID, tblInfo.ID, false, autoid.RowIDAllocType) tbl, err := table.TableFromMeta(autoid.NewAllocators(false, alloc), tblInfo) if err != nil { return nil, errors.Trace(err) diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index d1001013a1953..af65cb71b4dd1 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -40,6 +40,7 @@ go_library( "//keyspace", "//kv", "//meta", + "//meta/autoid", "//metrics", "//owner", "//parser/ast", diff --git a/domain/domain.go b/domain/domain.go index 59372d5967548..d7ce5b07632d3 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -52,6 +52,7 @@ import ( "github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/metrics" 
"github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/ast" @@ -135,6 +136,8 @@ type Domain struct { exit chan struct{} // `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace. etcdClient *clientv3.Client + // autoidClient is used when there are tables with AUTO_ID_CACHE=1, it is the client to the autoid service. + autoidClient *autoid.ClientDiscover // `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace. // It is only used in storeMinStartTS and RemoveMinStartTS now. // It must be used when the etcd path isn't needed to separate by keyspace. @@ -270,7 +273,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithDBInfos(schemas, policies, resourceGroups, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do, do.sysFacHack).InitWithDBInfos(schemas, policies, resourceGroups, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err } @@ -418,7 +421,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) + builder := infoschema.NewBuilder(do, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) builder.SetDeltaUpdateBundles() phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) @@ -1091,6 +1094,9 @@ func (do *Domain) Init( etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec())) do.etcdClient = cli + do.autoidClient = autoid.NewClientDiscover(cli) + + do.autoidClient = autoid.NewClientDiscover(cli) unprefixedEtcdCli, err := newEtcdCli(addrs, ebd) if err != nil { @@ -1124,6 +1130,7 @@ func (do *Domain) Init( ctx, ddl.WithEtcdClient(do.etcdClient), ddl.WithStore(do.store), + ddl.WithAutoIDClient(do.autoidClient), ddl.WithInfoCache(do.infoCache), ddl.WithHook(callback), ddl.WithLease(ddlLease), @@ -1556,6 +1563,11 @@ func (do *Domain) GetEtcdClient() *clientv3.Client { return do.etcdClient } +// AutoIDClient returns the autoid client. +func (do *Domain) AutoIDClient() *autoid.ClientDiscover { + return do.autoidClient +} + // GetPDClient returns the PD client. func (do *Domain) GetPDClient() pd.Client { if store, ok := do.store.(kv.StorageWithPD); ok { @@ -1842,7 +1854,7 @@ func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context, owner owner. // in BootstrapSession. 
func (do *Domain) TelemetryReportLoop(ctx sessionctx.Context) { ctx.GetSessionVars().InRestrictedSQL = true - err := telemetry.InitialRun(ctx, do.GetEtcdClient()) + err := telemetry.InitialRun(ctx, do.etcdClient) if err != nil { logutil.BgLogger().Warn("Initial telemetry run failed", zap.Error(err)) } @@ -1863,7 +1875,7 @@ func (do *Domain) TelemetryReportLoop(ctx sessionctx.Context) { if !owner.IsOwner() { continue } - err := telemetry.ReportUsageData(ctx, do.GetEtcdClient()) + err := telemetry.ReportUsageData(ctx, do.etcdClient) if err != nil { // Only status update errors will be printed out logutil.BgLogger().Warn("TelemetryReportLoop status update failed", zap.Error(err)) diff --git a/executor/executor_test.go b/executor/executor_test.go index de43d423c39a6..10b4f30b3ef96 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2003,7 +2003,7 @@ func TestCheckIndex(t *testing.T) { require.NoError(t, err) tbInfo := tbl.Meta() - alloc := autoid.NewAllocator(store, dbInfo.ID, tbInfo.ID, false, autoid.RowIDAllocType) + alloc := autoid.NewAllocator(dom, dbInfo.ID, tbInfo.ID, false, autoid.RowIDAllocType) tb, err := tables.TableFromMeta(autoid.NewAllocators(false, alloc), tbInfo) require.NoError(t, err) diff --git a/executor/importer/BUILD.bazel b/executor/importer/BUILD.bazel index 7e937232b0cff..5a840ecbeb329 100644 --- a/executor/importer/BUILD.bazel +++ b/executor/importer/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "//config", "//executor/asyncloaddata", "//expression", + "//keyspace", "//kv", "//meta/autoid", "//parser/ast", @@ -46,6 +47,7 @@ go_library( "//util/chunk", "//util/dbterror", "//util/dbterror/exeerrors", + "//util/etcd", "//util/filter", "//util/intest", "//util/logutil", @@ -59,6 +61,7 @@ go_library( "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//tikv", + "@io_etcd_go_etcd_client_v3//:client", "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index 637be6e9905fa..b750657a4421e 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -39,11 +39,15 @@ import ( verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/storage" tidb "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/keyspace" tidbkv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/etcd" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/multierr" "go.uber.org/zap" ) @@ -369,11 +373,34 @@ func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpo } if common.TableHasAutoID(ti.tableInfo.Core) { + tidbCfg := tidb.GetGlobalConfig() + clusterSecurity := tidbCfg.Security.ClusterSecurity() + tlsConfig, err := clusterSecurity.ToTLSConfig() + if err != nil { + return nil, errors.Trace(err) + } + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{tidbCfg.Path}, + AutoSyncInterval: 30 * time.Second, + TLS: tlsConfig, + }) + if err != nil { + return nil, errors.Trace(err) + } + etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(ti.kvStore.GetCodec())) + defer func() { + if err := etcdCli.Close(); err != nil { + ti.logger.Error("close etcd client error", zap.Error(err)) + } + }() + autoidCli := 
autoid.NewClientDiscover(etcdCli) + + r := &asAutoIDRequirement{ti.kvStore, autoidCli} // todo: the new base should be the max row id of the last Node if we support distributed import. - if err = common.RebaseGlobalAutoID(ctx, 0, ti.kvStore, ti.dbID, ti.tableInfo.Core); err != nil { + if err = common.RebaseGlobalAutoID(ctx, 0, r, ti.dbID, ti.tableInfo.Core); err != nil { return nil, errors.Trace(err) } - newMinRowID, _, err := common.AllocGlobalAutoID(ctx, maxRowID, ti.kvStore, ti.dbID, ti.tableInfo.Core) + newMinRowID, _, err := common.AllocGlobalAutoID(ctx, maxRowID, r, ti.dbID, ti.tableInfo.Core) if err != nil { return nil, errors.Trace(err) } @@ -385,6 +412,21 @@ func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpo return ti.tableCp.Engines, nil } +type asAutoIDRequirement struct { + kvStore tidbkv.Storage + autoidCli *autoid.ClientDiscover +} + +var _ autoid.Requirement = &asAutoIDRequirement{} + +func (r *asAutoIDRequirement) Store() tidbkv.Storage { + return r.kvStore +} + +func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover { + return r.autoidCli +} + func (ti *TableImporter) rebaseChunkRowID(rowIDBase int64) { if rowIDBase == 0 { return diff --git a/infoschema/builder.go b/infoschema/builder.go index dc41c1d0d1d96..daf5acf3e4da6 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -182,9 +182,9 @@ type Builder struct { // This map will indicate which DB has been copied, so that they // don't need to be copied again. dirtyDB map[string]bool - // TODO: store is only used by autoid allocators - // detach allocators from storage, use passed transaction in the feature - store kv.Storage + + // Used by autoid allocators + autoid.Requirement factory func() (pools.Resource, error) bundleInfoBuilder @@ -358,7 +358,7 @@ func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDi } // ntID is the new id for the partition! b.markPartitionBundleShouldUpdate(ntID) - err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID) + err = updateAutoIDForExchangePartition(b.Requirement.Store(), ptSchemaID, ptID, ntSchemaID, ntID) if err != nil { return nil, errors.Trace(err) } @@ -793,7 +793,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i ConvertOldVersionUTF8ToUTF8MB4IfNeed(tblInfo) if len(allocs.Allocs) == 0 { - allocs = autoid.NewAllocatorsFromTblInfo(b.store, dbInfo.ID, tblInfo) + allocs = autoid.NewAllocatorsFromTblInfo(b.Requirement, dbInfo.ID, tblInfo) } else { tblVer := autoid.AllocOptionTableInfoVersion(tblInfo.Version) switch tp { @@ -803,11 +803,11 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i // and RowIDAllocType allocator for it. Because auto id and row id could share the same allocator. // Allocate auto id may route to allocate row id, if row id allocator is nil, the program panic! 
for _, tp := range [2]autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType} { - newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), tp, tblVer, idCacheOpt) + newAlloc := autoid.NewAllocator(b.Requirement, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), tp, tblVer, idCacheOpt) allocs = allocs.Append(newAlloc) } case model.ActionRebaseAutoRandomBase: - newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer) + newAlloc := autoid.NewAllocator(b.Requirement, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer) allocs = allocs.Append(newAlloc) case model.ActionModifyColumn: // Change column attribute from auto_increment to auto_random. @@ -816,7 +816,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i allocs = allocs.Filter(func(a autoid.Allocator) bool { return a.GetType() != autoid.AutoIncrementType && a.GetType() != autoid.RowIDAllocType }) - newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer) + newAlloc := autoid.NewAllocator(b.Requirement, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer) allocs = allocs.Append(newAlloc) } } @@ -1078,7 +1078,7 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF b.is.schemaMap[di.Name.L] = schTbls for _, t := range di.Tables { - allocs := autoid.NewAllocatorsFromTblInfo(b.store, di.ID, t) + allocs := autoid.NewAllocatorsFromTblInfo(b.Requirement, di.ID, t) var tbl table.Table tbl, err := tableFromMeta(allocs, t) if err != nil { @@ -1114,9 +1114,9 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. 
-func NewBuilder(store kv.Storage, factory func() (pools.Resource, error)) *Builder { +func NewBuilder(r autoid.Requirement, factory func() (pools.Resource, error)) *Builder { return &Builder{ - store: store, + Requirement: r, is: &infoSchema{ schemaMap: map[string]*schemaTables{}, policyMap: map[string]*model.PolicyInfo{}, diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 5fd8058562ebd..4c14de760043e 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -110,7 +110,7 @@ func TestBasic(t *testing.T) { }) require.NoError(t, err) - builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, nil, 1) + builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos(dbInfos, nil, nil, 1) require.NoError(t, err) txn, err := store.Begin() @@ -256,7 +256,7 @@ func TestInfoTables(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, nil, 0) + builder, err := infoschema.NewBuilder(mockRequirement{store}, nil).InitWithDBInfos(nil, nil, nil, 0) require.NoError(t, err) is := builder.Build() @@ -333,7 +333,7 @@ func TestBuildSchemaWithGlobalTemporaryTable(t *testing.T) { err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) for _, change := range changes { - builder := infoschema.NewBuilder(store, nil).InitWithOldInfoSchema(curIs) + builder := infoschema.NewBuilder(dom, nil).InitWithOldInfoSchema(curIs) change(m, builder) curIs = builder.Build() } @@ -411,7 +411,7 @@ func TestBuildSchemaWithGlobalTemporaryTable(t *testing.T) { // full load newDB, ok := newIS.SchemaByName(model.NewCIStr("test")) require.True(t, ok) - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos([]*model.DBInfo{newDB}, newIS.AllPlacementPolicies(), newIS.AllResourceGroups(), newIS.SchemaMetaVersion()) + builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos([]*model.DBInfo{newDB}, newIS.AllPlacementPolicies(), newIS.AllResourceGroups(), newIS.SchemaMetaVersion()) require.NoError(t, err) require.True(t, builder.Build().HasTemporaryTable()) @@ -536,7 +536,7 @@ func TestBuildBundle(t *testing.T) { assertBundle(is, tbl2.Meta().ID, nil) assertBundle(is, p1.ID, p1Bundle) - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos([]*model.DBInfo{db}, is.AllPlacementPolicies(), is.AllResourceGroups(), is.SchemaMetaVersion()) + builder, err := infoschema.NewBuilder(mockRequirement{store}, nil).InitWithDBInfos([]*model.DBInfo{db}, is.AllPlacementPolicies(), is.AllResourceGroups(), is.SchemaMetaVersion()) require.NoError(t, err) is2 := builder.Build() assertBundle(is2, tbl1.Meta().ID, tb1Bundle) @@ -544,6 +544,18 @@ func TestBuildBundle(t *testing.T) { assertBundle(is2, p1.ID, p1Bundle) } +type mockRequirement struct { + kv.Storage +} + +func (r mockRequirement) Store() kv.Storage { + return r.Storage +} + +func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover { + return nil +} + func TestLocalTemporaryTables(t *testing.T) { store, err := mockstore.NewMockStore() require.NoError(t, err) @@ -585,7 +597,7 @@ func TestLocalTemporaryTables(t *testing.T) { State: model.StatePublic, } - allocs := autoid.NewAllocatorsFromTblInfo(store, schemaID, tblInfo) + allocs := autoid.NewAllocatorsFromTblInfo(mockRequirement{store}, schemaID, tblInfo) tbl, err := table.TableFromMeta(allocs, tblInfo) require.NoError(t, err) diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index 
c8227f2d6348b..6db418149492b 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -39,7 +39,6 @@ import ( "github.com/pingcap/tidb/util/tracing" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" tikvutil "github.com/tikv/client-go/v2/util" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -567,37 +566,18 @@ func NextStep(curStep int64, consumeDur time.Duration) int64 { // package circle depending issue. var MockForTest func(kv.Storage) autoid.AutoIDAllocClient -func newSinglePointAlloc(store kv.Storage, dbID, tblID int64, isUnsigned bool) *singlePointAlloc { - ebd, ok := store.(kv.EtcdBackend) - if !ok { - // newSinglePointAlloc fail because not etcd background - // This could happen in the server package unit test - return nil - } - - addrs, err := ebd.EtcdAddrs() - if err != nil { - panic(err) - } +func newSinglePointAlloc(r Requirement, dbID, tblID int64, isUnsigned bool) *singlePointAlloc { spa := &singlePointAlloc{ dbID: dbID, tblID: tblID, isUnsigned: isUnsigned, } - if len(addrs) > 0 { - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: addrs, - TLS: ebd.TLSConfig(), - AutoSyncInterval: 30 * time.Second, - }) - if err != nil { - logutil.BgLogger().Error("[autoid client] fail to connect etcd, fallback to default", zap.Error(err)) - return nil - } - spa.clientDiscover = clientDiscover{etcdCli: etcdCli} + if r.AutoIDClient() == nil { + // Only for test in mockstore + spa.ClientDiscover = &ClientDiscover{} + spa.mu.AutoIDAllocClient = MockForTest(r.Store()) } else { - spa.clientDiscover = clientDiscover{} - spa.mu.AutoIDAllocClient = MockForTest(store) + spa.ClientDiscover = r.AutoIDClient() } // mockAutoIDChange failpoint is not implemented in this allocator, so fallback to use the default one. @@ -609,9 +589,19 @@ func newSinglePointAlloc(store kv.Storage, dbID, tblID int64, isUnsigned bool) * return spa } +// Requirement is the parameter required by NewAllocator +type Requirement interface { + Store() kv.Storage + AutoIDClient() *ClientDiscover +} + // NewAllocator returns a new auto increment id generator on the store. -func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool, +func NewAllocator(r Requirement, dbID, tbID int64, isUnsigned bool, allocType AllocatorType, opts ...AllocOption) Allocator { + var store kv.Storage + if r != nil { + store = r.Store() + } alloc := &allocator{ store: store, dbID: dbID, @@ -628,7 +618,7 @@ func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool, // Use the MySQL compatible AUTO_INCREMENT mode. if alloc.customStep && alloc.step == 1 && alloc.tbVersion >= model.TableInfoVersion5 { if allocType == AutoIncrementType { - alloc1 := newSinglePointAlloc(store, dbID, tbID, isUnsigned) + alloc1 := newSinglePointAlloc(r, dbID, tbID, isUnsigned) if alloc1 != nil { return alloc1 } @@ -658,7 +648,7 @@ func NewSequenceAllocator(store kv.Storage, dbID, tbID int64, info *model.Sequen } // NewAllocatorsFromTblInfo creates an array of allocators of different types with the information of model.TableInfo. 
-func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) Allocators { +func NewAllocatorsFromTblInfo(r Requirement, schemaID int64, tblInfo *model.TableInfo) Allocators { var allocs []Allocator dbID := tblInfo.GetDBID(schemaID) idCacheOpt := CustomAutoIncCacheOption(tblInfo.AutoIdCache) @@ -667,20 +657,20 @@ func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.T hasRowID := !tblInfo.PKIsHandle && !tblInfo.IsCommonHandle hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil if hasRowID || hasAutoIncID { - alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType, idCacheOpt, tblVer) + alloc := NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType, idCacheOpt, tblVer) allocs = append(allocs, alloc) } if hasAutoIncID { - alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), AutoIncrementType, idCacheOpt, tblVer) + alloc := NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), AutoIncrementType, idCacheOpt, tblVer) allocs = append(allocs, alloc) } hasAutoRandID := tblInfo.ContainsAutoRandomBits() if hasAutoRandID { - alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), AutoRandomType, idCacheOpt, tblVer) + alloc := NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), AutoRandomType, idCacheOpt, tblVer) allocs = append(allocs, alloc) } if tblInfo.IsSequence() { - allocs = append(allocs, NewSequenceAllocator(store, dbID, tblInfo.ID, tblInfo.Sequence)) + allocs = append(allocs, NewSequenceAllocator(r.Store(), dbID, tblInfo.ID, tblInfo.Sequence)) } return NewAllocators(tblInfo.SepAutoInc(), allocs...) } diff --git a/meta/autoid/autoid_service.go b/meta/autoid/autoid_service.go index bdb7725be9451..682317b32e3de 100644 --- a/meta/autoid/autoid_service.go +++ b/meta/autoid/autoid_service.go @@ -40,10 +40,11 @@ type singlePointAlloc struct { tblID int64 lastAllocated int64 isUnsigned bool - clientDiscover + *ClientDiscover } -type clientDiscover struct { +// ClientDiscover is used to get the AutoIDAllocClient, it creates the grpc connection with autoid service leader. +type ClientDiscover struct { // This the etcd client for service discover etcdCli *clientv3.Client // This is the real client for the AutoIDAlloc service @@ -60,7 +61,15 @@ const ( autoIDLeaderPath = "tidb/autoid/leader" ) -func (d *clientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) { +// NewClientDiscover creates a ClientDiscover object. +func NewClientDiscover(etcdCli *clientv3.Client) *ClientDiscover { + return &ClientDiscover{ + etcdCli: etcdCli, + } +} + +// GetClient gets the AutoIDAllocClient. +func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) { d.mu.RLock() cli := d.mu.AutoIDAllocClient if cli != nil { @@ -138,7 +147,7 @@ retry: if err != nil { if strings.Contains(err.Error(), "rpc error") { time.Sleep(backoffDuration) - sp.resetConn(err) + sp.ResetConn(err) goto retry } return 0, 0, errors.Trace(err) @@ -155,15 +164,19 @@ retry: const backoffDuration = 200 * time.Millisecond -func (sp *singlePointAlloc) resetConn(reason error) { - logutil.BgLogger().Info("[autoid client] reset grpc connection", - zap.String("reason", reason.Error())) +// ResetConn reset the AutoIDAllocClient and underlying grpc connection. +// The next GetClient() call will recreate the client connecting to the correct leader by querying etcd. 
+func (d *ClientDiscover) ResetConn(reason error) { + if reason != nil { + logutil.BgLogger().Info("reset grpc connection", zap.String("category", "autoid client"), + zap.String("reason", reason.Error())) + } var grpcConn *grpc.ClientConn - sp.mu.Lock() - grpcConn = sp.mu.ClientConn - sp.mu.AutoIDAllocClient = nil - sp.mu.ClientConn = nil - sp.mu.Unlock() + d.mu.Lock() + grpcConn = d.mu.ClientConn + d.mu.AutoIDAllocClient = nil + d.mu.ClientConn = nil + d.mu.Unlock() // Close grpc.ClientConn to release resource. if grpcConn != nil { err := grpcConn.Close() @@ -210,7 +223,7 @@ retry: if err != nil { if strings.Contains(err.Error(), "rpc error") { time.Sleep(backoffDuration) - sp.resetConn(err) + sp.ResetConn(err) goto retry } return errors.Trace(err) diff --git a/meta/autoid/autoid_test.go b/meta/autoid/autoid_test.go index 912b1e173bc8c..dc69a997fda96 100644 --- a/meta/autoid/autoid_test.go +++ b/meta/autoid/autoid_test.go @@ -35,6 +35,18 @@ import ( "github.com/stretchr/testify/require" ) +type mockRequirement struct { + kv.Storage +} + +func (r mockRequirement) Store() kv.Storage { + return r.Storage +} + +func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover { + return nil +} + func TestSignedAutoid(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/meta/autoid/mockAutoIDChange", `return(true)`)) defer func() { @@ -68,7 +80,7 @@ func TestSignedAutoid(t *testing.T) { require.NoError(t, err) // Since the test here is applicable to any type of allocators, autoid.RowIDAllocType is chosen. - alloc := autoid.NewAllocator(store, 1, 1, false, autoid.RowIDAllocType) + alloc := autoid.NewAllocator(mockRequirement{store}, 1, 1, false, autoid.RowIDAllocType) require.NotNil(t, alloc) globalAutoID, err := alloc.NextGlobalAutoID() @@ -106,13 +118,13 @@ func TestSignedAutoid(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(3011), id) - alloc = autoid.NewAllocator(store, 1, 1, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 1, false, autoid.RowIDAllocType) require.NotNil(t, alloc) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, autoid.GetStep()+1, id) - alloc = autoid.NewAllocator(store, 1, 2, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 2, false, autoid.RowIDAllocType) require.NotNil(t, alloc) err = alloc.Rebase(context.Background(), int64(1), false) require.NoError(t, err) @@ -120,11 +132,11 @@ func TestSignedAutoid(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(2), id) - alloc = autoid.NewAllocator(store, 1, 3, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 3, false, autoid.RowIDAllocType) require.NotNil(t, alloc) err = alloc.Rebase(context.Background(), int64(3210), false) require.NoError(t, err) - alloc = autoid.NewAllocator(store, 1, 3, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 3, false, autoid.RowIDAllocType) require.NotNil(t, alloc) err = alloc.Rebase(context.Background(), int64(3000), false) require.NoError(t, err) @@ -146,7 +158,7 @@ func TestSignedAutoid(t *testing.T) { require.NoError(t, err) // alloc N for signed - alloc = autoid.NewAllocator(store, 1, 4, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 4, false, autoid.RowIDAllocType) require.NotNil(t, alloc) globalAutoID, err = alloc.NextGlobalAutoID() require.NoError(t, err) @@ -189,7 +201,7 @@ func TestSignedAutoid(t *testing.T) { 
diff --git a/meta/autoid/autoid_test.go b/meta/autoid/autoid_test.go
index 912b1e173bc8c..dc69a997fda96 100644
--- a/meta/autoid/autoid_test.go
+++ b/meta/autoid/autoid_test.go
@@ -35,6 +35,18 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+type mockRequirement struct {
+	kv.Storage
+}
+
+func (r mockRequirement) Store() kv.Storage {
+	return r.Storage
+}
+
+func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
+	return nil
+}
+
 func TestSignedAutoid(t *testing.T) {
 	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/meta/autoid/mockAutoIDChange", `return(true)`))
 	defer func() {
@@ -68,7 +80,7 @@ func TestSignedAutoid(t *testing.T) {
 	require.NoError(t, err)
 
 	// Since the test here is applicable to any type of allocators, autoid.RowIDAllocType is chosen.
-	alloc := autoid.NewAllocator(store, 1, 1, false, autoid.RowIDAllocType)
+	alloc := autoid.NewAllocator(mockRequirement{store}, 1, 1, false, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 
 	globalAutoID, err := alloc.NextGlobalAutoID()
@@ -106,13 +118,13 @@ func TestSignedAutoid(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, int64(3011), id)
 
-	alloc = autoid.NewAllocator(store, 1, 1, false, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 1, false, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 	_, id, err = alloc.Alloc(ctx, 1, 1, 1)
 	require.NoError(t, err)
 	require.Equal(t, autoid.GetStep()+1, id)
 
-	alloc = autoid.NewAllocator(store, 1, 2, false, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 2, false, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 	err = alloc.Rebase(context.Background(), int64(1), false)
 	require.NoError(t, err)
@@ -120,11 +132,11 @@ func TestSignedAutoid(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, int64(2), id)
 
-	alloc = autoid.NewAllocator(store, 1, 3, false, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 3, false, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 	err = alloc.Rebase(context.Background(), int64(3210), false)
 	require.NoError(t, err)
-	alloc = autoid.NewAllocator(store, 1, 3, false, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 3, false, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 	err = alloc.Rebase(context.Background(), int64(3000), false)
 	require.NoError(t, err)
@@ -146,7 +158,7 @@ func TestSignedAutoid(t *testing.T) {
 	require.NoError(t, err)
 
 	// alloc N for signed
-	alloc = autoid.NewAllocator(store, 1, 4, false, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 4, false, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 	globalAutoID, err = alloc.NextGlobalAutoID()
 	require.NoError(t, err)
@@ -189,7 +201,7 @@ func TestSignedAutoid(t *testing.T) {
 	require.Greater(t, min+1, lastRemainOne)
 
 	// Test for increment & offset for signed.
-	alloc = autoid.NewAllocator(store, 1, 5, false, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 5, false, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 
 	increment := int64(2)
@@ -272,7 +284,7 @@ func TestUnsignedAutoid(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	alloc := autoid.NewAllocator(store, 1, 1, true, autoid.RowIDAllocType)
+	alloc := autoid.NewAllocator(mockRequirement{store}, 1, 1, true, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 
 	globalAutoID, err := alloc.NextGlobalAutoID()
@@ -310,13 +322,13 @@ func TestUnsignedAutoid(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, int64(3011), id)
 
-	alloc = autoid.NewAllocator(store, 1, 1, true, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 1, true, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 	_, id, err = alloc.Alloc(ctx, 1, 1, 1)
 	require.NoError(t, err)
 	require.Equal(t, autoid.GetStep()+1, id)
 
-	alloc = autoid.NewAllocator(store, 1, 2, true, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 2, true, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 	err = alloc.Rebase(context.Background(), int64(1), false)
 	require.NoError(t, err)
@@ -324,11 +336,11 @@ func TestUnsignedAutoid(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, int64(2), id)
 
-	alloc = autoid.NewAllocator(store, 1, 3, true, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 3, true, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 	err = alloc.Rebase(context.Background(), int64(3210), false)
 	require.NoError(t, err)
-	alloc = autoid.NewAllocator(store, 1, 3, true, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 3, true, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 	err = alloc.Rebase(context.Background(), int64(3000), false)
 	require.NoError(t, err)
@@ -353,7 +365,7 @@ func TestUnsignedAutoid(t *testing.T) {
 	require.NoError(t, err)
 
 	// alloc N for unsigned
-	alloc = autoid.NewAllocator(store, 1, 4, true, autoid.RowIDAllocType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 4, true, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 	globalAutoID, err = alloc.NextGlobalAutoID()
 	require.NoError(t, err)
@@ -382,7 +394,7 @@ func TestUnsignedAutoid(t *testing.T) {
 	require.Greater(t, min+1, lastRemainOne)
 	// Test increment & offset for unsigned. Using AutoRandomType to avoid valid range check for increment and offset.
-	alloc = autoid.NewAllocator(store, 1, 5, true, autoid.AutoRandomType)
+	alloc = autoid.NewAllocator(mockRequirement{store}, 1, 5, true, autoid.AutoRandomType)
 	require.NotNil(t, alloc)
 	require.NoError(t, err)
 	require.Equal(t, int64(1), globalAutoID)
@@ -436,7 +448,7 @@ func TestConcurrentAlloc(t *testing.T) {
 	allocIDs := func() {
 		ctx := context.Background()
-		alloc := autoid.NewAllocator(store, dbID, tblID, false, autoid.RowIDAllocType)
+		alloc := autoid.NewAllocator(mockRequirement{store}, dbID, tblID, false, autoid.RowIDAllocType)
 		for j := 0; j < int(autoid.GetStep())+5; j++ {
 			_, id, err1 := alloc.Alloc(ctx, 1, 1, 1)
 			if err1 != nil {
@@ -517,7 +529,7 @@ func TestRollbackAlloc(t *testing.T) {
 	injectConf := new(kv.InjectionConfig)
 	injectConf.SetCommitError(errors.New("injected"))
 	injectedStore := kv.NewInjectedStore(store, injectConf)
-	alloc := autoid.NewAllocator(injectedStore, 1, 2, false, autoid.RowIDAllocType)
+	alloc := autoid.NewAllocator(mockRequirement{injectedStore}, 1, 2, false, autoid.RowIDAllocType)
 	_, _, err = alloc.Alloc(ctx, 1, 1, 1)
 	require.Error(t, err)
 	require.Equal(t, int64(0), alloc.Base())
@@ -567,11 +579,11 @@ func TestAllocComputationIssue(t *testing.T) {
 	require.NoError(t, err)
 
 	// Since the test here is applicable to any type of allocators, autoid.RowIDAllocType is chosen.
-	unsignedAlloc1 := autoid.NewAllocator(store, 1, 1, true, autoid.RowIDAllocType)
+	unsignedAlloc1 := autoid.NewAllocator(mockRequirement{store}, 1, 1, true, autoid.RowIDAllocType)
 	require.NotNil(t, unsignedAlloc1)
-	signedAlloc1 := autoid.NewAllocator(store, 1, 1, false, autoid.RowIDAllocType)
+	signedAlloc1 := autoid.NewAllocator(mockRequirement{store}, 1, 1, false, autoid.RowIDAllocType)
 	require.NotNil(t, signedAlloc1)
-	signedAlloc2 := autoid.NewAllocator(store, 1, 2, false, autoid.RowIDAllocType)
+	signedAlloc2 := autoid.NewAllocator(mockRequirement{store}, 1, 2, false, autoid.RowIDAllocType)
 	require.NotNil(t, signedAlloc2)
 
 	// the next valid two value must be 13 & 16, batch size = 6.
@@ -615,7 +627,7 @@ func TestIssue40584(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	alloc := autoid.NewAllocator(store, 1, 1, false, autoid.RowIDAllocType)
+	alloc := autoid.NewAllocator(mockRequirement{store}, 1, 1, false, autoid.RowIDAllocType)
 	require.NotNil(t, alloc)
 
 	finishAlloc := make(chan bool)
diff --git a/meta/autoid/bench_test.go b/meta/autoid/bench_test.go
index d8b489060875d..422f825c666c9 100644
--- a/meta/autoid/bench_test.go
+++ b/meta/autoid/bench_test.go
@@ -57,7 +57,7 @@ func BenchmarkAllocator_Alloc(b *testing.B) {
 		if err != nil {
 			return
 		}
-		alloc := autoid.NewAllocator(store, 1, 2, false, autoid.RowIDAllocType)
+		alloc := autoid.NewAllocator(mockRequirement{store}, 1, 2, false, autoid.RowIDAllocType)
 		b.StartTimer()
 		for i := 0; i < b.N; i++ {
 			_, _, err := alloc.Alloc(ctx, 1, 1, 1)
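Every test and benchmark call site above now wraps the raw store in mockRequirement, whose AutoIDClient returns nil because these tests never reach the autoid service. A hypothetical helper, not in this patch, could cut that repetition; it assumes it lives in the same test file next to mockRequirement so the existing imports apply:

```go
// newTestRowIDAllocator is a hypothetical test helper wrapping the repeated
// mockRequirement{store} construction seen in the hunks above.
func newTestRowIDAllocator(store kv.Storage, dbID, tblID int64, unsigned bool) autoid.Allocator {
	return autoid.NewAllocator(mockRequirement{store}, dbID, tblID, unsigned, autoid.RowIDAllocType)
}
```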
diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go
index bc9d3a8fa731d..b4094199bc3f5 100644
--- a/planner/core/preprocess.go
+++ b/planner/core/preprocess.go
@@ -1823,7 +1823,8 @@ func tryLockMDLAndUpdateSchemaIfNecessary(sctx sessionctx.Context, dbName model.
 		if !skipLock {
 			sctx.GetSessionVars().GetRelatedTableForMDL().Store(tableInfo.ID, int64(0))
 		}
-		domainSchema := domain.GetDomain(sctx).InfoSchema()
+		dom := domain.GetDomain(sctx)
+		domainSchema := dom.InfoSchema()
 		domainSchemaVer := domainSchema.SchemaMetaVersion()
 		var err error
 		tbl, err = domainSchema.TableByName(dbName, tableInfo.Name)
@@ -1856,7 +1857,7 @@ func tryLockMDLAndUpdateSchemaIfNecessary(sctx sessionctx.Context, dbName model.
 			}
 			copyTableInfo.Indices[i].State = model.StateWriteReorganization
 			dbInfo, _ := domainSchema.SchemaByName(dbName)
-			allocs := autoid.NewAllocatorsFromTblInfo(sctx.GetStore(), dbInfo.ID, copyTableInfo)
+			allocs := autoid.NewAllocatorsFromTblInfo(dom, dbInfo.ID, copyTableInfo)
 			tbl, err = table.TableFromMeta(allocs, copyTableInfo)
 			if err != nil {
 				return nil, err
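The preprocess change compiles because the domain value returned by domain.GetDomain stands in for the Requirement that NewAllocatorsFromTblInfo now expects, so dom is passed where a kv.Storage used to go. A compile-time sketch of that relationship, assuming *domain.Domain implements Store() and AutoIDClient() as this patch requires:

```go
package example

import (
	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/meta/autoid"
)

// Compile-time assertion: *domain.Domain can be used wherever autoid.Requirement
// is expected, which is what lets tryLockMDLAndUpdateSchemaIfNecessary pass dom
// directly to autoid.NewAllocatorsFromTblInfo.
var _ autoid.Requirement = (*domain.Domain)(nil)
```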