Skip to content

Commit

Permalink
lightning: always get latest PD leader when access PD after initializ…
Browse files Browse the repository at this point in the history
…ed (#46726) (#46757)

close #43436, close #46688
  • Loading branch information
ti-chi-bot authored Sep 11, 2023
1 parent 0ba0a82 commit a4aec47
Show file tree
Hide file tree
Showing 21 changed files with 188 additions and 48 deletions.
3 changes: 3 additions & 0 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func TestCheckCSVHeader(t *testing.T) {
dbMetas,
preInfoGetter,
nil,
nil,
)
preInfoGetter.dbInfosCache = rc.dbInfos
err = rc.checkCSVHeader(ctx)
Expand Down Expand Up @@ -465,6 +466,7 @@ func TestCheckTableEmpty(t *testing.T) {
dbMetas,
preInfoGetter,
nil,
nil,
)

rc := &Controller{
Expand Down Expand Up @@ -622,6 +624,7 @@ func TestLocalResource(t *testing.T) {
nil,
preInfoGetter,
nil,
nil,
)
rc := &Controller{
cfg: cfg,
Expand Down
39 changes: 28 additions & 11 deletions br/pkg/lightning/restore/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,21 @@ func WithPrecheckKey(ctx context.Context, key precheckContextKey, val any) conte
}

type PrecheckItemBuilder struct {
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
preInfoGetter PreRestoreInfoGetter
checkpointsDB checkpoints.DB
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
preInfoGetter PreRestoreInfoGetter
checkpointsDB checkpoints.DB
pdLeaderAddrGetter func() string
}

func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
// pdCli **must not** be nil for local backend
func NewPrecheckItemBuilderFromConfig(
ctx context.Context,
cfg *config.Config,
pdCli pd.Client,
opts ...ropts.PrecheckItemBuilderOption,
) (*PrecheckItemBuilder, error) {
var gerr error
builderCfg := new(ropts.PrecheckItemBuilderConfig)
for _, o := range opts {
Expand Down Expand Up @@ -98,20 +106,29 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, p
if err != nil {
return nil, errors.Trace(err)
}
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb), gerr
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdCli), gerr
}

func NewPrecheckItemBuilder(
cfg *config.Config,
dbMetas []*mydump.MDDatabaseMeta,
preInfoGetter PreRestoreInfoGetter,
checkpointsDB checkpoints.DB,
pdCli pd.Client,
) *PrecheckItemBuilder {
leaderAddrGetter := func() string {
return cfg.TiDB.PdAddr
}
// in tests we may not have a pdCli
if pdCli != nil {
leaderAddrGetter = pdCli.GetLeaderAddr
}
return &PrecheckItemBuilder{
cfg: cfg,
dbMetas: dbMetas,
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
cfg: cfg,
dbMetas: dbMetas,
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
pdLeaderAddrGetter: leaderAddrGetter,
}
}

Expand Down Expand Up @@ -142,7 +159,7 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
case CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter), nil
case CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg), nil
return NewCDCPITRCheckItem(b.cfg, b.pdLeaderAddrGetter), nil
default:
return nil, errors.Errorf("unsupported check item: %v", checkID)
}
Expand Down
22 changes: 14 additions & 8 deletions br/pkg/lightning/restore/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,17 +682,19 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
// CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
// caller override the Instruction message.
type CDCPITRCheckItem struct {
cfg *config.Config
Instruction string
cfg *config.Config
Instruction string
leaderAddrGetter func() string
// used in test
etcdCli *clientv3.Client
}

// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
func NewCDCPITRCheckItem(cfg *config.Config) PrecheckItem {
func NewCDCPITRCheckItem(cfg *config.Config, leaderAddrGetter func() string) PrecheckItem {
return &CDCPITRCheckItem{
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
leaderAddrGetter: leaderAddrGetter,
}
}

Expand All @@ -701,7 +703,11 @@ func (ci *CDCPITRCheckItem) GetCheckItemID() CheckItemID {
return CheckTargetUsingCDCPITR
}

func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) {
func dialEtcdWithCfg(
ctx context.Context,
cfg *config.Config,
leaderAddr string,
) (*clientv3.Client, error) {
cfg2, err := cfg.ToTLS()
if err != nil {
return nil, err
Expand All @@ -710,7 +716,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,

return clientv3.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: []string{cfg.TiDB.PdAddr},
Endpoints: []string{leaderAddr},
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
Expand Down Expand Up @@ -741,7 +747,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*CheckResult, error) {

if ci.etcdCli == nil {
var err error
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg)
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg, ci.leaderAddrGetter())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
Backend: config.BackendLocal,
},
}
ci := NewCDCPITRCheckItem(cfg)
ci := NewCDCPITRCheckItem(cfg, nil)
checker := ci.(*CDCPITRCheckItem)
checker.etcdCli = testEtcdCluster.RandClient()
result, err := ci.Check(ctx)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/precheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestPrecheckBuilderBasic(t *testing.T) {

preInfoGetter, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil)
require.NoError(t, err)
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil)
for _, checkItemID := range []CheckItemID{
CheckLargeDataFile,
CheckSourcePermission,
Expand Down
15 changes: 11 additions & 4 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func NewRestoreControllerWithPauser(
}

preCheckBuilder := NewPrecheckItemBuilder(
cfg, p.DBMetas, preInfoGetter, cpdb,
cfg, p.DBMetas, preInfoGetter, cpdb, pdCli,
)

rc := &Controller{
Expand Down Expand Up @@ -462,6 +462,8 @@ func (rc *Controller) Close() {
}

func (rc *Controller) Run(ctx context.Context) error {
failpoint.Inject("beforeRun", func() {})

opts := []func(context.Context) error{
rc.setGlobalVariables,
rc.restoreSchema,
Expand Down Expand Up @@ -1351,7 +1353,7 @@ const (

func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.cfg.TiDB.PdAddr}, tlsOpt)
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.pdCli.GetLeaderAddr()}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1512,8 +1514,13 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
}

// Disable GC because TiDB enables GC already.

currentLeaderAddr := rc.pdCli.GetLeaderAddr()
// remove URL scheme
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "http://")
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "https://")
kvStore, err = driver.TiKVDriver{}.OpenWithOptions(
fmt.Sprintf("tikv://%s?disableGC=true", rc.cfg.TiDB.PdAddr),
fmt.Sprintf("tikv://%s?disableGC=true", currentLeaderAddr),
driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()),
)
if err != nil {
Expand Down Expand Up @@ -2114,7 +2121,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
rc.status.TotalFileSize.Store(estimatedSizeResult.SizeWithoutIndex)
}
if isLocalBackend(rc.cfg) {
pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
pdController, err := pdutil.NewPdController(ctx, rc.pdCli.GetLeaderAddr(),
rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption())
if err != nil {
return common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestPreCheckFailed(t *testing.T) {
dbMetas: make([]*mydump.MDDatabaseMeta, 0),
}
cpdb := panicCheckpointDB{}
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb)
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil)
ctl := &Controller{
cfg: cfg,
saveCpCh: make(chan saveCp),
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/table_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
targetInfoGetter: targetInfoGetter,
srcStorage: mockStore,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil)
rc := &Controller{
cfg: cfg,
tls: tls,
Expand Down Expand Up @@ -1294,7 +1294,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
targetInfoGetter: targetInfoGetter,
dbMetas: dbMetas,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB())
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil)
rc := &Controller{
cfg: cfg,
tls: tls,
Expand Down Expand Up @@ -1390,7 +1390,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() {
for _, ca := range cases {
template := NewSimpleTemplate()
cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}}
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil)
rc := &Controller{
cfg: cfg,
checkTemplate: template,
Expand Down Expand Up @@ -1445,7 +1445,7 @@ func (s *tableRestoreSuite) TestEstimate() {
targetInfoGetter: mockTarget,
}
preInfoGetter.Init()
theCheckBuilder := NewPrecheckItemBuilder(s.cfg, dbMetas, preInfoGetter, nil)
theCheckBuilder := NewPrecheckItemBuilder(s.cfg, dbMetas, preInfoGetter, nil, nil)
rc := &Controller{
cfg: s.cfg,
checkTemplate: template,
Expand Down
Loading

0 comments on commit a4aec47

Please sign in to comment.