Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: share etcd client from domain for autoid allocator (#46647) #48310

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions autoid_service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//config",
"//kv",
"//meta",
"//meta/autoid",
"//metrics",
"//owner",
"//parser/model",
Expand Down
7 changes: 6 additions & 1 deletion autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
autoid1 "github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -299,7 +300,7 @@ func (m *mockClient) Rebase(ctx context.Context, in *autoid.RebaseRequest, opts
var global = make(map[string]*mockClient)

// MockForTest is used for testing, the UT test and unistore use this.
func MockForTest(store kv.Storage) *mockClient {
func MockForTest(store kv.Storage) autoid.AutoIDAllocClient {
uuid := store.UUID()
ret, ok := global[uuid]
if !ok {
Expand Down Expand Up @@ -515,3 +516,7 @@ func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoi
}
return &autoid.RebaseResponse{}, nil
}

func init() {
autoid1.MockForTest = MockForTest
}
18 changes: 15 additions & 3 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,18 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
return retRanges, nil
}

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
return nil
}

// BuildBackupRangeAndSchema gets KV range and schema of tables.
// KV ranges are separated by Table IDs.
// Also, KV ranges are separated by Index IDs in the same table.
Expand Down Expand Up @@ -575,9 +587,9 @@ func BuildBackupRangeAndSchema(

autoIDAccess := m.GetAutoIDAccessors(dbInfo.ID, tableInfo.ID)
tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)
idAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)

var globalAutoID int64
switch {
Expand Down
23 changes: 11 additions & 12 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"go.uber.org/zap"
Expand Down Expand Up @@ -255,10 +254,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1159,8 +1158,8 @@ func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error {
func (m *singleTaskMetaMgr) Close() {
}

func allocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
allocators, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
func allocGlobalAutoID(ctx context.Context, n int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
allocators, err := getGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return 0, 0, err
}
Expand All @@ -1177,8 +1176,8 @@ func allocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int6
return
}

func rebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) error {
allocators, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
func rebaseGlobalAutoID(ctx context.Context, newBase int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) error {
allocators, err := getGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return err
}
Expand All @@ -1191,8 +1190,8 @@ func rebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, db
return nil
}

func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if store == nil {
func getGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if r == nil || r.Store() == nil {
return nil, errors.New("internal error: kv store should not be nil")
}
if dbID == 0 {
Expand Down Expand Up @@ -1224,15 +1223,15 @@ func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo
case hasRowID || hasAutoIncID:
allocators := make([]autoid.Allocator, 0, 2)
if tblInfo.SepAutoInc() && hasAutoIncID {
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.AutoIncrementType, noCache, tblVer))
}
// this allocator is NOT used when SepAutoInc=true and auto increment column is clustered.
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer))
return allocators, nil
case hasAutoRandID:
return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
return []autoid.Allocator{autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer)}, nil
default:
return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
Expand Down
20 changes: 16 additions & 4 deletions br/pkg/lightning/restore/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (s *metaMgrSuite) prepareMockInner(rowsVal [][]driver.Value, nextRowID *int
WillReturnRows(rows)

if nextRowID != nil {
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.kvStore, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
alloc := allocs.Get(autoid.RowIDAllocType)
alloc.ForceRebase(*nextRowID - 1)
}
Expand Down Expand Up @@ -480,6 +480,18 @@ func newTableInfo2(t *testing.T,
return tableInfo
}

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
return nil
}

