Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: share etcd client from domain for autoid allocator #46647 #48335

Merged
merged 4 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"//br/pkg/lightning/log",
"//br/pkg/utils",
"//errno",
"//kv",
"//meta/autoid",
"//parser/model",
"//sessionctx/variable",
Expand Down Expand Up @@ -102,7 +101,7 @@ go_test(
],
embed = [":common"],
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
Expand All @@ -125,6 +124,7 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_goleak//:goleak",
Expand Down
43 changes: 33 additions & 10 deletions br/pkg/lightning/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
)
Expand Down Expand Up @@ -50,9 +49,9 @@ var DefaultImportVariablesTiDB = map[string]string{
}

// AllocGlobalAutoID allocs N consecutive autoIDs from TiDB.
func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64,
func AllocGlobalAutoID(ctx context.Context, n int64, r autoid.Requirement, dbID int64,
tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return 0, 0, err
}
Expand All @@ -70,9 +69,9 @@ func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int6
}

// RebaseGlobalAutoID rebase the autoID base to newBase.
func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64,
func RebaseGlobalAutoID(ctx context.Context, newBase int64, r autoid.Requirement, dbID int64,
tblInfo *model.TableInfo) error {
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return err
}
Expand All @@ -85,10 +84,34 @@ func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, db
return nil
}

// RebaseTableAllocators rebase the allocators of a table.
// This function only rebase a table allocator when its new base is given in
// `bases` param, else it will be skipped.
// base is the max id that have been used by the table, the next usable id will
// be base + 1, see Allocator.Alloc.
func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]int64, r autoid.Requirement, dbID int64,
tblInfo *model.TableInfo) error {
allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return err
}
for _, alloc := range allocators {
base, ok := bases[alloc.GetType()]
if !ok {
continue
}
err = alloc.Rebase(ctx, base, false)
if err != nil {
return err
}
}
return nil
}

// GetGlobalAutoIDAlloc returns the autoID allocators for a table.
// export it for testing.
func GetGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if store == nil {
func GetGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if r == nil || r.Store() == nil {
return nil, errors.New("internal error: kv store should not be nil")
}
if dbID == 0 {
Expand Down Expand Up @@ -120,15 +143,15 @@ func GetGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo
case hasRowID || hasAutoIncID:
allocators := make([]autoid.Allocator, 0, 2)
if tblInfo.SepAutoInc() && hasAutoIncID {
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.AutoIncrementType, noCache, tblVer))
}
// this allocator is NOT used when SepAutoInc=true and auto increment column is clustered.
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer))
return allocators, nil
case hasAutoRandID:
return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
return []autoid.Allocator{autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer)}, nil
default:
return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
Expand Down
74 changes: 71 additions & 3 deletions br/pkg/lightning/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/store/mockstore"
tmock "github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)

