Skip to content

Commit

Permalink
*: share etcd client from domain for autoid allocator pingcap#46647 (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Nov 24, 2023
1 parent 5a6b36c commit 07dcfb0
Show file tree
Hide file tree
Showing 28 changed files with 250 additions and 121 deletions.
1 change: 1 addition & 0 deletions br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
Expand Down
19 changes: 16 additions & 3 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -492,6 +493,18 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
return retRanges, nil
}

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) GetEtcdClient() *clientv3.Client {
return nil
}

// BuildBackupRangeAndSchema gets KV range and schema of tables.
// KV ranges are separated by Table IDs.
// Also, KV ranges are separated by Index IDs in the same table.
Expand Down Expand Up @@ -575,9 +588,9 @@ func BuildBackupRangeAndSchema(

autoIDAccess := m.GetAutoIDAccessors(dbInfo.ID, tableInfo.ID)
tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)
idAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(mockRequirement{storage}, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)

var globalAutoID int64
switch {
Expand Down
23 changes: 11 additions & 12 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"go.uber.org/zap"
Expand Down Expand Up @@ -255,10 +254,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1159,8 +1158,8 @@ func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error {
func (m *singleTaskMetaMgr) Close() {
}

func allocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
allocators, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
func allocGlobalAutoID(ctx context.Context, n int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
allocators, err := getGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return 0, 0, err
}
Expand All @@ -1177,8 +1176,8 @@ func allocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int6
return
}

func rebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) error {
allocators, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
func rebaseGlobalAutoID(ctx context.Context, newBase int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) error {
allocators, err := getGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return err
}
Expand All @@ -1191,8 +1190,8 @@ func rebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, db
return nil
}

func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if store == nil {
func getGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if r == nil || r.Store() == nil {
return nil, errors.New("internal error: kv store should not be nil")
}
if dbID == 0 {
Expand Down Expand Up @@ -1224,15 +1223,15 @@ func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo
case hasRowID || hasAutoIncID:
allocators := make([]autoid.Allocator, 0, 2)
if tblInfo.SepAutoInc() && hasAutoIncID {
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.AutoIncrementType, noCache, tblVer))
}
// this allocator is NOT used when SepAutoInc=true and auto increment column is clustered.
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer))
return allocators, nil
case hasAutoRandID:
return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
return []autoid.Allocator{autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer)}, nil
default:
return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
Expand Down
21 changes: 17 additions & 4 deletions br/pkg/lightning/restore/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/store/mockstore"
tmock "github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -324,7 +325,7 @@ func (s *metaMgrSuite) prepareMockInner(rowsVal [][]driver.Value, nextRowID *int
WillReturnRows(rows)

if nextRowID != nil {
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.kvStore, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
alloc := allocs.Get(autoid.RowIDAllocType)
alloc.ForceRebase(*nextRowID - 1)
}
Expand Down Expand Up @@ -480,6 +481,18 @@ func newTableInfo2(t *testing.T,
return tableInfo
}

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) GetEtcdClient() *clientv3.Client {
return nil
}

