From 07dcfb0c72174f574180adbaddc09218192695a9 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 7 Nov 2023 10:02:11 +0800 Subject: [PATCH] *: share etcd client from domain for autoid allocator #46647 (#48335) close pingcap/tidb#46324 --- br/pkg/backup/BUILD.bazel | 1 + br/pkg/backup/client.go | 19 +++++- br/pkg/lightning/restore/meta_manager.go | 23 ++++--- br/pkg/lightning/restore/meta_manager_test.go | 21 +++++-- br/pkg/lightning/restore/restore.go | 17 +++++- br/pkg/lightning/restore/restore_test.go | 4 +- br/pkg/lightning/restore/table_restore.go | 28 ++++++++- .../lightning/restore/table_restore_test.go | 6 +- br/pkg/restore/db_test.go | 8 +-- ddl/column.go | 17 +++++- ddl/ddl_api.go | 2 +- ddl/foreign_key_test.go | 4 +- ddl/index.go | 2 +- ddl/job_table.go | 25 ++++++++ ddl/partition.go | 2 +- ddl/placement_policy_ddl_test.go | 2 +- ddl/table.go | 10 ++-- ddl/table_test.go | 6 +- domain/domain.go | 4 +- executor/executor_test.go | 2 +- infoschema/BUILD.bazel | 1 + infoschema/builder.go | 22 +++---- infoschema/infoschema_test.go | 25 ++++++-- meta/autoid/BUILD.bazel | 2 +- meta/autoid/autoid.go | 60 +++++++++---------- meta/autoid/autoid_test.go | 51 ++++++++++------ meta/autoid/bench_test.go | 2 +- planner/core/preprocess.go | 5 +- 28 files changed, 250 insertions(+), 121 deletions(-) diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel index a80396f4fddd2..2fe3af2cb3627 100644 --- a/br/pkg/backup/BUILD.bazel +++ b/br/pkg/backup/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//txnkv/txnlock", "@com_github_tikv_pd_client//:client", + "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", "@org_golang_x_sync//errgroup", diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 2102211125f1d..ab9b9801c4c9d 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -50,6 +50,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/txnlock" pd "github.com/tikv/pd/client" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" @@ -492,6 +493,18 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) { return retRanges, nil } +type mockRequirement struct { + kv.Storage +} + +func (r mockRequirement) Store() kv.Storage { + return r.Storage +} + +func (r mockRequirement) GetEtcdClient() *clientv3.Client { + return nil +} + // BuildBackupRangeAndSchema gets KV range and schema of tables. // KV ranges are separated by Table IDs. // Also, KV ranges are separated by Index IDs in the same table. @@ -575,9 +588,9 @@ func BuildBackupRangeAndSchema( autoIDAccess := m.GetAutoIDAccessors(dbInfo.ID, tableInfo.ID) tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version) - idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer) - seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer) - randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer) + idAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer) + seqAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer) + randAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer) var globalAutoID int64 switch { diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index ace5b91458746..c19cd0bcf8120 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -16,7 +16,6 @@ import ( verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/redact" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" "go.uber.org/zap" @@ -255,10 +254,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 if curStatus == metaStatusInitial { if needAutoID { // maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first. - if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil { + if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil { return errors.Trace(err) } - newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core) + newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core) if err != nil { return errors.Trace(err) } @@ -1159,8 +1158,8 @@ func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error { func (m *singleTaskMetaMgr) Close() { } -func allocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) { - allocators, err := getGlobalAutoIDAlloc(store, dbID, tblInfo) +func allocGlobalAutoID(ctx context.Context, n int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) { + allocators, err := getGlobalAutoIDAlloc(r, dbID, tblInfo) if err != nil { return 0, 0, err } @@ -1177,8 +1176,8 @@ func allocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int6 return } -func rebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) error { - allocators, err := getGlobalAutoIDAlloc(store, dbID, tblInfo) +func rebaseGlobalAutoID(ctx context.Context, newBase int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) error { + allocators, err := getGlobalAutoIDAlloc(r, dbID, tblInfo) if err != nil { return err } @@ -1191,8 +1190,8 @@ func rebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, db return nil } -func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) { - if store == nil { +func getGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) { + if r == nil || r.Store() == nil { return nil, errors.New("internal error: kv store should not be nil") } if dbID == 0 { @@ -1224,15 +1223,15 @@ func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo case hasRowID || hasAutoIncID: allocators := make([]autoid.Allocator, 0, 2) if tblInfo.SepAutoInc() && hasAutoIncID { - allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), + allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.AutoIncrementType, noCache, tblVer)) } // this allocator is NOT used when SepAutoInc=true and auto increment column is clustered. - allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), + allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, noCache, tblVer)) return allocators, nil case hasAutoRandID: - return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), + return []autoid.Allocator{autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, noCache, tblVer)}, nil default: return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name) diff --git a/br/pkg/lightning/restore/meta_manager_test.go b/br/pkg/lightning/restore/meta_manager_test.go index 1aff6e0055498..64afa6c485882 100644 --- a/br/pkg/lightning/restore/meta_manager_test.go +++ b/br/pkg/lightning/restore/meta_manager_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" + clientv3 "go.etcd.io/etcd/client/v3" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/store/mockstore" tmock "github.com/pingcap/tidb/util/mock" @@ -324,7 +325,7 @@ func (s *metaMgrSuite) prepareMockInner(rowsVal [][]driver.Value, nextRowID *int WillReturnRows(rows) if nextRowID != nil { - allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.kvStore, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core) + allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core) alloc := allocs.Get(autoid.RowIDAllocType) alloc.ForceRebase(*nextRowID - 1) } @@ -480,6 +481,18 @@ func newTableInfo2(t *testing.T, return tableInfo } +type mockRequirement struct { + kv.Storage +} + +func (r mockRequirement) Store() kv.Storage { + return r.Storage +} + +func (r mockRequirement) GetEtcdClient() *clientv3.Client { + return nil +} + func TestAllocGlobalAutoID(t *testing.T) { storePath := t.TempDir() kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath)) @@ -557,11 +570,11 @@ func TestAllocGlobalAutoID(t *testing.T) { ctx := context.Background() for _, c := range cases { ti := newTableInfo2(t, 1, c.tableID, c.createTableSQL, kvStore) - allocators, err := getGlobalAutoIDAlloc(kvStore, 1, ti) + allocators, err := getGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti) if c.expectErrStr == "" { require.NoError(t, err, c.tableID) - require.NoError(t, rebaseGlobalAutoID(ctx, 123, kvStore, 1, ti)) - base, idMax, err := allocGlobalAutoID(ctx, 100, kvStore, 1, ti) + require.NoError(t, rebaseGlobalAutoID(ctx, 123, mockRequirement{kvStore}, 1, ti)) + base, idMax, err := allocGlobalAutoID(ctx, 100, mockRequirement{kvStore}, 1, ti) require.NoError(t, err, c.tableID) require.Equal(t, int64(123), base, c.tableID) require.Equal(t, int64(223), idMax, c.tableID) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 4501a184106b0..6639fda0a2efc 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -66,6 +66,7 @@ import ( "github.com/pingcap/tidb/util/set" tikvconfig "github.com/tikv/client-go/v2/config" pd "github.com/tikv/pd/client" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/multierr" "go.uber.org/zap" @@ -1473,6 +1474,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { cleanup := false postProgress := func() error { return nil } var kvStore tidbkv.Storage + var etcdCli *clientv3.Client if isLocalBackend(rc.cfg) { var ( @@ -1526,6 +1528,14 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { if err != nil { return errors.Trace(err) } + // etcdCli, err := clientv3.New(clientv3.Config{ + // Endpoints: []string{rc.cfg.TiDB.PdAddr}, + // AutoSyncInterval: 30 * time.Second, + // TLS: rc.tls.TLSConfig(), + // }) + // if err != nil { + // return errors.Trace(err) + // } manager, err := newChecksumManager(ctx, rc, kvStore) if err != nil { return errors.Trace(err) @@ -1580,6 +1590,11 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { logTask.Warn("failed to close kv store", zap.Error(err)) } } + if etcdCli != nil { + if err := etcdCli.Close(); err != nil { + logTask.Warn("failed to close etcd client", zap.Error(err)) + } + } }() taskCh := make(chan task, rc.cfg.App.IndexConcurrency) @@ -1629,7 +1644,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { if err != nil { return errors.Trace(err) } - tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, log.FromContext(ctx)) + tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, etcdCli, log.FromContext(ctx)) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index 8dd05a8e09f7c..26a33958561b4 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -74,7 +74,7 @@ func TestNewTableRestore(t *testing.T) { for _, tc := range testCases { tableInfo := dbInfo.Tables[tc.name] tableName := common.UniqueTable("mockdb", tableInfo.Name) - tr, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L()) + tr, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L()) require.NotNil(t, tr) require.NoError(t, err) } @@ -91,7 +91,7 @@ func TestNewTableRestoreFailure(t *testing.T) { }} tableName := common.UniqueTable("mockdb", "failure") - _, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L()) + _, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L()) require.Regexp(t, `failed to tables\.TableFromMeta.*`, err.Error()) } diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 7ca1f5b489589..0d1f92d706fae 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/mathutil" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -58,6 +59,7 @@ type TableRestore struct { alloc autoid.Allocators logger log.Logger kvStore tidbkv.Storage + etcdCli *clientv3.Client ignoreColumns map[string]struct{} } @@ -70,6 +72,7 @@ func NewTableRestore( cp *checkpoints.TableCheckpoint, ignoreColumns map[string]struct{}, kvStore tidbkv.Storage, + etcdCli *clientv3.Client, logger log.Logger, ) (*TableRestore, error) { idAlloc := kv.NewPanickingAllocators(cp.AllocBase) @@ -86,11 +89,20 @@ func NewTableRestore( encTable: tbl, alloc: idAlloc, kvStore: kvStore, + etcdCli: etcdCli, logger: logger.With(zap.String("table", tableName)), ignoreColumns: ignoreColumns, }, nil } +func (tr *TableRestore) Store() tidbkv.Storage { + return tr.kvStore +} + +func (tr *TableRestore) GetEtcdClient() *clientv3.Client { + return tr.etcdCli +} + func (tr *TableRestore) Close() { tr.encTable = nil tr.logger.Info("restore done") @@ -146,6 +158,20 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp * return err } +// // AutoIDRequirement implements autoid.Requirement. +// var _ autoid.Requirement = &TableImporter{} + +// // Store implements the autoid.Requirement interface. +// func (tr *TableImporter) Store() tidbkv.Storage { +// return tr.kvStore +// } + +// // GetEtcdClient implements the autoid.Requirement interface. +// func (tr *TableImporter) GetEtcdClient() *clientv3.Client { +// return tr.etcdCli +// } + +// RebaseChunkRowIDs rebase the row id of the chunks. func (tr *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) { if rowIDBase == 0 { return @@ -758,7 +784,7 @@ func (tr *TableRestore) postProcess( // And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column, // not for allocator of _tidb_rowid. // So we need to rebase IDs for those 2 allocators explicitly. - err = rebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core) + err = rebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core) } } rc.alterTableLock.Unlock() diff --git a/br/pkg/lightning/restore/table_restore_test.go b/br/pkg/lightning/restore/table_restore_test.go index f622d32a80b1d..923e3fdd34115 100644 --- a/br/pkg/lightning/restore/table_restore_test.go +++ b/br/pkg/lightning/restore/table_restore_test.go @@ -167,7 +167,7 @@ func (s *tableRestoreSuiteBase) setupSuite(t *testing.T) { func (s *tableRestoreSuiteBase) setupTest(t *testing.T) { // Collect into the test TableRestore structure var err error - s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L()) + s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L()) require.NoError(t, err) s.cfg = config.NewConfig() @@ -458,7 +458,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() { cfg.Mydumper.StrictFormat = true rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store} - tr, err := NewTableRestore("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L()) + tr, err := NewTableRestore("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L()) require.NoError(s.T(), err) require.NoError(s.T(), tr.populateChunks(context.Background(), rc, cp)) @@ -723,7 +723,7 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated() { require.NoError(s.T(), err) core.State = model.StatePublic tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core} - s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L()) + s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L()) require.NoError(s.T(), err) ccp := &checkpoints.ChunkCheckpoint{} diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index d0c7f702092f5..c7989d5485074 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -79,7 +79,7 @@ func TestRestoreAutoIncID(t *testing.T) { DB: dbInfo, } // Get the next AutoIncID - idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType) + idAlloc := autoid.NewAllocator(s.mock.Domain, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType) globalAutoID, err := idAlloc.NextGlobalAutoID() require.NoErrorf(t, err, "Error allocate next auto id") require.Equal(t, uint64(globalAutoID), autoIncID) @@ -380,7 +380,7 @@ func TestGetExistedUserDBs(t *testing.T) { dbs := restore.GetExistedUserDBs(dom) require.Equal(t, 0, len(dbs)) - builder, err := infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos( + builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos( []*model.DBInfo{ {Name: model.NewCIStr("mysql")}, {Name: model.NewCIStr("test")}, @@ -391,7 +391,7 @@ func TestGetExistedUserDBs(t *testing.T) { dbs = restore.GetExistedUserDBs(dom) require.Equal(t, 0, len(dbs)) - builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos( + builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos( []*model.DBInfo{ {Name: model.NewCIStr("mysql")}, {Name: model.NewCIStr("test")}, @@ -403,7 +403,7 @@ func TestGetExistedUserDBs(t *testing.T) { dbs = restore.GetExistedUserDBs(dom) require.Equal(t, 1, len(dbs)) - builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos( + builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos( []*model.DBInfo{ {Name: model.NewCIStr("mysql")}, {Name: model.NewCIStr("d1")}, diff --git a/ddl/column.go b/ddl/column.go index 5be16eb62eafd..5293639fd6747 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -50,6 +50,7 @@ import ( decoder "github.com/pingcap/tidb/util/rowDecoder" "github.com/pingcap/tidb/util/sqlexec" "github.com/prometheus/client_golang/prometheus" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -752,7 +753,7 @@ func (w *worker) doModifyColumnTypeWithData( job.SnapshotVer = 0 job.SchemaState = model.StateWriteReorganization case model.StateWriteReorganization: - tbl, err := getTable(d.store, dbInfo.ID, tblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo) if err != nil { return ver, errors.Trace(err) } @@ -1603,6 +1604,18 @@ func checkNewAutoRandomBits(idAccessors meta.AutoIDAccessors, oldCol *model.Colu return nil } +type asAutoIDRequirement ddlCtx + +var _ autoid.Requirement = &asAutoIDRequirement{} + +func (r *asAutoIDRequirement) Store() kv.Storage { + return r.store +} + +func (r *asAutoIDRequirement) GetEtcdClient() *clientv3.Client { + return r.etcdCli +} + // applyNewAutoRandomBits set auto_random bits to TableInfo and // migrate auto_increment ID to auto_random ID if possible. func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo, @@ -1612,7 +1625,7 @@ func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo, if !needMigrateFromAutoIncToAutoRand { return nil } - autoRandAlloc := autoid.NewAllocatorsFromTblInfo(d.store, dbInfo.ID, tblInfo).Get(autoid.AutoRandomType) + autoRandAlloc := autoid.NewAllocatorsFromTblInfo((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo).Get(autoid.AutoRandomType) if autoRandAlloc == nil { errMsg := fmt.Sprintf(autoid.AutoRandomAllocatorNotFound, dbInfo.Name.O, tblInfo.Name.O) return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 76842cada9a5e..97b74a694ce01 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2994,7 +2994,7 @@ func checkCharsetAndCollation(cs string, co string) error { // handleAutoIncID handles auto_increment option in DDL. It creates a ID counter for the table and initiates the counter to a proper value. // For example if the option sets auto_increment to 10. The counter will be set to 9. So the next allocated ID will be 10. func (d *ddl) handleAutoIncID(tbInfo *model.TableInfo, schemaID int64, newEnd int64, tp autoid.AllocatorType) error { - allocs := autoid.NewAllocatorsFromTblInfo(d.store, schemaID, tbInfo) + allocs := autoid.NewAllocatorsFromTblInfo((*asAutoIDRequirement)(d.ddlCtx), schemaID, tbInfo) if alloc := allocs.Get(tp); alloc != nil { err := alloc.Rebase(context.Background(), newEnd, false) if err != nil { diff --git a/ddl/foreign_key_test.go b/ddl/foreign_key_test.go index 6c6c1653f696e..07e099b6716c1 100644 --- a/ddl/foreign_key_test.go +++ b/ddl/foreign_key_test.go @@ -130,7 +130,7 @@ func TestForeignKey(t *testing.T) { mu.Lock() defer mu.Unlock() var t table.Table - t, err = testGetTableWithError(store, dbInfo.ID, tblInfo.ID) + t, err = testGetTableWithError(dom, dbInfo.ID, tblInfo.ID) if err != nil { hookErr = errors.Trace(err) return @@ -172,7 +172,7 @@ func TestForeignKey(t *testing.T) { mu.Lock() defer mu.Unlock() var t table.Table - t, err = testGetTableWithError(store, dbInfo.ID, tblInfo.ID) + t, err = testGetTableWithError(dom, dbInfo.ID, tblInfo.ID) if err != nil { hookErr = errors.Trace(err) return diff --git a/ddl/index.go b/ddl/index.go index ec09a3c802379..be5ce0df8d495 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -649,7 +649,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo job.SchemaState = model.StateWriteReorganization case model.StateWriteReorganization: // reorganization -> public - tbl, err := getTable(d.store, schemaID, tblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), schemaID, tblInfo) if err != nil { return ver, errors.Trace(err) } diff --git a/ddl/job_table.go b/ddl/job_table.go index 268228df82035..b4c114a94161d 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" + // "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" @@ -308,6 +309,30 @@ func (d *ddl) markJobProcessing(sess *session, job *model.Job) error { return errors.Trace(err) } +// <<<<<<< HEAD +// ======= +// func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*model.DBInfo, table.Table, error) { +// var tbl table.Table +// var dbInfo *model.DBInfo +// err := kv.RunInNewTxn(d.ctx, r.Store(), false, func(ctx context.Context, txn kv.Transaction) error { +// t := meta.NewMeta(txn) +// var err1 error +// dbInfo, err1 = t.GetDatabase(schemaID) +// if err1 != nil { +// return errors.Trace(err1) +// } +// tblInfo, err1 := getTableInfo(t, tableID, schemaID) +// if err1 != nil { +// return errors.Trace(err1) +// } +// tbl, err1 = getTable(r, schemaID, tblInfo) +// return errors.Trace(err1) +// }) +// return dbInfo, tbl, err +// } + +// >>>>>>> a062330246... *: share etcd client from domain for autoid allocator #46647 (#48335) + const ( addDDLJobSQL = "insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values" updateDDLJobSQL = "update mysql.tidb_ddl_job set job_meta = %s where job_id = %d" diff --git a/ddl/partition.go b/ddl/partition.go index 5f14f12a55b67..f6a61de83e9e7 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1787,7 +1787,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( case model.StateDeleteReorganization: oldTblInfo := getTableInfoWithDroppingPartitions(tblInfo) physicalTableIDs = getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) - tbl, err := getTable(d.store, job.SchemaID, oldTblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), job.SchemaID, oldTblInfo) if err != nil { return ver, errors.Trace(err) } diff --git a/ddl/placement_policy_ddl_test.go b/ddl/placement_policy_ddl_test.go index f5d1392018c84..7d672dfcaa0ca 100644 --- a/ddl/placement_policy_ddl_test.go +++ b/ddl/placement_policy_ddl_test.go @@ -118,7 +118,7 @@ func TestPlacementPolicyInUse(t *testing.T) { t4.State = model.StatePublic db1.Tables = append(db1.Tables, t4) - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos( + builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos( []*model.DBInfo{db1, db2, dbP}, []*model.PolicyInfo{p1, p2, p3, p4, p5}, 1, diff --git a/ddl/table.go b/ddl/table.go index ddaf06938709d..05a36ae3f1314 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -612,8 +612,8 @@ func checkSafePoint(w *worker, snapshotTS uint64) error { return gcutil.ValidateSnapshot(ctx, snapshotTS) } -func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) { - allocs := autoid.NewAllocatorsFromTblInfo(store, schemaID, tblInfo) +func getTable(r autoid.Requirement, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) { + allocs := autoid.NewAllocatorsFromTblInfo(r, schemaID, tblInfo) tbl, err := table.TableFromMeta(allocs, tblInfo) return tbl, errors.Trace(err) } @@ -825,7 +825,7 @@ func onRebaseAutoRandomType(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, return onRebaseAutoID(d, d.store, t, job, autoid.AutoRandomType) } -func onRebaseAutoID(d *ddlCtx, store kv.Storage, t *meta.Meta, job *model.Job, tp autoid.AllocatorType) (ver int64, _ error) { +func onRebaseAutoID(d *ddlCtx, _ kv.Storage, t *meta.Meta, job *model.Job, tp autoid.AllocatorType) (ver int64, _ error) { schemaID := job.SchemaID var ( newBase int64 @@ -848,7 +848,7 @@ func onRebaseAutoID(d *ddlCtx, store kv.Storage, t *meta.Meta, job *model.Job, t return ver, errors.Trace(err) } - tbl, err := getTable(store, schemaID, tblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), schemaID, tblInfo) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -930,7 +930,7 @@ func (w *worker) onShardRowID(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int6 if shardRowIDBits < tblInfo.ShardRowIDBits { tblInfo.ShardRowIDBits = shardRowIDBits } else { - tbl, err := getTable(d.store, job.SchemaID, tblInfo) + tbl, err := getTable((*asAutoIDRequirement)(d), job.SchemaID, tblInfo) if err != nil { return ver, errors.Trace(err) } diff --git a/ddl/table_test.go b/ddl/table_test.go index 9d990fc217243..195c188298476 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -142,10 +142,10 @@ func testTruncateTable(t *testing.T, ctx sessionctx.Context, store kv.Storage, d return job } -func testGetTableWithError(store kv.Storage, schemaID, tableID int64) (table.Table, error) { +func testGetTableWithError(r autoid.Requirement, schemaID, tableID int64) (table.Table, error) { var tblInfo *model.TableInfo ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) - err := kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error { + err := kv.RunInNewTxn(ctx, r.Store(), false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) var err1 error tblInfo, err1 = t.GetTable(schemaID, tableID) @@ -160,7 +160,7 @@ func testGetTableWithError(store kv.Storage, schemaID, tableID int64) (table.Tab if tblInfo == nil { return nil, errors.New("table not found") } - alloc := autoid.NewAllocator(store, schemaID, tblInfo.ID, false, autoid.RowIDAllocType) + alloc := autoid.NewAllocator(r, schemaID, tblInfo.ID, false, autoid.RowIDAllocType) tbl, err := table.TableFromMeta(autoid.NewAllocators(false, alloc), tblInfo) if err != nil { return nil, errors.Trace(err) diff --git a/domain/domain.go b/domain/domain.go index 24aacaef58e9d..3c35487e27a45 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -231,7 +231,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i return nil, false, currentSchemaVersion, nil, err } - newISBuilder, err := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithDBInfos(schemas, policies, neededSchemaVersion) + newISBuilder, err := infoschema.NewBuilder(do, do.sysFacHack).InitWithDBInfos(schemas, policies, neededSchemaVersion) if err != nil { return nil, false, currentSchemaVersion, nil, err } @@ -371,7 +371,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 } diffs = append(diffs, diff) } - builder := infoschema.NewBuilder(do.Store(), do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) + builder := infoschema.NewBuilder(do, do.sysFacHack).InitWithOldInfoSchema(do.infoCache.GetLatest()) builder.SetDeltaUpdateBundles() phyTblIDs := make([]int64, 0, len(diffs)) actions := make([]uint64, 0, len(diffs)) diff --git a/executor/executor_test.go b/executor/executor_test.go index bdaaf99b16e13..cc05d212fc137 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -1939,7 +1939,7 @@ func TestCheckIndex(t *testing.T) { require.NoError(t, err) tbInfo := tbl.Meta() - alloc := autoid.NewAllocator(store, dbInfo.ID, tbInfo.ID, false, autoid.RowIDAllocType) + alloc := autoid.NewAllocator(dom, dbInfo.ID, tbInfo.ID, false, autoid.RowIDAllocType) tb, err := tables.TableFromMeta(autoid.NewAllocators(false, alloc), tbInfo) require.NoError(t, err) diff --git a/infoschema/BUILD.bazel b/infoschema/BUILD.bazel index f5d1563552cdc..906fe1b63e0b7 100644 --- a/infoschema/BUILD.bazel +++ b/infoschema/BUILD.bazel @@ -109,6 +109,7 @@ go_test( "@com_github_pingcap_tipb//go-tipb", "@com_github_prometheus_prometheus//promql", "@com_github_stretchr_testify//require", + "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", "@org_uber_go_goleak//:goleak", ], diff --git a/infoschema/builder.go b/infoschema/builder.go index 18077a2663766..30f1a0da8c37e 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -182,9 +182,9 @@ type Builder struct { // This map will indicate which DB has been copied, so that they // don't need to be copied again. dirtyDB map[string]bool - // TODO: store is only used by autoid allocators - // detach allocators from storage, use passed transaction in the feature - store kv.Storage + + // Used by autoid allocators + autoid.Requirement factory func() (pools.Resource, error) bundleInfoBuilder @@ -336,7 +336,7 @@ func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDi } // ntID is the new id for the partition! b.markPartitionBundleShouldUpdate(ntID) - err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID) + err = updateAutoIDForExchangePartition(b.Requirement.Store(), ptSchemaID, ptID, ntSchemaID, ntID) if err != nil { return nil, errors.Trace(err) } @@ -746,7 +746,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i ConvertOldVersionUTF8ToUTF8MB4IfNeed(tblInfo) if len(allocs.Allocs) == 0 { - allocs = autoid.NewAllocatorsFromTblInfo(b.store, dbInfo.ID, tblInfo) + allocs = autoid.NewAllocatorsFromTblInfo(b.Requirement, dbInfo.ID, tblInfo) } else { tblVer := autoid.AllocOptionTableInfoVersion(tblInfo.Version) switch tp { @@ -756,11 +756,11 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i // and RowIDAllocType allocator for it. Because auto id and row id could share the same allocator. // Allocate auto id may route to allocate row id, if row id allocator is nil, the program panic! for _, tp := range [2]autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType} { - newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), tp, tblVer, idCacheOpt) + newAlloc := autoid.NewAllocator(b.Requirement, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), tp, tblVer, idCacheOpt) allocs = allocs.Append(newAlloc) } case model.ActionRebaseAutoRandomBase: - newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer) + newAlloc := autoid.NewAllocator(b.Requirement, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer) allocs = allocs.Append(newAlloc) case model.ActionModifyColumn: // Change column attribute from auto_increment to auto_random. @@ -769,7 +769,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i allocs = allocs.Filter(func(a autoid.Allocator) bool { return a.GetType() != autoid.AutoIncrementType && a.GetType() != autoid.RowIDAllocType }) - newAlloc := autoid.NewAllocator(b.store, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer) + newAlloc := autoid.NewAllocator(b.Requirement, dbInfo.ID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, tblVer) allocs = allocs.Append(newAlloc) } } @@ -1018,7 +1018,7 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF b.is.schemaMap[di.Name.L] = schTbls for _, t := range di.Tables { - allocs := autoid.NewAllocatorsFromTblInfo(b.store, di.ID, t) + allocs := autoid.NewAllocatorsFromTblInfo(b.Requirement, di.ID, t) var tbl table.Table tbl, err := tableFromMeta(allocs, t) if err != nil { @@ -1054,9 +1054,9 @@ func RegisterVirtualTable(dbInfo *model.DBInfo, tableFromMeta tableFromMetaFunc) } // NewBuilder creates a new Builder with a Handle. -func NewBuilder(store kv.Storage, factory func() (pools.Resource, error)) *Builder { +func NewBuilder(r autoid.Requirement, factory func() (pools.Resource, error)) *Builder { return &Builder{ - store: store, + Requirement: r, is: &infoSchema{ schemaMap: map[string]*schemaTables{}, policyMap: map[string]*model.PolicyInfo{}, diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index a09c5a2fcde6d..558102016c0e3 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" ) func TestBasic(t *testing.T) { @@ -110,7 +111,7 @@ func TestBasic(t *testing.T) { }) require.NoError(t, err) - builder, err := infoschema.NewBuilder(dom.Store(), nil).InitWithDBInfos(dbInfos, nil, 1) + builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos(dbInfos, nil, 1) require.NoError(t, err) txn, err := store.Begin() @@ -256,7 +257,7 @@ func TestInfoTables(t *testing.T) { require.NoError(t, err) }() - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos(nil, nil, 0) + builder, err := infoschema.NewBuilder(mockRequirement{store}, nil).InitWithDBInfos(nil, nil, 0) require.NoError(t, err) is := builder.Build() @@ -332,7 +333,7 @@ func TestBuildSchemaWithGlobalTemporaryTable(t *testing.T) { err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) for _, change := range changes { - builder := infoschema.NewBuilder(store, nil).InitWithOldInfoSchema(curIs) + builder := infoschema.NewBuilder(dom, nil).InitWithOldInfoSchema(curIs) change(m, builder) curIs = builder.Build() } @@ -410,7 +411,7 @@ func TestBuildSchemaWithGlobalTemporaryTable(t *testing.T) { // full load newDB, ok := newIS.SchemaByName(model.NewCIStr("test")) require.True(t, ok) - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos([]*model.DBInfo{newDB}, newIS.AllPlacementPolicies(), newIS.SchemaMetaVersion()) + builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos([]*model.DBInfo{newDB}, newIS.AllPlacementPolicies(), newIS.SchemaMetaVersion()) require.NoError(t, err) require.True(t, builder.Build().HasTemporaryTable()) @@ -535,7 +536,7 @@ func TestBuildBundle(t *testing.T) { assertBundle(is, tbl2.Meta().ID, nil) assertBundle(is, p1.ID, p1Bundle) - builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos([]*model.DBInfo{db}, is.AllPlacementPolicies(), is.SchemaMetaVersion()) + builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos([]*model.DBInfo{db}, is.AllPlacementPolicies(), is.SchemaMetaVersion()) require.NoError(t, err) is2 := builder.Build() assertBundle(is2, tbl1.Meta().ID, tb1Bundle) @@ -543,6 +544,18 @@ func TestBuildBundle(t *testing.T) { assertBundle(is2, p1.ID, p1Bundle) } +type mockRequirement struct { + kv.Storage +} + +func (r mockRequirement) Store() kv.Storage { + return r.Storage +} + +func (r mockRequirement) GetEtcdClient() *clientv3.Client { + return nil +} + func TestLocalTemporaryTables(t *testing.T) { store, err := mockstore.NewMockStore() require.NoError(t, err) @@ -584,7 +597,7 @@ func TestLocalTemporaryTables(t *testing.T) { State: model.StatePublic, } - allocs := autoid.NewAllocatorsFromTblInfo(store, schemaID, tblInfo) + allocs := autoid.NewAllocatorsFromTblInfo(mockRequirement{store}, schemaID, tblInfo) tbl, err := table.TableFromMeta(allocs, tblInfo) require.NoError(t, err) diff --git a/meta/autoid/BUILD.bazel b/meta/autoid/BUILD.bazel index 50e53258f305b..0053d0cd6419a 100644 --- a/meta/autoid/BUILD.bazel +++ b/meta/autoid/BUILD.bazel @@ -11,7 +11,6 @@ go_library( importpath = "github.com/pingcap/tidb/meta/autoid", visibility = ["//visibility:public"], deps = [ - "//autoid_service", "//config", "//errno", "//kv", @@ -62,6 +61,7 @@ go_test( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", + "@io_etcd_go_etcd_client_v3//:client", "@org_uber_go_goleak//:goleak", ], ) diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index 13a15c4b76a44..8eeeb12b42c55 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -26,7 +26,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - autoid "github.com/pingcap/tidb/autoid_service" + "github.com/pingcap/kvproto/pkg/autoid" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" @@ -558,37 +558,23 @@ func NextStep(curStep int64, consumeDur time.Duration) int64 { return res } -func newSinglePointAlloc(store kv.Storage, dbID, tblID int64, isUnsigned bool) *singlePointAlloc { - ebd, ok := store.(kv.EtcdBackend) - if !ok { - // newSinglePointAlloc fail because not etcd background - // This could happen in the server package unit test - return nil - } +// MockForTest is exported for testing. +// The actual implementation is in github.com/pingcap/tidb/autoid_service because of the +// package circle depending issue. +var MockForTest func(kv.Storage) autoid.AutoIDAllocClient - addrs, err := ebd.EtcdAddrs() - if err != nil { - panic(err) - } +func newSinglePointAlloc(r Requirement, dbID, tblID int64, isUnsigned bool) *singlePointAlloc { spa := &singlePointAlloc{ dbID: dbID, tblID: tblID, isUnsigned: isUnsigned, } - if len(addrs) > 0 { - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: addrs, - AutoSyncInterval: 30 * time.Second, - TLS: ebd.TLSConfig(), - }) - if err != nil { - logutil.BgLogger().Error("[autoid client] fail to connect etcd, fallback to default", zap.Error(err)) - return nil - } - spa.clientDiscover = clientDiscover{etcdCli: etcdCli} - } else { + if r.GetEtcdClient() == nil { + // Only for test in mockstore spa.clientDiscover = clientDiscover{} - spa.mu.AutoIDAllocClient = autoid.MockForTest(store) + spa.mu.AutoIDAllocClient = MockForTest(r.Store()) + } else { + spa.clientDiscover = clientDiscover{etcdCli: r.GetEtcdClient()} } // mockAutoIDChange failpoint is not implemented in this allocator, so fallback to use the default one. @@ -600,9 +586,19 @@ func newSinglePointAlloc(store kv.Storage, dbID, tblID int64, isUnsigned bool) * return spa } +// Requirement is the parameter required by NewAllocator +type Requirement interface { + Store() kv.Storage + GetEtcdClient() *clientv3.Client +} + // NewAllocator returns a new auto increment id generator on the store. -func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool, +func NewAllocator(r Requirement, dbID, tbID int64, isUnsigned bool, allocType AllocatorType, opts ...AllocOption) Allocator { + var store kv.Storage + if r != nil { + store = r.Store() + } alloc := &allocator{ store: store, dbID: dbID, @@ -619,7 +615,7 @@ func NewAllocator(store kv.Storage, dbID, tbID int64, isUnsigned bool, // Use the MySQL compatible AUTO_INCREMENT mode. if alloc.customStep && alloc.step == 1 && alloc.tbVersion >= model.TableInfoVersion5 { if allocType == AutoIncrementType { - alloc1 := newSinglePointAlloc(store, dbID, tbID, isUnsigned) + alloc1 := newSinglePointAlloc(r, dbID, tbID, isUnsigned) if alloc1 != nil { return alloc1 } @@ -649,7 +645,7 @@ func NewSequenceAllocator(store kv.Storage, dbID, tbID int64, info *model.Sequen } // NewAllocatorsFromTblInfo creates an array of allocators of different types with the information of model.TableInfo. -func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) Allocators { +func NewAllocatorsFromTblInfo(r Requirement, schemaID int64, tblInfo *model.TableInfo) Allocators { var allocs []Allocator dbID := tblInfo.GetDBID(schemaID) idCacheOpt := CustomAutoIncCacheOption(tblInfo.AutoIdCache) @@ -658,20 +654,20 @@ func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.T hasRowID := !tblInfo.PKIsHandle && !tblInfo.IsCommonHandle hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil if hasRowID || hasAutoIncID { - alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType, idCacheOpt, tblVer) + alloc := NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType, idCacheOpt, tblVer) allocs = append(allocs, alloc) } if hasAutoIncID { - alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), AutoIncrementType, idCacheOpt, tblVer) + alloc := NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), AutoIncrementType, idCacheOpt, tblVer) allocs = append(allocs, alloc) } hasAutoRandID := tblInfo.ContainsAutoRandomBits() if hasAutoRandID { - alloc := NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), AutoRandomType, idCacheOpt, tblVer) + alloc := NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), AutoRandomType, idCacheOpt, tblVer) allocs = append(allocs, alloc) } if tblInfo.IsSequence() { - allocs = append(allocs, NewSequenceAllocator(store, dbID, tblInfo.ID, tblInfo.Sequence)) + allocs = append(allocs, NewSequenceAllocator(r.Store(), dbID, tblInfo.ID, tblInfo.Sequence)) } return NewAllocators(tblInfo.SepAutoInc(), allocs...) } diff --git a/meta/autoid/autoid_test.go b/meta/autoid/autoid_test.go index 0b8cd60257cf4..22e627f009861 100644 --- a/meta/autoid/autoid_test.go +++ b/meta/autoid/autoid_test.go @@ -32,8 +32,21 @@ import ( "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" ) +type mockRequirement struct { + kv.Storage +} + +func (r mockRequirement) Store() kv.Storage { + return r.Storage +} + +func (r mockRequirement) GetEtcdClient() *clientv3.Client { + return nil +} + func TestSignedAutoid(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/meta/autoid/mockAutoIDChange", `return(true)`)) defer func() { @@ -67,7 +80,7 @@ func TestSignedAutoid(t *testing.T) { require.NoError(t, err) // Since the test here is applicable to any type of allocators, autoid.RowIDAllocType is chosen. - alloc := autoid.NewAllocator(store, 1, 1, false, autoid.RowIDAllocType) + alloc := autoid.NewAllocator(mockRequirement{store}, 1, 1, false, autoid.RowIDAllocType) require.NotNil(t, alloc) globalAutoID, err := alloc.NextGlobalAutoID() @@ -105,13 +118,13 @@ func TestSignedAutoid(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(3011), id) - alloc = autoid.NewAllocator(store, 1, 1, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 1, false, autoid.RowIDAllocType) require.NotNil(t, alloc) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, autoid.GetStep()+1, id) - alloc = autoid.NewAllocator(store, 1, 2, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 2, false, autoid.RowIDAllocType) require.NotNil(t, alloc) err = alloc.Rebase(context.Background(), int64(1), false) require.NoError(t, err) @@ -119,11 +132,11 @@ func TestSignedAutoid(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(2), id) - alloc = autoid.NewAllocator(store, 1, 3, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 3, false, autoid.RowIDAllocType) require.NotNil(t, alloc) err = alloc.Rebase(context.Background(), int64(3210), false) require.NoError(t, err) - alloc = autoid.NewAllocator(store, 1, 3, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 3, false, autoid.RowIDAllocType) require.NotNil(t, alloc) err = alloc.Rebase(context.Background(), int64(3000), false) require.NoError(t, err) @@ -145,7 +158,7 @@ func TestSignedAutoid(t *testing.T) { require.NoError(t, err) // alloc N for signed - alloc = autoid.NewAllocator(store, 1, 4, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 4, false, autoid.RowIDAllocType) require.NotNil(t, alloc) globalAutoID, err = alloc.NextGlobalAutoID() require.NoError(t, err) @@ -188,7 +201,7 @@ func TestSignedAutoid(t *testing.T) { require.Greater(t, min+1, lastRemainOne) // Test for increment & offset for signed. - alloc = autoid.NewAllocator(store, 1, 5, false, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 5, false, autoid.RowIDAllocType) require.NotNil(t, alloc) increment := int64(2) @@ -271,7 +284,7 @@ func TestUnsignedAutoid(t *testing.T) { }) require.NoError(t, err) - alloc := autoid.NewAllocator(store, 1, 1, true, autoid.RowIDAllocType) + alloc := autoid.NewAllocator(mockRequirement{store}, 1, 1, true, autoid.RowIDAllocType) require.NotNil(t, alloc) globalAutoID, err := alloc.NextGlobalAutoID() @@ -309,13 +322,13 @@ func TestUnsignedAutoid(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(3011), id) - alloc = autoid.NewAllocator(store, 1, 1, true, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 1, true, autoid.RowIDAllocType) require.NotNil(t, alloc) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, autoid.GetStep()+1, id) - alloc = autoid.NewAllocator(store, 1, 2, true, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 2, true, autoid.RowIDAllocType) require.NotNil(t, alloc) err = alloc.Rebase(context.Background(), int64(1), false) require.NoError(t, err) @@ -323,11 +336,11 @@ func TestUnsignedAutoid(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(2), id) - alloc = autoid.NewAllocator(store, 1, 3, true, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 3, true, autoid.RowIDAllocType) require.NotNil(t, alloc) err = alloc.Rebase(context.Background(), int64(3210), false) require.NoError(t, err) - alloc = autoid.NewAllocator(store, 1, 3, true, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 3, true, autoid.RowIDAllocType) require.NotNil(t, alloc) err = alloc.Rebase(context.Background(), int64(3000), false) require.NoError(t, err) @@ -352,7 +365,7 @@ func TestUnsignedAutoid(t *testing.T) { require.NoError(t, err) // alloc N for unsigned - alloc = autoid.NewAllocator(store, 1, 4, true, autoid.RowIDAllocType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 4, true, autoid.RowIDAllocType) require.NotNil(t, alloc) globalAutoID, err = alloc.NextGlobalAutoID() require.NoError(t, err) @@ -381,7 +394,7 @@ func TestUnsignedAutoid(t *testing.T) { require.Greater(t, min+1, lastRemainOne) // Test increment & offset for unsigned. Using AutoRandomType to avoid valid range check for increment and offset. - alloc = autoid.NewAllocator(store, 1, 5, true, autoid.AutoRandomType) + alloc = autoid.NewAllocator(mockRequirement{store}, 1, 5, true, autoid.AutoRandomType) require.NotNil(t, alloc) require.NoError(t, err) require.Equal(t, int64(1), globalAutoID) @@ -435,7 +448,7 @@ func TestConcurrentAlloc(t *testing.T) { allocIDs := func() { ctx := context.Background() - alloc := autoid.NewAllocator(store, dbID, tblID, false, autoid.RowIDAllocType) + alloc := autoid.NewAllocator(mockRequirement{store}, dbID, tblID, false, autoid.RowIDAllocType) for j := 0; j < int(autoid.GetStep())+5; j++ { _, id, err1 := alloc.Alloc(ctx, 1, 1, 1) if err1 != nil { @@ -516,7 +529,7 @@ func TestRollbackAlloc(t *testing.T) { injectConf := new(kv.InjectionConfig) injectConf.SetCommitError(errors.New("injected")) injectedStore := kv.NewInjectedStore(store, injectConf) - alloc := autoid.NewAllocator(injectedStore, 1, 2, false, autoid.RowIDAllocType) + alloc := autoid.NewAllocator(mockRequirement{injectedStore}, 1, 2, false, autoid.RowIDAllocType) _, _, err = alloc.Alloc(ctx, 1, 1, 1) require.Error(t, err) require.Equal(t, int64(0), alloc.Base()) @@ -566,11 +579,11 @@ func TestAllocComputationIssue(t *testing.T) { require.NoError(t, err) // Since the test here is applicable to any type of allocators, autoid.RowIDAllocType is chosen. - unsignedAlloc1 := autoid.NewAllocator(store, 1, 1, true, autoid.RowIDAllocType) + unsignedAlloc1 := autoid.NewAllocator(mockRequirement{store}, 1, 1, true, autoid.RowIDAllocType) require.NotNil(t, unsignedAlloc1) - signedAlloc1 := autoid.NewAllocator(store, 1, 1, false, autoid.RowIDAllocType) + signedAlloc1 := autoid.NewAllocator(mockRequirement{store}, 1, 1, false, autoid.RowIDAllocType) require.NotNil(t, signedAlloc1) - signedAlloc2 := autoid.NewAllocator(store, 1, 2, false, autoid.RowIDAllocType) + signedAlloc2 := autoid.NewAllocator(mockRequirement{store}, 1, 2, false, autoid.RowIDAllocType) require.NotNil(t, signedAlloc2) // the next valid two value must be 13 & 16, batch size = 6. diff --git a/meta/autoid/bench_test.go b/meta/autoid/bench_test.go index d8b489060875d..422f825c666c9 100644 --- a/meta/autoid/bench_test.go +++ b/meta/autoid/bench_test.go @@ -57,7 +57,7 @@ func BenchmarkAllocator_Alloc(b *testing.B) { if err != nil { return } - alloc := autoid.NewAllocator(store, 1, 2, false, autoid.RowIDAllocType) + alloc := autoid.NewAllocator(mockRequirement{store}, 1, 2, false, autoid.RowIDAllocType) b.StartTimer() for i := 0; i < b.N; i++ { _, _, err := alloc.Alloc(ctx, 1, 1, 1) diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index 4f726e2d8dce4..7ea1bf9fa3f8d 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -1835,7 +1835,8 @@ func tryLockMDLAndUpdateSchemaIfNecessary(sctx sessionctx.Context, dbName model. if !skipLock { sctx.GetSessionVars().GetRelatedTableForMDL().Store(tableInfo.ID, int64(0)) } - domainSchema := domain.GetDomain(sctx).InfoSchema() + dom := domain.GetDomain(sctx) + domainSchema := dom.InfoSchema() domainSchemaVer := domainSchema.SchemaMetaVersion() var err error tbl, err = domainSchema.TableByName(dbName, tableInfo.Name) @@ -1865,7 +1866,7 @@ func tryLockMDLAndUpdateSchemaIfNecessary(sctx sessionctx.Context, dbName model. } copyTableInfo.Indices[i].State = model.StateWriteReorganization dbInfo, _ := domainSchema.SchemaByName(dbName) - allocs := autoid.NewAllocatorsFromTblInfo(sctx.GetStore(), dbInfo.ID, copyTableInfo) + allocs := autoid.NewAllocatorsFromTblInfo(dom, dbInfo.ID, copyTableInfo) tbl, err = table.TableFromMeta(allocs, copyTableInfo) if err != nil { return nil, err