func newTableInfo(t *testing.T,
Expand Down Expand Up @@ -134,11 +135,11 @@ func TestAllocGlobalAutoID(t *testing.T) {
ctx := context.Background()
for _, c := range cases {
ti := newTableInfo(t, 1, c.tableID, c.createTableSQL, kvStore)
allocators, err := common.GetGlobalAutoIDAlloc(kvStore, 1, ti)
allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
if c.expectErrStr == "" {
require.NoError(t, err, c.tableID)
require.NoError(t, common.RebaseGlobalAutoID(ctx, 123, kvStore, 1, ti))
base, idMax, err := common.AllocGlobalAutoID(ctx, 100, kvStore, 1, ti)
require.NoError(t, common.RebaseGlobalAutoID(ctx, 123, mockRequirement{kvStore}, 1, ti))
base, idMax, err := common.AllocGlobalAutoID(ctx, 100, mockRequirement{kvStore}, 1, ti)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(123), base, c.tableID)
require.Equal(t, int64(223), idMax, c.tableID)
Expand All @@ -159,3 +160,70 @@ func TestAllocGlobalAutoID(t *testing.T) {
require.Equal(t, c.expectAllocatorTypes, allocatorTypes, c.tableID)
}
}

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) GetEtcdClient() *clientv3.Client {
return nil
}

func TestRebaseTableAllocators(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, kvStore.Close())
})
ti := newTableInfo(t, 1, 42,
"create table t42 (a int primary key nonclustered auto_increment) AUTO_ID_CACHE 1", kvStore)
allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
require.NoError(t, err)
require.Len(t, allocators, 2)
for _, alloc := range allocators {
id, err := alloc.NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(1), id)
}
ctx := context.Background()
allocatorTypes := make([]autoid.AllocatorType, 0, len(allocators))
// rebase to 123
for _, alloc := range allocators {
require.NoError(t, alloc.Rebase(ctx, 123, false))
allocatorTypes = append(allocatorTypes, alloc.GetType())
}
require.Equal(t, []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType}, allocatorTypes)
// this call does nothing
require.NoError(t, common.RebaseTableAllocators(ctx, nil, mockRequirement{kvStore}, 1, ti))
for _, alloc := range allocators {
nextID, err := alloc.NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(124), nextID)
}
// this call rebase AutoIncrementType allocator to 223
require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.AutoIncrementType: 223,
}, mockRequirement{kvStore}, 1, ti))
next, err := allocators[0].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(224), next)
next, err = allocators[1].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(124), next)
// this call rebase AutoIncrementType allocator to 323, RowIDAllocType allocator to 423
require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.AutoIncrementType: 323,
autoid.RowIDAllocType: 423,
}, mockRequirement{kvStore}, 1, ti))
next, err = allocators[0].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(324), next)
next, err = allocators[1].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(424), next)
}
1 change: 1 addition & 0 deletions br/pkg/lightning/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ go_library(
"//util/collate",
"//util/dbterror",
"//util/engine",
"//util/etcd",
"//util/mathutil",
"//util/mock",
"//util/regexpr-router",
Expand Down
21 changes: 20 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/pingcap/tidb/br/pkg/version/build"
tidbconfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/keyspace"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
Expand All @@ -62,12 +63,14 @@ import (
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/etcd"
"github.com/pingcap/tidb/util/mathutil"
regexprrouter "github.com/pingcap/tidb/util/regexpr-router"
"github.com/pingcap/tidb/util/set"
"github.com/prometheus/client_golang/prometheus"
tikvconfig "github.com/tikv/client-go/v2/config"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -1517,6 +1520,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
cleanup := false
postProgress := func() error { return nil }
var kvStore tidbkv.Storage
var etcdCli *clientv3.Client

if isLocalBackend(rc.cfg) {
var (
Expand Down Expand Up @@ -1570,6 +1574,16 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{rc.cfg.TiDB.PdAddr},
AutoSyncInterval: 30 * time.Second,
TLS: rc.tls.TLSConfig(),
})
if err != nil {
return errors.Trace(err)
}
etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(kvStore.GetCodec()))

manager, err := NewChecksumManager(ctx, rc, kvStore)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1637,6 +1651,11 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
logTask.Warn("failed to close kv store", zap.Error(err))
}
}
if etcdCli != nil {
if err := etcdCli.Close(); err != nil {
logTask.Warn("failed to close etcd client", zap.Error(err))
}
}
}()

taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
Expand Down Expand Up @@ -1686,7 +1705,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableImporter(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, log.FromContext(ctx))
tr, err := NewTableImporter(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, etcdCli, log.FromContext(ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestNewTableRestore(t *testing.T) {
for _, tc := range testCases {
tableInfo := dbInfo.Tables[tc.name]
tableName := common.UniqueTable("mockdb", tableInfo.Name)
tr, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NotNil(t, tr)
require.NoError(t, err)
}
Expand All @@ -89,7 +89,7 @@ func TestNewTableRestoreFailure(t *testing.T) {
}}
tableName := common.UniqueTable("mockdb", "failure")

_, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
_, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.Regexp(t, `failed to tables\.TableFromMeta.*`, err.Error())
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (s *metaMgrSuite) prepareMockInner(rowsVal [][]driver.Value, nextRowID *int
WillReturnRows(rows)

if nextRowID != nil {
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.kvStore, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
alloc := allocs.Get(autoid.RowIDAllocType)
alloc.ForceRebase(*nextRowID - 1)
}
Expand Down
19 changes: 18 additions & 1 deletion br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand All @@ -69,6 +70,7 @@ type TableImporter struct {
alloc autoid.Allocators
logger log.Logger
kvStore tidbkv.Storage
etcdCli *clientv3.Client

ignoreColumns map[string]struct{}
}
Expand All @@ -82,6 +84,7 @@ func NewTableImporter(
cp *checkpoints.TableCheckpoint,
ignoreColumns map[string]struct{},
kvStore tidbkv.Storage,
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableImporter, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
Expand All @@ -98,6 +101,7 @@ func NewTableImporter(
encTable: tbl,
alloc: idAlloc,
kvStore: kvStore,
etcdCli: etcdCli,
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
Expand Down Expand Up @@ -268,6 +272,19 @@ func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp
return err
}

// AutoIDRequirement implements autoid.Requirement.
var _ autoid.Requirement = &TableImporter{}

// Store implements the autoid.Requirement interface.
func (tr *TableImporter) Store() tidbkv.Storage {
return tr.kvStore
}

// GetEtcdClient implements the autoid.Requirement interface.
func (tr *TableImporter) GetEtcdClient() *clientv3.Client {
return tr.etcdCli
}

// RebaseChunkRowIDs rebase the row id of the chunks.
func (*TableImporter) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
if rowIDBase == 0 {
Expand Down Expand Up @@ -898,7 +915,7 @@ func (tr *TableImporter) postProcess(
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core)
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core)
}
}
rc.alterTableLock.Unlock()
Expand Down
Loading