Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: fix pd http request using old address #45680

Merged
merged 13 commits into from
Aug 1, 2023
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,15 @@ func (*encodingBuilder) MakeEmptyRows() encode.Rows {
type targetInfoGetter struct {
tls *common.TLS
targetDB *sql.DB
pdAddr string
pdCli pd.Client
}

// NewTargetInfoGetter creates an TargetInfoGetter with local backend implementation.
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdAddr string) backend.TargetInfoGetter {
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.TargetInfoGetter {
return &targetInfoGetter{
tls: tls,
targetDB: db,
pdAddr: pdAddr,
pdCli: pdCli,
}
}

Expand All @@ -296,10 +296,10 @@ func (g *targetInfoGetter) CheckRequirements(ctx context.Context, checkCtx *back
if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil {
return err
}
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdAddr, localMinPDVersion, localMaxPDVersion); err != nil {
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinPDVersion, localMaxPDVersion); err != nil {
return err
}
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil {
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinTiKVVersion, localMaxTiKVVersion); err != nil {
return err
}

Expand Down
9 changes: 5 additions & 4 deletions br/pkg/lightning/backend/local/tikv_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -34,15 +35,15 @@ type TiKVModeSwitcher interface {
// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
type switcher struct {
tls *common.TLS
pdAddr string
pdCli pd.Client
logger *zap.Logger
}

// NewTiKVModeSwitcher creates a new TiKVModeSwitcher.
func NewTiKVModeSwitcher(tls *common.TLS, pdAddr string, logger *zap.Logger) TiKVModeSwitcher {
func NewTiKVModeSwitcher(tls *common.TLS, pdCli pd.Client, logger *zap.Logger) TiKVModeSwitcher {
return &switcher{
tls: tls,
pdAddr: pdAddr,
pdCli: pdCli,
logger: logger,
}
}
Expand All @@ -68,7 +69,7 @@ func (rc *switcher) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode) {
} else {
minState = tikv.StoreStateDisconnected
}
tls := rc.tls.WithHost(rc.pdAddr)
tls := rc.tls.WithHost(rc.pdCli.GetLeaderAddr())
// we ignore switch mode failure since it is not fatal.
// no need log the error, it is done in kv.SwitchMode already.
_ = tikv.ForAllStores(
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_pd_client//:client",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//buffer",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down
12 changes: 2 additions & 10 deletions br/pkg/lightning/importer/checksum_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -37,21 +36,14 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
return nil, nil
}

pdAddr := rc.cfg.TiDB.PdAddr
pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, pdAddr)
pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, rc.pdCli.GetLeaderAddr())
if err != nil {
return nil, errors.Trace(err)
}

// for v4.0.0 or upper, we can use the gc ttl api
var manager local.ChecksumManager
if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}

backoffWeight, err := common.GetBackoffWeightFromDB(ctx, rc.db)
// only set backoff weight when it's smaller than default value
if err == nil && backoffWeight >= local.DefaultBackoffWeight {
Expand All @@ -66,7 +58,7 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
log.FromContext(ctx).Warn("get tidb_request_source_type failed", zap.Error(err), zap.String("tidb_request_source_type", explicitRequestSourceType))
return nil, errors.Trace(err)
}
manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, explicitRequestSourceType)
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, explicitRequestSourceType)
} else {
manager = local.NewTiDBChecksumExecutor(rc.db)
}
Expand Down
16 changes: 12 additions & 4 deletions br/pkg/lightning/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/mock"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -123,23 +124,29 @@ type TargetInfoGetterImpl struct {
db *sql.DB
tls *common.TLS
backend backend.TargetInfoGetter
pdCli pd.Client
}

// NewTargetInfoGetterImpl creates a TargetInfoGetterImpl object.
func NewTargetInfoGetterImpl(
ctx context.Context,
cfg *config.Config,
targetDB *sql.DB,
) (*TargetInfoGetterImpl, error) {
tls, err := cfg.ToTLS()
if err != nil {
return nil, errors.Trace(err)
}
pdCli, err := pd.NewClientWithContext(ctx, []string{cfg.TiDB.PdAddr}, tls.ToPDSecurityOption())
if err != nil {
return nil, errors.Trace(err)
}
var backendTargetInfoGetter backend.TargetInfoGetter
switch cfg.TikvImporter.Backend {
case config.BackendTiDB:
backendTargetInfoGetter = tidb.NewTargetInfoGetter(targetDB)
case config.BackendLocal:
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, cfg.TiDB.PdAddr)
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, pdCli)
default:
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
}
Expand All @@ -148,6 +155,7 @@ func NewTargetInfoGetterImpl(
tls: tls,
db: targetDB,
backend: backendTargetInfoGetter,
pdCli: pdCli,
}, nil
}