func TestAllocGlobalAutoID(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
Expand Down Expand Up @@ -557,11 +569,11 @@ func TestAllocGlobalAutoID(t *testing.T) {
ctx := context.Background()
for _, c := range cases {
ti := newTableInfo2(t, 1, c.tableID, c.createTableSQL, kvStore)
allocators, err := getGlobalAutoIDAlloc(kvStore, 1, ti)
allocators, err := getGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
if c.expectErrStr == "" {
require.NoError(t, err, c.tableID)
require.NoError(t, rebaseGlobalAutoID(ctx, 123, kvStore, 1, ti))
base, idMax, err := allocGlobalAutoID(ctx, 100, kvStore, 1, ti)
require.NoError(t, rebaseGlobalAutoID(ctx, 123, mockRequirement{kvStore}, 1, ti))
base, idMax, err := allocGlobalAutoID(ctx, 100, mockRequirement{kvStore}, 1, ti)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(123), base, c.tableID)
require.Equal(t, int64(223), idMax, c.tableID)
Expand Down
9 changes: 8 additions & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/pingcap/tidb/util/set"
tikvconfig "github.com/tikv/client-go/v2/config"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -1473,6 +1474,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
cleanup := false
postProgress := func() error { return nil }
var kvStore tidbkv.Storage
var etcdCli *clientv3.Client

if isLocalBackend(rc.cfg) {
var (
Expand Down Expand Up @@ -1580,6 +1582,11 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
logTask.Warn("failed to close kv store", zap.Error(err))
}
}
if etcdCli != nil {
if err := etcdCli.Close(); err != nil {
logTask.Warn("failed to close etcd client", zap.Error(err))
}
}
}()

taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
Expand Down Expand Up @@ -1629,7 +1636,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, log.FromContext(ctx))
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, etcdCli, log.FromContext(ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestNewTableRestore(t *testing.T) {
for _, tc := range testCases {
tableInfo := dbInfo.Tables[tc.name]
tableName := common.UniqueTable("mockdb", tableInfo.Name)
tr, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NotNil(t, tr)
require.NoError(t, err)
}
Expand All @@ -91,7 +91,7 @@ func TestNewTableRestoreFailure(t *testing.T) {
}}
tableName := common.UniqueTable("mockdb", "failure")

_, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
_, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.Regexp(t, `failed to tables\.TableFromMeta.*`, err.Error())
}

Expand Down
18 changes: 17 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand All @@ -58,6 +59,8 @@ type TableRestore struct {
alloc autoid.Allocators
logger log.Logger
kvStore tidbkv.Storage
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover

ignoreColumns map[string]struct{}
}
Expand All @@ -70,13 +73,15 @@ func NewTableRestore(
cp *checkpoints.TableCheckpoint,
ignoreColumns map[string]struct{},
kvStore tidbkv.Storage,
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableRestore, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
}
autoidCli := autoid.NewClientDiscover(etcdCli)

return &TableRestore{
tableName: tableName,
Expand All @@ -86,11 +91,21 @@ func NewTableRestore(
encTable: tbl,
alloc: idAlloc,
kvStore: kvStore,
etcdCli: etcdCli,
autoidCli: autoidCli,
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
}

func (tr *TableRestore) Store() tidbkv.Storage {
return tr.kvStore
}

func (tr *TableRestore) AutoIDClient() *autoid.ClientDiscover {
return tr.autoidCli
}

func (tr *TableRestore) Close() {
tr.encTable = nil
tr.logger.Info("restore done")
Expand Down Expand Up @@ -146,6 +161,7 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp *
return err
}

// RebaseChunkRowIDs rebase the row id of the chunks.
func (tr *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
if rowIDBase == 0 {
return
Expand Down Expand Up @@ -758,7 +774,7 @@ func (tr *TableRestore) postProcess(
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = rebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core)
err = rebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core)
}
}
rc.alterTableLock.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/restore/table_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *tableRestoreSuiteBase) setupSuite(t *testing.T) {
func (s *tableRestoreSuiteBase) setupTest(t *testing.T) {
// Collect into the test TableRestore structure
var err error
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(t, err)

s.cfg = config.NewConfig()
Expand Down Expand Up @@ -458,7 +458,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() {
cfg.Mydumper.StrictFormat = true
rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store}

tr, err := NewTableRestore("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableRestore("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(s.T(), err)
require.NoError(s.T(), tr.populateChunks(context.Background(), rc, cp))

Expand Down Expand Up @@ -723,7 +723,7 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated() {
require.NoError(s.T(), err)
core.State = model.StatePublic
tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core}
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(s.T(), err)
ccp := &checkpoints.ChunkCheckpoint{}

Expand Down
8 changes: 4 additions & 4 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestRestoreAutoIncID(t *testing.T) {
DB: dbInfo,
}
// Get the next AutoIncID
idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType)
idAlloc := autoid.NewAllocator(s.mock.Domain, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType)
globalAutoID, err := idAlloc.NextGlobalAutoID()
require.NoErrorf(t, err, "Error allocate next auto id")
require.Equal(t, uint64(globalAutoID), autoIncID)
Expand Down Expand Up @@ -380,7 +380,7 @@ func TestGetExistedUserDBs(t *testing.T) {
dbs := restore.GetExistedUserDBs(dom)
require.Equal(t, 0, len(dbs))

builder, err := infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos(
[]*model.DBInfo{
{Name: model.NewCIStr("mysql")},
{Name: model.NewCIStr("test")},
Expand All @@ -391,7 +391,7 @@ func TestGetExistedUserDBs(t *testing.T) {
dbs = restore.GetExistedUserDBs(dom)
require.Equal(t, 0, len(dbs))

builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos(
[]*model.DBInfo{
{Name: model.NewCIStr("mysql")},
{Name: model.NewCIStr("test")},
Expand All @@ -403,7 +403,7 @@ func TestGetExistedUserDBs(t *testing.T) {
dbs = restore.GetExistedUserDBs(dom)
require.Equal(t, 1, len(dbs))

builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos(
[]*model.DBInfo{
{Name: model.NewCIStr("mysql")},
{Name: model.NewCIStr("d1")},
Expand Down
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ go_test(
flaky = True,
shard_count = 50,
deps = [
"//autoid_service",
"//config",
"//ddl/ingest",
"//ddl/placement",
Expand Down
Loading