Skip to content

Commit

Permalink
*: share etcd client from domain for autoid allocator (#46647) (#48310)
Browse files Browse the repository at this point in the history
close #46324
  • Loading branch information
ti-chi-bot authored Nov 27, 2023
1 parent a2aeb3b commit c2bea02
Show file tree
Hide file tree
Showing 46 changed files with 289 additions and 156 deletions.
1 change: 1 addition & 0 deletions autoid_service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//config",
"//kv",
"//meta",
"//meta/autoid",
"//metrics",
"//owner",
"//parser/model",
Expand Down
7 changes: 6 additions & 1 deletion autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
autoid1 "github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -299,7 +300,7 @@ func (m *mockClient) Rebase(ctx context.Context, in *autoid.RebaseRequest, opts
var global = make(map[string]*mockClient)

// MockForTest is used for testing, the UT test and unistore use this.
func MockForTest(store kv.Storage) *mockClient {
func MockForTest(store kv.Storage) autoid.AutoIDAllocClient {
uuid := store.UUID()
ret, ok := global[uuid]
if !ok {
Expand Down Expand Up @@ -515,3 +516,7 @@ func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoi
}
return &autoid.RebaseResponse{}, nil
}

func init() {
autoid1.MockForTest = MockForTest
}
18 changes: 15 additions & 3 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,18 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
return retRanges, nil
}

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
return nil
}

// BuildBackupRangeAndSchema gets KV range and schema of tables.
// KV ranges are separated by Table IDs.
// Also, KV ranges are separated by Index IDs in the same table.
Expand Down Expand Up @@ -575,9 +587,9 @@ func BuildBackupRangeAndSchema(

autoIDAccess := m.GetAutoIDAccessors(dbInfo.ID, tableInfo.ID)
tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)
idAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)

var globalAutoID int64
switch {
Expand Down
23 changes: 11 additions & 12 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"go.uber.org/zap"
Expand Down Expand Up @@ -255,10 +254,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1159,8 +1158,8 @@ func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error {
func (m *singleTaskMetaMgr) Close() {
}

func allocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
allocators, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
func allocGlobalAutoID(ctx context.Context, n int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
allocators, err := getGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return 0, 0, err
}
Expand All @@ -1177,8 +1176,8 @@ func allocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int6
return
}

func rebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) error {
allocators, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
func rebaseGlobalAutoID(ctx context.Context, newBase int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) error {
allocators, err := getGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return err
}
Expand All @@ -1191,8 +1190,8 @@ func rebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, db
return nil
}

func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if store == nil {
func getGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if r == nil || r.Store() == nil {
return nil, errors.New("internal error: kv store should not be nil")
}
if dbID == 0 {
Expand Down Expand Up @@ -1224,15 +1223,15 @@ func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo
case hasRowID || hasAutoIncID:
allocators := make([]autoid.Allocator, 0, 2)
if tblInfo.SepAutoInc() && hasAutoIncID {
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.AutoIncrementType, noCache, tblVer))
}
// this allocator is NOT used when SepAutoInc=true and auto increment column is clustered.
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer))
return allocators, nil
case hasAutoRandID:
return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
return []autoid.Allocator{autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer)}, nil
default:
return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
Expand Down
20 changes: 16 additions & 4 deletions br/pkg/lightning/restore/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (s *metaMgrSuite) prepareMockInner(rowsVal [][]driver.Value, nextRowID *int
WillReturnRows(rows)

if nextRowID != nil {
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.kvStore, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
alloc := allocs.Get(autoid.RowIDAllocType)
alloc.ForceRebase(*nextRowID - 1)
}
Expand Down Expand Up @@ -480,6 +480,18 @@ func newTableInfo2(t *testing.T,
return tableInfo
}

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
return nil
}