Expand Down Expand Up @@ -229,7 +237,7 @@ func (g *TargetInfoGetterImpl) GetTargetSysVariablesForImport(ctx context.Contex
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtypes.ReplicationConfig, error) {
result := new(pdtypes.ReplicationConfig)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdReplicate, &result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdReplicate, &result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand All @@ -240,7 +248,7 @@ func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtyp
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.StoresInfo, error) {
result := new(pdtypes.StoresInfo)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdStores, result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand All @@ -251,7 +259,7 @@ func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.Sto
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetEmptyRegionsInfo(ctx context.Context) (*pdtypes.RegionsInfo, error) {
result := new(pdtypes.RegionsInfo)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/get_pre_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NoError(t, err)
lnConfig := config.NewConfig()
lnConfig.TikvImporter.Backend = config.BackendLocal
targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db)
targetGetter, err := NewTargetInfoGetterImpl(ctx, lnConfig, db)
require.NoError(t, err)
require.Equal(t, lnConfig, targetGetter.cfg)

Expand Down
14 changes: 10 additions & 4 deletions br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ type Controller struct {
engineMgr backend.EngineManager
backend backend.Backend
db *sql.DB
pdCli pd.Client

alterTableLock sync.Mutex
sysVars map[string]string
Expand Down Expand Up @@ -294,6 +295,10 @@ func NewImportControllerWithPauser(
if err != nil {
return nil, err
}
pdCli, err := pd.NewClientWithContext(ctx, []string{cfg.TiDB.PdAddr}, tls.ToPDSecurityOption())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not closed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other places too, seems not close anywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already addressed in f163612
PTAL again

if err != nil {
return nil, errors.Trace(err)
}

var cpdb checkpoints.DB
// if CheckpointStorage is set, we should use given ExternalStorage to create checkpoints.
Expand Down Expand Up @@ -349,7 +354,7 @@ func NewImportControllerWithPauser(
}

if cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
if err := tikv.CheckTiKVVersion(ctx, tls, cfg.TiDB.PdAddr, minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil {
if err := tikv.CheckTiKVVersion(ctx, tls, pdCli.GetLeaderAddr(), minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil {
if !berrors.Is(err, berrors.ErrVersionMismatch) {
return nil, common.ErrCheckKVVersion.Wrap(err).GenWithStackByArgs()
}
Expand Down Expand Up @@ -410,7 +415,7 @@ func NewImportControllerWithPauser(

var wrapper backend.TargetInfoGetter
if cfg.TikvImporter.Backend == config.BackendLocal {
wrapper = local.NewTargetInfoGetter(tls, db, cfg.TiDB.PdAddr)
wrapper = local.NewTargetInfoGetter(tls, db, pdCli)
} else {
wrapper = tidb.NewTargetInfoGetter(db)
}
Expand Down Expand Up @@ -449,6 +454,7 @@ func NewImportControllerWithPauser(
pauser: p.Pauser,
engineMgr: backend.MakeEngineManager(backendObj),
backend: backendObj,
pdCli: pdCli,
db: db,
sysVars: common.DefaultImportantVariables,
tls: tls,
Expand All @@ -473,7 +479,7 @@ func NewImportControllerWithPauser(
preInfoGetter: preInfoGetter,
precheckItemBuilder: preCheckBuilder,
encBuilder: encodingBuilder,
tikvModeSwitcher: local.NewTiKVModeSwitcher(tls, cfg.TiDB.PdAddr, log.FromContext(ctx).Logger),
tikvModeSwitcher: local.NewTiKVModeSwitcher(tls, pdCli, log.FromContext(ctx).Logger),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

init another client for it? odd to have an object with close but not called

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think creating two pd client is not effient and a bit confusing. pd has a background goroutine to update members.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add a comment here, easy to be taken as a bug. and i think it's a bad practice to rely on tikvModeSwitcher only closes pd-client in it's Close method, and does nothing else. it's easy to forget in later change.


keyspaceName: p.KeyspaceName,
resourceGroupName: p.ResourceGroupName,
Expand Down Expand Up @@ -1849,7 +1855,7 @@ func (rc *Controller) fullCompact(ctx context.Context) error {
}

func (rc *Controller) doCompact(ctx context.Context, level int32) error {
tls := rc.tls.WithHost(rc.cfg.TiDB.PdAddr)
tls := rc.tls.WithHost(rc.pdCli.GetLeaderAddr())
return tikv.ForAllStores(
ctx,
tls,
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, o
if err != nil {
return nil, errors.Trace(err)
}
targetInfoGetter, err := NewTargetInfoGetterImpl(cfg, targetDB)
targetInfoGetter, err := NewTargetInfoGetterImpl(ctx, cfg, targetDB)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
29 changes: 25 additions & 4 deletions br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ import (
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/testutils"
pd "github.com/tikv/pd/client"
)

const (
Expand Down Expand Up @@ -1161,6 +1163,8 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
require.NoError(s.T(), err)
mockStore, err := storage.NewLocalStorage(dir)
require.NoError(s.T(), err)
_, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(s.T(), err)
for _, ca := range cases {
server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
var err error
Expand All @@ -1177,9 +1181,11 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {

url := strings.TrimPrefix(server.URL, "https://")
cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}}
pdCli := &mockPDClient{Client: pdClient, leaderAddr: url}
targetInfoGetter := &TargetInfoGetterImpl{
cfg: cfg,
tls: tls,
cfg: cfg,
tls: tls,
pdCli: pdCli,
}
preInfoGetter := &PreImportInfoGetterImpl{
cfg: cfg,
Expand All @@ -1194,6 +1200,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
checkTemplate: template,
preInfoGetter: preInfoGetter,
precheckItemBuilder: theCheckBuilder,
pdCli: pdCli,
}
var sourceSize int64
err = rc.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
Expand Down Expand Up @@ -1230,6 +1237,15 @@ func (mockTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(ta
return err
}

type mockPDClient struct {
pd.Client
leaderAddr string
}

func (m *mockPDClient) GetLeaderAddr() string {
return m.leaderAddr
}

func (s *tableRestoreSuite) TestCheckClusterRegion() {
type testCase struct {
stores pdtypes.StoresInfo
Expand All @@ -1245,6 +1261,8 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
}
return regions
}
_, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(s.T(), err)

testCases := []testCase{
{
Expand Down Expand Up @@ -1320,10 +1338,12 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {

url := strings.TrimPrefix(server.URL, "https://")
cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}}
pdCli := &mockPDClient{Client: pdClient, leaderAddr: url}

targetInfoGetter := &TargetInfoGetterImpl{
cfg: cfg,
tls: tls,
cfg: cfg,
tls: tls,
pdCli: pdCli,
}
dbMetas := []*mydump.MDDatabaseMeta{}
preInfoGetter := &PreImportInfoGetterImpl{
Expand All @@ -1340,6 +1360,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
preInfoGetter: preInfoGetter,
dbInfos: make(map[string]*checkpoints.TidbDBInfo),
precheckItemBuilder: theCheckBuilder,
pdCli: pdCli,
}

preInfoGetter.dbInfosCache = rc.dbInfos
Expand Down
4 changes: 2 additions & 2 deletions disttask/importinto/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (h *flowHandle) switchTiKVMode(ctx context.Context, task *proto.Task) {
}

logger := logutil.BgLogger().With(zap.Int64("task-id", task.ID))
switcher, err := importer.GetTiKVModeSwitcher(logger)
switcher, err := importer.GetTiKVModeSwitcher(ctx, logger)
if err != nil {
logger.Warn("get tikv mode switcher failed", zap.Error(err))
return
Expand Down Expand Up @@ -335,7 +335,7 @@ func (h *flowHandle) switchTiKV2NormalMode(ctx context.Context, task *proto.Task
h.mu.Lock()
defer h.mu.Unlock()

switcher, err := importer.GetTiKVModeSwitcher(logger)
switcher, err := importer.GetTiKVModeSwitcher(ctx, logger)
if err != nil {
logger.Warn("get tikv mode switcher failed", zap.Error(err))
return
Expand Down
1 change: 1 addition & 0 deletions executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ go_library(
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
Expand Down
Loading