func TestAllocGlobalAutoID(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
Expand Down Expand Up @@ -557,11 +570,11 @@ func TestAllocGlobalAutoID(t *testing.T) {
ctx := context.Background()
for _, c := range cases {
ti := newTableInfo2(t, 1, c.tableID, c.createTableSQL, kvStore)
allocators, err := getGlobalAutoIDAlloc(kvStore, 1, ti)
allocators, err := getGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
if c.expectErrStr == "" {
require.NoError(t, err, c.tableID)
require.NoError(t, rebaseGlobalAutoID(ctx, 123, kvStore, 1, ti))
base, idMax, err := allocGlobalAutoID(ctx, 100, kvStore, 1, ti)
require.NoError(t, rebaseGlobalAutoID(ctx, 123, mockRequirement{kvStore}, 1, ti))
base, idMax, err := allocGlobalAutoID(ctx, 100, mockRequirement{kvStore}, 1, ti)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(123), base, c.tableID)
require.Equal(t, int64(223), idMax, c.tableID)
Expand Down
17 changes: 16 additions & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/pingcap/tidb/util/set"
tikvconfig "github.com/tikv/client-go/v2/config"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -1473,6 +1474,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
cleanup := false
postProgress := func() error { return nil }
var kvStore tidbkv.Storage
var etcdCli *clientv3.Client

if isLocalBackend(rc.cfg) {
var (
Expand Down Expand Up @@ -1526,6 +1528,14 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
// etcdCli, err := clientv3.New(clientv3.Config{
// Endpoints: []string{rc.cfg.TiDB.PdAddr},
// AutoSyncInterval: 30 * time.Second,
// TLS: rc.tls.TLSConfig(),
// })
// if err != nil {
// return errors.Trace(err)
// }
manager, err := newChecksumManager(ctx, rc, kvStore)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1580,6 +1590,11 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
logTask.Warn("failed to close kv store", zap.Error(err))
}
}
if etcdCli != nil {
if err := etcdCli.Close(); err != nil {
logTask.Warn("failed to close etcd client", zap.Error(err))
}
}
}()

taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
Expand Down Expand Up @@ -1629,7 +1644,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, log.FromContext(ctx))
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, etcdCli, log.FromContext(ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestNewTableRestore(t *testing.T) {
for _, tc := range testCases {
tableInfo := dbInfo.Tables[tc.name]
tableName := common.UniqueTable("mockdb", tableInfo.Name)
tr, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NotNil(t, tr)
require.NoError(t, err)
}
Expand All @@ -91,7 +91,7 @@ func TestNewTableRestoreFailure(t *testing.T) {
}}
tableName := common.UniqueTable("mockdb", "failure")

_, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
_, err := NewTableRestore(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.Regexp(t, `failed to tables\.TableFromMeta.*`, err.Error())
}

Expand Down
28 changes: 27 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand All @@ -58,6 +59,7 @@ type TableRestore struct {
alloc autoid.Allocators
logger log.Logger
kvStore tidbkv.Storage
etcdCli *clientv3.Client

ignoreColumns map[string]struct{}
}
Expand All @@ -70,6 +72,7 @@ func NewTableRestore(
cp *checkpoints.TableCheckpoint,
ignoreColumns map[string]struct{},
kvStore tidbkv.Storage,
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableRestore, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
Expand All @@ -86,11 +89,20 @@ func NewTableRestore(
encTable: tbl,
alloc: idAlloc,
kvStore: kvStore,
etcdCli: etcdCli,
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
}

func (tr *TableRestore) Store() tidbkv.Storage {
return tr.kvStore
}

func (tr *TableRestore) GetEtcdClient() *clientv3.Client {
return tr.etcdCli
}

func (tr *TableRestore) Close() {
tr.encTable = nil
tr.logger.Info("restore done")
Expand Down Expand Up @@ -146,6 +158,20 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp *
return err
}

// // AutoIDRequirement implements autoid.Requirement.
// var _ autoid.Requirement = &TableImporter{}

// // Store implements the autoid.Requirement interface.
// func (tr *TableImporter) Store() tidbkv.Storage {
// return tr.kvStore
// }

// // GetEtcdClient implements the autoid.Requirement interface.
// func (tr *TableImporter) GetEtcdClient() *clientv3.Client {
// return tr.etcdCli
// }

// RebaseChunkRowIDs rebase the row id of the chunks.
func (tr *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
if rowIDBase == 0 {
return
Expand Down Expand Up @@ -758,7 +784,7 @@ func (tr *TableRestore) postProcess(
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = rebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core)
err = rebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core)
}
}
rc.alterTableLock.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/restore/table_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *tableRestoreSuiteBase) setupSuite(t *testing.T) {
func (s *tableRestoreSuiteBase) setupTest(t *testing.T) {
// Collect into the test TableRestore structure
var err error
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(t, err)

s.cfg = config.NewConfig()
Expand Down Expand Up @@ -458,7 +458,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() {
cfg.Mydumper.StrictFormat = true
rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store}

tr, err := NewTableRestore("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableRestore("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(s.T(), err)
require.NoError(s.T(), tr.populateChunks(context.Background(), rc, cp))

Expand Down Expand Up @@ -723,7 +723,7 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated() {
require.NoError(s.T(), err)
core.State = model.StatePublic
tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core}
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NoError(s.T(), err)
ccp := &checkpoints.ChunkCheckpoint{}

Expand Down
8 changes: 4 additions & 4 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestRestoreAutoIncID(t *testing.T) {
DB: dbInfo,
}
// Get the next AutoIncID
idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType)
idAlloc := autoid.NewAllocator(s.mock.Domain, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType)
globalAutoID, err := idAlloc.NextGlobalAutoID()
require.NoErrorf(t, err, "Error allocate next auto id")
require.Equal(t, uint64(globalAutoID), autoIncID)
Expand Down Expand Up @@ -380,7 +380,7 @@ func TestGetExistedUserDBs(t *testing.T) {
dbs := restore.GetExistedUserDBs(dom)
require.Equal(t, 0, len(dbs))

builder, err := infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
builder, err := infoschema.NewBuilder(dom, nil).InitWithDBInfos(
[]*model.DBInfo{
{Name: model.NewCIStr("mysql")},
{Name: model.NewCIStr("test")},
Expand All @@ -391,7 +391,7 @@ func TestGetExistedUserDBs(t *testing.T) {
dbs = restore.GetExistedUserDBs(dom)
require.Equal(t, 0, len(dbs))

builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos(
[]*model.DBInfo{
{Name: model.NewCIStr("mysql")},
{Name: model.NewCIStr("test")},
Expand All @@ -403,7 +403,7 @@ func TestGetExistedUserDBs(t *testing.T) {
dbs = restore.GetExistedUserDBs(dom)
require.Equal(t, 1, len(dbs))

builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
builder, err = infoschema.NewBuilder(dom, nil).InitWithDBInfos(
[]*model.DBInfo{
{Name: model.NewCIStr("mysql")},
{Name: model.NewCIStr("d1")},
Expand Down
Loading

0 comments on commit 07dcfb0

Please sign in to comment.