func TestAllocGlobalAutoID(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
Expand Down Expand Up @@ -557,11 +569,11 @@ func TestAllocGlobalAutoID(t *testing.T) {
ctx := context.Background()
for _, c := range cases {
ti := newTableInfo2(t, 1, c.tableID, c.createTableSQL, kvStore)
allocators, err := getGlobalAutoIDAlloc(kvStore, 1, ti)
allocators, err := getGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
if c.expectErrStr == "" {
require.NoError(t, err, c.tableID)
require.NoError(t, rebaseGlobalAutoID(ctx, 123, kvStore, 1, ti))
base, idMax, err := allocGlobalAutoID(ctx, 100, kvStore, 1, ti)
require.NoError(t, rebaseGlobalAutoID(ctx, 123, mockRequirement{kvStore}, 1, ti))
base, idMax, err := allocGlobalAutoID(ctx, 100, mockRequirement{kvStore}, 1, ti)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(123), base, c.tableID)
require.Equal(t, int64(223), idMax, c.tableID)
Expand Down
9 changes: 8 additions & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/pingcap/tidb/util/set"
tikvconfig "github.com/tikv/client-go/v2/config"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -1473,6 +1474,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
cleanup := false
postProgress := func() error { return nil }
var kvStore tidbkv.Storage
var etcdCli *clientv3.Client

if isLocalBackend(rc.cfg) {
var (
Expand Down Expand Up @@ -1580,6 +1582,11 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
logTask.Warn("failed to close kv store", zap.Error(err))
}
}
if etcdCli != nil {
if err := etcdCli.Close(); err != nil {
logTask.Warn("failed to close etcd client", zap.Error(err))
}
}
}()

taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
Expand Down Expand Up @@ -1629,7 +1636,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, log.FromContext(ctx))
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, etcdCli, log.FromContext(ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestNewTableRestore(t *testing.T) {
for _, tc := range testCases {
tableInfo := dbInfo.Tables[tc.name]
tableName := common.UniqueTable("mockdb", tableInfo.Name)
tr, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NotNil(t, tr)
require.NoError(t, err)
}
Expand All @@ -91,7 +91,7 @@ func TestNewTableRestoreFailure(t *testing.T) {
}}
tableName := common.UniqueTable("mockdb", "failure")

_, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
_, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.Regexp(t, `failed to tables\.TableFromMeta.*`, err.Error())
}

Expand Down
18 changes: 17 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand All @@ -58,6 +59,8 @@ type TableRestore struct {
alloc autoid.Allocators
logger log.Logger
kvStore tidbkv.Storage
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover

ignoreColumns map[string]struct{}
}
Expand All @@ -70,13 +73,15 @@ func NewTableRestore(
cp *checkpoints.TableCheckpoint,
ignoreColumns map[string]struct{},
kvStore tidbkv.Storage,
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableRestore, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
}
autoidCli := autoid.NewClientDiscover(etcdCli)

return &TableRestore{
tableName: tableName,
Expand All @@ -86,11 +91,21 @@ func NewTableRestore(
encTable: tbl,
alloc: idAlloc,
kvStore: kvStore,
etcdCli: etcdCli,
autoidCli: autoidCli,
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
}

func (tr *TableRestore) Store() tidbkv.Storage {
return tr.kvStore
}

func (tr *TableRestore) AutoIDClient() *autoid.ClientDiscover {
return tr.autoidCli
}

func (tr *TableRestore) Close() {
tr.encTable = nil
tr.logger.Info("restore done")
Expand Down Expand Up @@ -146,6 +161,7 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp *
return err
}

// RebaseChunkRowIDs rebase the row id of the chunks.
func (tr *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
if rowIDBase == 0 {
return
Expand Down Expand Up @@ -758,7 +774,7 @@ func (tr *TableRestore) postProcess(
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = rebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core)
err = rebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core)
}
}
rc.alterTableLock.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/restore/table_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *tableRestoreSuiteBase) setupSuite(t *testing.T) {
func (s *tableRestoreSuiteBase) setupTest(t *testing.T) {
// Collect into the test TableRestore structure
var err error
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(t, err)

s.cfg = config.NewConfig()
Expand Down Expand Up @@ -458,7 +458,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() {
cfg.Mydumper.StrictFormat = true
rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store}

tr, err := NewTableRestore("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableRestore("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(s.T(), err)
require.NoError(s.T(), tr.populateChunks(context.Background(), rc, cp))

Expand Down Expand Up @@ -723,7 +723,7 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated() {
require.NoError(s.T(), err)
core.State = model.StatePublic
tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core}
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(s.T(), err)
ccp := &checkpoints.ChunkCheckpoint{}

Expand Down
8 changes: 4 additions & 4 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestRestoreAutoIncID(t *testing.T) {
DB: dbInfo,
}
// Get the next AutoIncID
idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType)
idAlloc := autoid.NewAllocator(s.mock.Domain, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType)
globalAutoID, err := idAlloc.NextGlobalAutoID()
require.NoErrorf(t, err, "Error allocate next auto id")
require.Equal(t, uint64(globalAutoID), autoIncID)
Expand Down Expand Up @@ -380,7 +380,7 @@ func TestGetExistedUserDBs(t *testing.T) {
dbs := restore.GetExistedUserDBs(dom)
require.Equal(t, 0, len(dbs))

builder, err := infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos(
[]*model.DBInfo{
{Name: model.NewCIStr("mysql")},
{Name: model.NewCIStr("test")},
Expand All @@ -391,7 +391,7 @@ func TestGetExistedUserDBs(t *testing.T) {
dbs = restore.GetExistedUserDBs(dom)
require.Equal(t, 0, len(dbs))

builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos(
[]*model.DBInfo{
{Name: model.NewCIStr("mysql")},
{Name: model.NewCIStr("test")},
Expand All @@ -403,7 +403,7 @@ func TestGetExistedUserDBs(t *testing.T) {
dbs = restore.GetExistedUserDBs(dom)
require.Equal(t, 1, len(dbs))

builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos(
[]*model.DBInfo{
{Name: model.NewCIStr("mysql")},
{Name: model.NewCIStr("d1")},
Expand Down
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ go_test(
flaky = True,
shard_count = 50,
deps = [
"//autoid_service",
"//config",
"//ddl/ingest",
"//ddl/placement",
Expand Down
Loading

0 comments on commit c2bea02

Please sign in to comment.